This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 5462e69c96 NIFI-12923 Added append avro mode to PutHDFS
5462e69c96 is described below
commit 5462e69c96671954453e1a658795ac0f08a95012
Author: Balázs Gerner <[email protected]>
AuthorDate: Wed Mar 20 09:18:25 2024 +0100
NIFI-12923 Added append avro mode to PutHDFS
NIFI-12923 remove var keyword
NIFI-12923 change property name
NIFI-12923 Added property dependency for append_mode
Signed-off-by: Matt Burgess <[email protected]>
NIFI-12923: Fixed issues during backport to 1.x
---
.../nifi-hdfs-processors/pom.xml | 4 +
.../org/apache/nifi/processors/hadoop/PutHDFS.java | 70 +++++++++++--
.../apache/nifi/processors/hadoop/PutHDFSTest.java | 114 ++++++++++++++++++++-
.../processors/hadoop/util/MockFileSystem.java | 58 ++++++++---
.../src/test/resources/testdata-avro/input.avro | Bin 0 -> 171 bytes
pom.xml | 5 +
6 files changed, 225 insertions(+), 26 deletions(-)
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
index 771b7f0fe7..c794f960c2 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
@@ -41,6 +41,10 @@
<artifactId>nifi-hadoop-utils</artifactId>
<version>1.26.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-mapred</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-flowfile-packager</artifactId>
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 9a60b2fcc5..68aff386c3 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -18,6 +18,11 @@ package org.apache.nifi.processors.hadoop;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.mapred.FsInput;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileStatus;
@@ -45,6 +50,8 @@ import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.RequiredPermission;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -66,6 +73,7 @@ import java.io.UncheckedIOException;
import java.security.PrivilegedAction;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
@@ -114,6 +122,8 @@ public class PutHDFS extends AbstractHadoopProcessor {
.build();
// properties
+ public static final String DEFAULT_APPEND_MODE = "DEFAULT";
+ public static final String AVRO_APPEND_MODE = "AVRO";
protected static final String REPLACE_RESOLUTION = "replace";
protected static final String IGNORE_RESOLUTION = "ignore";
@@ -154,6 +164,15 @@ public class PutHDFS extends AbstractHadoopProcessor {
.allowableValues(WRITE_AND_RENAME_AV, SIMPLE_WRITE_AV)
.build();
+ public static final PropertyDescriptor APPEND_MODE = new
PropertyDescriptor.Builder()
+ .name("Append Mode")
+ .description("Defines the append strategy to use when the Conflict
Resolution Strategy is set to 'append'.")
+ .allowableValues(DEFAULT_APPEND_MODE, AVRO_APPEND_MODE)
+ .defaultValue(DEFAULT_APPEND_MODE)
+ .dependsOn(CONFLICT_RESOLUTION, APPEND_RESOLUTION)
+ .required(true)
+ .build();
+
public static final PropertyDescriptor BLOCK_SIZE = new
PropertyDescriptor.Builder()
.name("Block Size")
.description("Size of each block as written to HDFS. This
overrides the Hadoop Configuration")
@@ -231,6 +250,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
.description("The parent HDFS directory to which files should
be written. The directory will be created if it doesn't exist.")
.build());
props.add(CONFLICT_RESOLUTION);
+ props.add(APPEND_MODE);
props.add(WRITING_STRATEGY);
props.add(BLOCK_SIZE);
props.add(BUFFER_SIZE);
@@ -243,6 +263,22 @@ public class PutHDFS extends AbstractHadoopProcessor {
return props;
}
+ @Override
+ protected Collection<ValidationResult> customValidate(ValidationContext
validationContext) {
+ final List<ValidationResult> problems = new
ArrayList<>(super.customValidate(validationContext));
+ final PropertyValue codec =
validationContext.getProperty(COMPRESSION_CODEC);
+ final boolean isCodecSet = codec.isSet() &&
!CompressionType.NONE.name().equals(codec.getValue());
+ if (isCodecSet &&
APPEND_RESOLUTION.equals(validationContext.getProperty(CONFLICT_RESOLUTION).getValue())
+ &&
AVRO_APPEND_MODE.equals(validationContext.getProperty(APPEND_MODE).getValue()))
{
+ problems.add(new ValidationResult.Builder()
+ .subject("Codec")
+ .valid(false)
+ .explanation("Compression codec cannot be set when used in
'append avro' mode")
+ .build());
+ }
+ return problems;
+ }
+
@Override
protected void preProcessConfiguration(final Configuration config, final
ProcessContext context) {
// Set umask once, to avoid thread safety issues doing it in onTrigger
@@ -384,14 +420,32 @@ public class PutHDFS extends AbstractHadoopProcessor {
null, null);
}
- if (codec != null) {
- fos = codec.createOutputStream(fos);
- }
- createdFile = actualCopyFile;
- BufferedInputStream bis = new
BufferedInputStream(in);
- StreamUtils.copy(bis, fos);
- bis = null;
- fos.flush();
+ if (codec != null) {
+ fos = codec.createOutputStream(fos);
+ }
+ createdFile = actualCopyFile;
+
+ final String appendMode =
context.getProperty(APPEND_MODE).getValue();
+ if (APPEND_RESOLUTION.equals(conflictResponse)
+ && AVRO_APPEND_MODE.equals(appendMode)
+ && destinationExists) {
+ getLogger().info("Appending avro record to
existing avro file");
+ try (final DataFileStream<Object> reader =
new DataFileStream<>(in, new GenericDatumReader<>());
+ final DataFileWriter<Object> writer =
new DataFileWriter<>(new GenericDatumWriter<>())) {
+ writer.appendTo(new FsInput(copyFile,
configuration), fos); // open writer to existing file
+ writer.appendAllFrom(reader, false);
// append flowfile content
+ writer.flush();
+ getLogger().info("Successfully
appended avro record");
+ } catch (Exception e) {
+ getLogger().error("Error occurred
during appending to existing avro file", e);
+ throw new ProcessException(e);
+ }
+ } else {
+ BufferedInputStream bis = new
BufferedInputStream(in);
+ StreamUtils.copy(bis, fos);
+ bis = null;
+ fos.flush();
+ }
} finally {
try {
if (fos != null) {
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
index 637f312b56..112e5e0356 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
@@ -48,22 +48,32 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import static org.apache.nifi.processors.hadoop.CompressionType.GZIP;
+import static org.apache.nifi.processors.hadoop.CompressionType.NONE;
+import static org.apache.nifi.processors.hadoop.PutHDFS.APPEND_RESOLUTION;
+import static org.apache.nifi.processors.hadoop.PutHDFS.AVRO_APPEND_MODE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
public class PutHDFSTest {
private final static String TARGET_DIRECTORY = "target/test-classes";
+ private final static String AVRO_TARGET_DIRECTORY = TARGET_DIRECTORY +
"/testdata-avro";
private final static String SOURCE_DIRECTORY =
"src/test/resources/testdata";
+ private final static String AVRO_SOURCE_DIRECTORY =
"src/test/resources/testdata-avro";
private final static String FILE_NAME = "randombytes-1";
+ private final static String AVRO_FILE_NAME = "input.avro";
private KerberosProperties kerberosProperties;
private MockFileSystem mockFileSystem;
@@ -186,6 +196,34 @@ public class PutHDFSTest {
for (ValidationResult vr : results) {
assertTrue(vr.toString().contains("is invalid because Given value
not found in allowed set"));
}
+
+ results = new HashSet<>();
+ runner.setProperty(PutHDFS.DIRECTORY, "target");
+ runner.setProperty(PutHDFS.APPEND_MODE, AVRO_APPEND_MODE);
+ runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, APPEND_RESOLUTION);
+ runner.setProperty(PutHDFS.COMPRESSION_CODEC, GZIP.name());
+ runner.enqueue(new byte[0]);
+ pc = runner.getProcessContext();
+ if (pc instanceof MockProcessContext) {
+ results = ((MockProcessContext) pc).validate();
+ }
+ assertEquals(1, results.size());
+ for (ValidationResult vr : results) {
+ assertEquals(vr.getSubject(), "Codec");
+ assertEquals(vr.getExplanation(), "Compression codec cannot be set
when used in 'append avro' mode");
+ }
+
+ results = new HashSet<>();
+ runner.setProperty(PutHDFS.DIRECTORY, "target");
+ runner.setProperty(PutHDFS.APPEND_MODE, AVRO_APPEND_MODE);
+ runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, APPEND_RESOLUTION);
+ runner.setProperty(PutHDFS.COMPRESSION_CODEC, NONE.name());
+ runner.enqueue(new byte[0]);
+ pc = runner.getProcessContext();
+ if (pc instanceof MockProcessContext) {
+ results = ((MockProcessContext) pc).validate();
+ }
+ assertEquals(0, results.size());
}
@Test
@@ -229,6 +267,58 @@ public class PutHDFSTest {
verify(spyFileSystem, times(1)).rename(any(Path.class),
any(Path.class));
}
+ @Test
+ public void testPutFileWithAppendAvroModeNewFileCreated() throws
IOException {
+ // given
+ final FileSystem spyFileSystem = Mockito.spy(mockFileSystem);
+ final PutHDFS proc = new TestablePutHDFS(kerberosProperties,
spyFileSystem);
+ final TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(PutHDFS.DIRECTORY, AVRO_TARGET_DIRECTORY);
+ runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, APPEND_RESOLUTION);
+ runner.setProperty(PutHDFS.APPEND_MODE, AVRO_APPEND_MODE);
+ final Path targetPath = new Path(AVRO_TARGET_DIRECTORY + "/" +
AVRO_FILE_NAME);
+
+ // when
+ try (final FileInputStream fis = new
FileInputStream(AVRO_SOURCE_DIRECTORY + "/" + AVRO_FILE_NAME)) {
+ runner.enqueue(fis,
Collections.singletonMap(CoreAttributes.FILENAME.key(), AVRO_FILE_NAME));
+ runner.assertValid();
+ runner.run();
+ }
+
+ // then
+ assertAvroAppendValues(runner, spyFileSystem, targetPath);
+ verify(spyFileSystem, times(0)).append(eq(targetPath), anyInt());
+ verify(spyFileSystem, times(1)).rename(any(Path.class),
eq(targetPath));
+ assertEquals(100, spyFileSystem.getFileStatus(targetPath).getLen());
+ }
+
+ @Test
+ public void testPutFileWithAppendAvroModeWhenTargetFileAlreadyExists()
throws IOException {
+ // given
+ final FileSystem spyFileSystem = Mockito.spy(mockFileSystem);
+ final PutHDFS proc = new TestablePutHDFS(kerberosProperties,
spyFileSystem);
+ final TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(PutHDFS.DIRECTORY, AVRO_TARGET_DIRECTORY);
+ runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, APPEND_RESOLUTION);
+ runner.setProperty(PutHDFS.APPEND_MODE, AVRO_APPEND_MODE);
+ spyFileSystem.setConf(new Configuration());
+ final Path targetPath = new Path(AVRO_TARGET_DIRECTORY + "/" +
AVRO_FILE_NAME);
+ spyFileSystem.createNewFile(targetPath);
+
+ // when
+ try (final FileInputStream fis = new
FileInputStream(AVRO_SOURCE_DIRECTORY + "/" + AVRO_FILE_NAME)) {
+ runner.enqueue(fis,
Collections.singletonMap(CoreAttributes.FILENAME.key(), AVRO_FILE_NAME));
+ runner.assertValid();
+ runner.run();
+ }
+
+ // then
+ assertAvroAppendValues(runner, spyFileSystem, targetPath);
+ verify(spyFileSystem).append(eq(targetPath), anyInt());
+ verify(spyFileSystem, times(0)).rename(any(Path.class),
eq(targetPath));
+ assertEquals(200, spyFileSystem.getFileStatus(targetPath).getLen());
+ }
+
@Test
public void testPutFileWithSimpleWrite() throws IOException {
// given
@@ -642,7 +732,29 @@ public class PutHDFSTest {
mockFileSystem.delete(p, true);
}
- private class TestablePutHDFS extends PutHDFS {
+ private static void assertAvroAppendValues(TestRunner runner, FileSystem
spyFileSystem, Path targetPath) throws IOException {
+ final List<MockFlowFile> failedFlowFiles =
runner.getFlowFilesForRelationship(PutHDFS.REL_FAILURE);
+ assertTrue(failedFlowFiles.isEmpty());
+
+ final List<MockFlowFile> flowFiles =
runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS);
+ assertEquals(1, flowFiles.size());
+
+ final MockFlowFile flowFile = flowFiles.get(0);
+ assertTrue(spyFileSystem.exists(targetPath));
+ assertEquals(AVRO_FILE_NAME,
flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+ assertEquals(AVRO_TARGET_DIRECTORY,
flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
+ assertEquals("true",
flowFile.getAttribute(PutHDFS.TARGET_HDFS_DIR_CREATED_ATTRIBUTE));
+
+ final List<ProvenanceEventRecord> provenanceEvents =
runner.getProvenanceEvents();
+ assertEquals(1, provenanceEvents.size());
+ final ProvenanceEventRecord sendEvent = provenanceEvents.get(0);
+ assertEquals(ProvenanceEventType.SEND, sendEvent.getEventType());
+ // If it runs with a real HDFS, the protocol will be "hdfs://", but
with a local filesystem, just assert the filename.
+ assertTrue(sendEvent.getTransitUri().endsWith(AVRO_TARGET_DIRECTORY +
"/" + AVRO_FILE_NAME));
+
assertTrue(flowFile.getAttribute(PutHDFS.HADOOP_FILE_URL_ATTRIBUTE).endsWith(AVRO_TARGET_DIRECTORY
+ "/" + AVRO_FILE_NAME));
+ }
+
+ private static class TestablePutHDFS extends PutHDFS {
private final KerberosProperties testKerberosProperties;
private final FileSystem fileSystem;
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/MockFileSystem.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/MockFileSystem.java
index 3a3477502e..f020f8b3b5 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/MockFileSystem.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/util/MockFileSystem.java
@@ -41,9 +41,12 @@ import java.util.Map;
import java.util.Set;
public class MockFileSystem extends FileSystem {
+ private static final long DIR_LENGTH = 1L;
+ private static final long FILE_LENGTH = 100L;
private final Map<Path, FileStatus> pathToStatus = new HashMap<>();
private final Map<Path, List<AclEntry>> pathToAcl = new HashMap<>();
private final Map<Path, Set<FileStatus>> fileStatuses = new HashMap<>();
+ private final Map<Path, FSDataOutputStream> pathToOutputStream = new
HashMap<>();
private boolean failOnOpen;
private boolean failOnClose;
@@ -51,7 +54,6 @@ public class MockFileSystem extends FileSystem {
private boolean failOnFileStatus;
private boolean failOnExists;
-
public void setFailOnClose(final boolean failOnClose) {
this.failOnClose = failOnClose;
}
@@ -102,22 +104,18 @@ public class MockFileSystem extends FileSystem {
throw new IOException(new AuthenticationException("test auth
error"));
}
pathToStatus.put(f, newFile(f, permission));
- if(failOnClose) {
- return new FSDataOutputStream(new ByteArrayOutputStream(), new
FileSystem.Statistics("")) {
- @Override
- public void close() throws IOException {
- super.close();
- throw new IOException("Fail on close");
- }
- };
- } else {
- return new FSDataOutputStream(new ByteArrayOutputStream(), new
Statistics(""));
- }
+ final FSDataOutputStream outputStream = createOutputStream();
+ pathToOutputStream.put(f, outputStream);
+ return outputStream;
}
@Override
public FSDataOutputStream append(final Path f, final int bufferSize, final
Progressable progress) {
- return null;
+ pathToOutputStream.computeIfAbsent(f, f2 -> createOutputStream());
+ final FileStatus oldStatus = pathToStatus.get(f);
+ final long newLength = oldStatus.getLen() + FILE_LENGTH;
+ pathToStatus.put(f, updateLength(oldStatus, newLength));
+ return pathToOutputStream.get(f);
}
@Override
@@ -192,19 +190,45 @@ public class MockFileSystem extends FileSystem {
return pathToStatus.containsKey(f);
}
+ private FSDataOutputStream createOutputStream() {
+ if(failOnClose) {
+ return new FSDataOutputStream(new ByteArrayOutputStream(), new
Statistics("")) {
+ @Override
+ public void close() throws IOException {
+ super.close();
+ throw new IOException("Fail on close");
+ }
+ };
+ } else {
+ return new FSDataOutputStream(new ByteArrayOutputStream(), new
Statistics(""));
+ }
+ }
+
+ private FileStatus updateLength(FileStatus oldStatus, Long newLength) {
+ try {
+ return new FileStatus(newLength, oldStatus.isDirectory(),
oldStatus.getReplication(),
+ oldStatus.getBlockSize(), oldStatus.getModificationTime(),
oldStatus.getAccessTime(),
+ oldStatus.getPermission(), oldStatus.getOwner(),
oldStatus.getGroup(),
+ (oldStatus.isSymlink() ? oldStatus.getSymlink() : null),
+ oldStatus.getPath());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
public FileStatus newFile(Path p, FsPermission permission) {
- return new FileStatus(100L, false, 3, 128 * 1024 * 1024,
1523456000000L, 1523457000000L, permission, "owner", "group", p);
+ return new FileStatus(FILE_LENGTH, false, 3, 128 * 1024 * 1024,
1523456000000L, 1523457000000L, permission, "owner", "group", p);
}
public FileStatus newDir(Path p) {
- return new FileStatus(1L, true, 3, 128 * 1024 * 1024, 1523456000000L,
1523457000000L, perms((short) 0755), "owner", "group", (Path)null, p, true,
false, false);
+ return new FileStatus(DIR_LENGTH, true, 3, 128 * 1024 * 1024,
1523456000000L, 1523457000000L, perms((short) 0755), "owner", "group",
(Path)null, p, true, false, false);
}
public FileStatus newFile(String p) {
- return new FileStatus(100L, false, 3, 128*1024*1024, 1523456000000L,
1523457000000L, perms((short)0644), "owner", "group", new Path(p));
+ return new FileStatus(FILE_LENGTH, false, 3, 128*1024*1024,
1523456000000L, 1523457000000L, perms((short)0644), "owner", "group", new
Path(p));
}
public FileStatus newDir(String p) {
- return new FileStatus(1L, true, 3, 128*1024*1024, 1523456000000L,
1523457000000L, perms((short)0755), "owner", "group", new Path(p));
+ return new FileStatus(DIR_LENGTH, true, 3, 128*1024*1024,
1523456000000L, 1523457000000L, perms((short)0755), "owner", "group", new
Path(p));
}
@Override
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/testdata-avro/input.avro b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/testdata-avro/input.avro
new file mode 100644
index 0000000000..606b00a010
Binary files /dev/null and b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/testdata-avro/input.avro differ
diff --git a/pom.xml b/pom.xml
index 9941881d7a..ec321421f4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -252,6 +252,11 @@
<artifactId>avro-ipc</artifactId>
<version>${avro.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-mapred</artifactId>
+ <version>${avro.version}</version>
+ </dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>