This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 5462e69c96 NIFI-12923 Added append avro mode to PutHDFS
5462e69c96 is described below

commit 5462e69c96671954453e1a658795ac0f08a95012
Author: Balázs Gerner <[email protected]>
AuthorDate: Wed Mar 20 09:18:25 2024 +0100

    NIFI-12923 Added append avro mode to PutHDFS
    
    NIFI-12923 remove var keyword
    
    NIFI-12923 change property name
    
    NIFI-12923 Added property dependency for append_mode
    
    Signed-off-by: Matt Burgess <[email protected]>
    
    NIFI-12923: Fixed issues during backport to 1.x
---
 .../nifi-hdfs-processors/pom.xml                   |   4 +
 .../org/apache/nifi/processors/hadoop/PutHDFS.java |  70 +++++++++++--
 .../apache/nifi/processors/hadoop/PutHDFSTest.java | 114 ++++++++++++++++++++-
 .../processors/hadoop/util/MockFileSystem.java     |  58 ++++++++---
 .../src/test/resources/testdata-avro/input.avro    | Bin 0 -> 171 bytes
 pom.xml                                            |   5 +
 6 files changed, 225 insertions(+), 26 deletions(-)

diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
index 771b7f0fe7..c794f960c2 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
@@ -41,6 +41,10 @@
             <artifactId>nifi-hadoop-utils</artifactId>
             <version>1.26.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro-mapred</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-flowfile-packager</artifactId>
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 9a60b2fcc5..68aff386c3 100644
--- 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -18,6 +18,11 @@ package org.apache.nifi.processors.hadoop;
 
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.mapred.FsInput;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileStatus;
@@ -45,6 +50,8 @@ import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.RequiredPermission;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -66,6 +73,7 @@ import java.io.UncheckedIOException;
 import java.security.PrivilegedAction;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashSet;
@@ -114,6 +122,8 @@ public class PutHDFS extends AbstractHadoopProcessor {
             .build();
 
     // properties
+    public static final String DEFAULT_APPEND_MODE = "DEFAULT";
+    public static final String AVRO_APPEND_MODE = "AVRO";
 
     protected static final String REPLACE_RESOLUTION = "replace";
     protected static final String IGNORE_RESOLUTION = "ignore";
@@ -154,6 +164,15 @@ public class PutHDFS extends AbstractHadoopProcessor {
             .allowableValues(WRITE_AND_RENAME_AV, SIMPLE_WRITE_AV)
             .build();
 
+    public static final PropertyDescriptor APPEND_MODE = new 
PropertyDescriptor.Builder()
+            .name("Append Mode")
+            .description("Defines the append strategy to use when the Conflict 
Resolution Strategy is set to 'append'.")
+            .allowableValues(DEFAULT_APPEND_MODE, AVRO_APPEND_MODE)
+            .defaultValue(DEFAULT_APPEND_MODE)
+            .dependsOn(CONFLICT_RESOLUTION, APPEND_RESOLUTION)
+            .required(true)
+            .build();
+
     public static final PropertyDescriptor BLOCK_SIZE = new 
PropertyDescriptor.Builder()
             .name("Block Size")
             .description("Size of each block as written to HDFS. This 
overrides the Hadoop Configuration")
@@ -231,6 +250,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
                 .description("The parent HDFS directory to which files should 
be written. The directory will be created if it doesn't exist.")
                 .build());
         props.add(CONFLICT_RESOLUTION);
+        props.add(APPEND_MODE);
         props.add(WRITING_STRATEGY);
         props.add(BLOCK_SIZE);
         props.add(BUFFER_SIZE);
@@ -243,6 +263,22 @@ public class PutHDFS extends AbstractHadoopProcessor {
         return props;
     }
 
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
+        final List<ValidationResult> problems = new 
ArrayList<>(super.customValidate(validationContext));
+        final PropertyValue codec = 
validationContext.getProperty(COMPRESSION_CODEC);
+        final boolean isCodecSet = codec.isSet() && 
!CompressionType.NONE.name().equals(codec.getValue());
+        if (isCodecSet && 
APPEND_RESOLUTION.equals(validationContext.getProperty(CONFLICT_RESOLUTION).getValue())
+                && 
AVRO_APPEND_MODE.equals(validationContext.getProperty(APPEND_MODE).getValue())) 
{
+            problems.add(new ValidationResult.Builder()
+                    .subject("Codec")
+                    .valid(false)
+                    .explanation("Compression codec cannot be set when used in 
'append avro' mode")
+                    .build());
+        }
+        return problems;
+    }
+
     @Override
     protected void preProcessConfiguration(final Configuration config, final 
ProcessContext context) {
         // Set umask once, to avoid thread safety issues doing it in onTrigger
@@ -384,14 +420,32 @@ public class PutHDFS extends AbstractHadoopProcessor {
                                         null, null);
                             }
 
-                            if (codec != null) {
-                                fos = codec.createOutputStream(fos);
-                            }
-                            createdFile = actualCopyFile;
-                            BufferedInputStream bis = new 
BufferedInputStream(in);
-                            StreamUtils.copy(bis, fos);
-                            bis = null;
-                            fos.flush();
+                                if (codec != null) {
+                                    fos = codec.createOutputStream(fos);
+                                }
+                                createdFile = actualCopyFile;
+
+                                final String appendMode = 
context.getProperty(APPEND_MODE).getValue();
+                                if (APPEND_RESOLUTION.equals(conflictResponse)
+                                        && AVRO_APPEND_MODE.equals(appendMode)
+                                        && destinationExists) {
+                                    getLogger().info("Appending avro record to 
existing avro file");
+                                    try (final DataFileStream<Object> reader = 
new DataFileStream<>(in, new GenericDatumReader<>());
+                                         final DataFileWriter<Object> writer = 
new DataFileWriter<>(new GenericDatumWriter<>())) {
+                                        writer.appendTo(new FsInput(copyFile, 
configuration), fos); // open writer to existing file
+                                        writer.appendAllFrom(reader, false); 
// append flowfile content
+                                        writer.flush();
+                                        getLogger().info("Successfully 
appended avro record");
+                                    } catch (Exception e) {
+                                        getLogger().error("Error occurred 
during appending to existing avro file", e);
+                                        throw new ProcessException(e);
+                                    }
+                                } else {
+                                    BufferedInputStream bis = new 
BufferedInputStream(in);
+                                    StreamUtils.copy(bis, fos);
+                                    bis = null;
+                                    fos.flush();
+                                }
                         } finally {
                             try {
                                 if (fos != null) {
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
index 637f312b56..112e5e0356 100644
--- 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
@@ -48,22 +48,32 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.nifi.processors.hadoop.CompressionType.GZIP;
+import static org.apache.nifi.processors.hadoop.CompressionType.NONE;
+import static org.apache.nifi.processors.hadoop.PutHDFS.APPEND_RESOLUTION;
+import static org.apache.nifi.processors.hadoop.PutHDFS.AVRO_APPEND_MODE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 public class PutHDFSTest {
     private final static String TARGET_DIRECTORY = "target/test-classes";
+    private final static String AVRO_TARGET_DIRECTORY = TARGET_DIRECTORY + 
"/testdata-avro";
     private final static String SOURCE_DIRECTORY = 
"src/test/resources/testdata";
+    private final static String AVRO_SOURCE_DIRECTORY = 
"src/test/resources/testdata-avro";
     private final static String FILE_NAME = "randombytes-1";
+    private final static String AVRO_FILE_NAME = "input.avro";
 
     private KerberosProperties kerberosProperties;
     private MockFileSystem mockFileSystem;
@@ -186,6 +196,34 @@ public class PutHDFSTest {
         for (ValidationResult vr : results) {
             assertTrue(vr.toString().contains("is invalid because Given value 
not found in allowed set"));
         }
+
+        results = new HashSet<>();
+        runner.setProperty(PutHDFS.DIRECTORY, "target");
+        runner.setProperty(PutHDFS.APPEND_MODE, AVRO_APPEND_MODE);
+        runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, APPEND_RESOLUTION);
+        runner.setProperty(PutHDFS.COMPRESSION_CODEC, GZIP.name());
+        runner.enqueue(new byte[0]);
+        pc = runner.getProcessContext();
+        if (pc instanceof MockProcessContext) {
+            results = ((MockProcessContext) pc).validate();
+        }
+        assertEquals(1, results.size());
+        for (ValidationResult vr : results) {
+            assertEquals(vr.getSubject(), "Codec");
+            assertEquals(vr.getExplanation(), "Compression codec cannot be set 
when used in 'append avro' mode");
+        }
+
+        results = new HashSet<>();
+        runner.setProperty(PutHDFS.DIRECTORY, "target");
+        runner.setProperty(PutHDFS.APPEND_MODE, AVRO_APPEND_MODE);
+        runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, APPEND_RESOLUTION);
+        runner.setProperty(PutHDFS.COMPRESSION_CODEC, NONE.name());
+        runner.enqueue(new byte[0]);
+        pc = runner.getProcessContext();
+        if (pc instanceof MockProcessContext) {
+            results = ((MockProcessContext) pc).validate();
+        }
+        assertEquals(0, results.size());
     }
 
     @Test
@@ -229,6 +267,58 @@ public class PutHDFSTest {
         verify(spyFileSystem, times(1)).rename(any(Path.class), 
any(Path.class));
     }
 
+    @Test
+    public void testPutFileWithAppendAvroModeNewFileCreated() throws 
IOException {
+        // given
+        final FileSystem spyFileSystem = Mockito.spy(mockFileSystem);
+        final PutHDFS proc = new TestablePutHDFS(kerberosProperties, 
spyFileSystem);
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutHDFS.DIRECTORY, AVRO_TARGET_DIRECTORY);
+        runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, APPEND_RESOLUTION);
+        runner.setProperty(PutHDFS.APPEND_MODE, AVRO_APPEND_MODE);
+        final Path targetPath = new Path(AVRO_TARGET_DIRECTORY + "/" + 
AVRO_FILE_NAME);
+
+        // when
+        try (final FileInputStream fis = new 
FileInputStream(AVRO_SOURCE_DIRECTORY + "/" + AVRO_FILE_NAME)) {
+            runner.enqueue(fis, 
Collections.singletonMap(CoreAttributes.FILENAME.key(), AVRO_FILE_NAME));
+            runner.assertValid();
+            runner.run();
+        }
+
+        // then
+        assertAvroAppendValues(runner, spyFileSystem, targetPath);
+        verify(spyFileSystem, times(0)).append(eq(targetPath), anyInt());
+        verify(spyFileSystem, times(1)).rename(any(Path.class), 
eq(targetPath));
+        assertEquals(100, spyFileSystem.getFileStatus(targetPath).getLen());
+    }
+
+    @Test
+    public void testPutFileWithAppendAvroModeWhenTargetFileAlreadyExists() 
throws IOException {
+        // given
+        final FileSystem spyFileSystem = Mockito.spy(mockFileSystem);
+        final PutHDFS proc = new TestablePutHDFS(kerberosProperties, 
spyFileSystem);
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutHDFS.DIRECTORY, AVRO_TARGET_DIRECTORY);
+        runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, APPEND_RESOLUTION);
+        runner.setProperty(PutHDFS.APPEND_MODE, AVRO_APPEND_MODE);
+        spyFileSystem.setConf(new Configuration());
+        final Path targetPath = new Path(AVRO_TARGET_DIRECTORY + "/" + 
AVRO_FILE_NAME);
+        spyFileSystem.createNewFile(targetPath);
+
+        // when
+        try (final FileInputStream fis = new 
FileInputStream(AVRO_SOURCE_DIRECTORY + "/" + AVRO_FILE_NAME)) {
+            runner.enqueue(fis, 
Collections.singletonMap(CoreAttributes.FILENAME.key(), AVRO_FILE_NAME));
+            runner.assertValid();
+            runner.run();
+        }
+
+        // then
+        assertAvroAppendValues(runner, spyFileSystem, targetPath);
+        verify(spyFileSystem).append(eq(targetPath), anyInt());
+        verify(spyFileSystem, times(0)).rename(any(Path.class), 
eq(targetPath));
+        assertEquals(200, spyFileSystem.getFileStatus(targetPath).getLen());
+    }
+
     @Test
     public void testPutFileWithSimpleWrite() throws IOException {
         // given
@@ -642,7 +732,29 @@ public class PutHDFSTest {
         mockFileSystem.delete(p, true);
     }
 
-    private class TestablePutHDFS extends PutHDFS {
+    private static void assertAvroAppendValues(TestRunner runner, FileSystem 
spyFileSystem, Path targetPath) throws IOException {
+        final List<MockFlowFile> failedFlowFiles = 
runner.getFlowFilesForRelationship(PutHDFS.REL_FAILURE);
+        assertTrue(failedFlowFiles.isEmpty());
+
+        final List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+
+        final MockFlowFile flowFile = flowFiles.get(0);
+        assertTrue(spyFileSystem.exists(targetPath));
+        assertEquals(AVRO_FILE_NAME, 
flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+        assertEquals(AVRO_TARGET_DIRECTORY, 
flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
+        assertEquals("true", 
flowFile.getAttribute(PutHDFS.TARGET_HDFS_DIR_CREATED_ATTRIBUTE));
+
+        final List<ProvenanceEventRecord> provenanceEvents = 
runner.getProvenanceEvents();
+        assertEquals(1, provenanceEvents.size());
+        final ProvenanceEventRecord sendEvent = provenanceEvents.get(0);
+        assertEquals(ProvenanceEventType.SEND, sendEvent.getEventType());
+        // If it runs with a real HDFS, the protocol will be "hdfs://", but 
with a local filesystem, just assert the filename.
+        assertTrue(sendEvent.getTransitUri().endsWith(AVRO_TARGET_DIRECTORY + 
"/" + AVRO_FILE_NAME));
+        
assertTrue(flowFile.getAttribute(PutHDFS.HADOOP_FILE_URL_ATTRIBUTE).endsWith(AVRO_TARGET_DIRECTORY
 + "/" + AVRO_FILE_NAME));
+    }
+
+    private static class TestablePutHDFS extends PutHDFS {
 
         private final KerberosProperties testKerberosProperties;
         private final FileSystem fileSystem;
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/MockFileSystem.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/MockFileSystem.java
index 3a3477502e..f020f8b3b5 100644
--- 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/MockFileSystem.java
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/MockFileSystem.java
@@ -41,9 +41,12 @@ import java.util.Map;
 import java.util.Set;
 
 public class MockFileSystem extends FileSystem {
+    private static final long DIR_LENGTH = 1L;
+    private static final long FILE_LENGTH = 100L;
     private final Map<Path, FileStatus> pathToStatus = new HashMap<>();
     private final Map<Path, List<AclEntry>> pathToAcl = new HashMap<>();
     private final Map<Path, Set<FileStatus>> fileStatuses = new HashMap<>();
+    private final Map<Path, FSDataOutputStream> pathToOutputStream = new 
HashMap<>();
 
     private boolean failOnOpen;
     private boolean failOnClose;
@@ -51,7 +54,6 @@ public class MockFileSystem extends FileSystem {
     private boolean failOnFileStatus;
     private boolean failOnExists;
 
-
     public void setFailOnClose(final boolean failOnClose) {
         this.failOnClose = failOnClose;
     }
@@ -102,22 +104,18 @@ public class MockFileSystem extends FileSystem {
             throw new IOException(new AuthenticationException("test auth 
error"));
         }
         pathToStatus.put(f, newFile(f, permission));
-        if(failOnClose) {
-            return new FSDataOutputStream(new ByteArrayOutputStream(), new 
FileSystem.Statistics("")) {
-                @Override
-                public void close() throws IOException {
-                    super.close();
-                    throw new IOException("Fail on close");
-                }
-            };
-        } else {
-            return new FSDataOutputStream(new ByteArrayOutputStream(), new 
Statistics(""));
-        }
+        final FSDataOutputStream outputStream = createOutputStream();
+        pathToOutputStream.put(f, outputStream);
+        return outputStream;
     }
 
     @Override
     public FSDataOutputStream append(final Path f, final int bufferSize, final 
Progressable progress) {
-        return null;
+        pathToOutputStream.computeIfAbsent(f, f2 -> createOutputStream());
+        final FileStatus oldStatus = pathToStatus.get(f);
+        final long newLength = oldStatus.getLen() + FILE_LENGTH;
+        pathToStatus.put(f, updateLength(oldStatus, newLength));
+        return pathToOutputStream.get(f);
     }
 
     @Override
@@ -192,19 +190,45 @@ public class MockFileSystem extends FileSystem {
         return pathToStatus.containsKey(f);
     }
 
+    private FSDataOutputStream createOutputStream() {
+        if(failOnClose) {
+            return new FSDataOutputStream(new ByteArrayOutputStream(), new 
Statistics("")) {
+                @Override
+                public void close() throws IOException {
+                    super.close();
+                    throw new IOException("Fail on close");
+                }
+            };
+        } else {
+            return new FSDataOutputStream(new ByteArrayOutputStream(), new 
Statistics(""));
+        }
+    }
+
+    private FileStatus updateLength(FileStatus oldStatus, Long newLength) {
+        try {
+            return new FileStatus(newLength, oldStatus.isDirectory(), 
oldStatus.getReplication(),
+                    oldStatus.getBlockSize(), oldStatus.getModificationTime(), 
oldStatus.getAccessTime(),
+                    oldStatus.getPermission(), oldStatus.getOwner(), 
oldStatus.getGroup(),
+                    (oldStatus.isSymlink() ? oldStatus.getSymlink() : null),
+                    oldStatus.getPath());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     public FileStatus newFile(Path p, FsPermission permission) {
-        return new FileStatus(100L, false, 3, 128 * 1024 * 1024, 
1523456000000L, 1523457000000L, permission, "owner", "group", p);
+        return new FileStatus(FILE_LENGTH, false, 3, 128 * 1024 * 1024, 
1523456000000L, 1523457000000L, permission, "owner", "group", p);
     }
 
     public FileStatus newDir(Path p) {
-        return new FileStatus(1L, true, 3, 128 * 1024 * 1024, 1523456000000L, 
1523457000000L, perms((short) 0755), "owner", "group", (Path)null, p, true, 
false, false);
+        return new FileStatus(DIR_LENGTH, true, 3, 128 * 1024 * 1024, 
1523456000000L, 1523457000000L, perms((short) 0755), "owner", "group", 
(Path)null, p, true, false, false);
     }
 
     public FileStatus newFile(String p) {
-        return new FileStatus(100L, false, 3, 128*1024*1024, 1523456000000L, 
1523457000000L, perms((short)0644), "owner", "group", new Path(p));
+        return new FileStatus(FILE_LENGTH, false, 3, 128*1024*1024, 
1523456000000L, 1523457000000L, perms((short)0644), "owner", "group", new 
Path(p));
     }
     public FileStatus newDir(String p) {
-        return new FileStatus(1L, true, 3, 128*1024*1024, 1523456000000L, 
1523457000000L, perms((short)0755), "owner", "group", new Path(p));
+        return new FileStatus(DIR_LENGTH, true, 3, 128*1024*1024, 
1523456000000L, 1523457000000L, perms((short)0755), "owner", "group", new 
Path(p));
     }
 
     @Override
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/testdata-avro/input.avro
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/testdata-avro/input.avro
new file mode 100644
index 0000000000..606b00a010
Binary files /dev/null and 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/testdata-avro/input.avro
 differ
diff --git a/pom.xml b/pom.xml
index 9941881d7a..ec321421f4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -252,6 +252,11 @@
                 <artifactId>avro-ipc</artifactId>
                 <version>${avro.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.apache.avro</groupId>
+                <artifactId>avro-mapred</artifactId>
+                <version>${avro.version}</version>
+            </dependency>
             <dependency>
                 <groupId>commons-cli</groupId>
                 <artifactId>commons-cli</artifactId>

Reply via email to