Repository: nifi
Updated Branches:
refs/heads/master 8615941c8 -> 26f80095b
NIFI-713: Infer Hadoop Compression Automatically
- Three compression options are supported:
  NONE      : no compression
  AUTOMATIC : infers the codec from the file extension
  SPECIFIED : a user-specified codec (e.g. Snappy, Gzip, BZip2, or LZ4)
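
For context, a minimal sketch (not part of this commit) of how the AUTOMATIC
option resolves a codec from a file's extension using Hadoop's
CompressionCodecFactory; the path below is hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;

    public class CodecInferenceSketch {
        public static void main(String[] args) {
            // The factory registers the codecs available via the Hadoop configuration.
            CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
            // getCodec() matches on the file extension (".gz" here) and returns null
            // when no registered codec claims the extension (e.g. ".zip").
            CompressionCodec codec = factory.getCodec(new Path("/data/randombytes-1.gz"));
            System.out.println(codec != null ? codec.getClass().getName() : "no codec inferred");
        }
    }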
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c72fb201
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c72fb201
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c72fb201
Branch: refs/heads/master
Commit: c72fb201d52113a16b176be3e0337b4303124244
Parents: 2bb7853
Author: ricky <[email protected]>
Authored: Mon Aug 31 14:38:15 2015 -0400
Committer: ricky <[email protected]>
Committed: Mon Aug 31 14:38:15 2015 -0400
----------------------------------------------------------------------
.../hadoop/AbstractHadoopProcessor.java | 39 +++++++++++--
.../hadoop/CreateHadoopSequenceFile.java | 4 +-
.../apache/nifi/processors/hadoop/GetHDFS.java | 32 +++++++++--
.../apache/nifi/processors/hadoop/PutHDFS.java | 15 ++---
.../nifi/processors/hadoop/GetHDFSTest.java | 59 ++++++++++++++------
5 files changed, 111 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/c72fb201/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
index 8519d2c..0102b1f 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.BZip2Codec;
-import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
@@ -59,6 +58,34 @@ import org.apache.nifi.util.Tuple;
 * This is a base class that is helpful when building processors interacting with HDFS.
 */
public abstract class AbstractHadoopProcessor extends AbstractProcessor {
+ /**
+ * Compression Type Enum
+ */
+ public enum CompressionType {
+ NONE,
+ DEFAULT,
+ BZIP,
+ GZIP,
+ LZ4,
+ SNAPPY,
+ AUTOMATIC;
+
+ @Override
+ public String toString() {
+ switch (this) {
+ case NONE: return "NONE";
+ case DEFAULT: return DefaultCodec.class.getName();
+ case BZIP: return BZip2Codec.class.getName();
+ case GZIP: return GzipCodec.class.getName();
+ case LZ4: return Lz4Codec.class.getName();
+ case SNAPPY: return SnappyCodec.class.getName();
+ case AUTOMATIC: return "Automatically Detected";
+ }
+ return null;
+ }
+ }
+
+
    private static final Validator KERBEROS_CONFIG_VALIDATOR = new Validator() {
        @Override
        public ValidationResult validate(String subject, String input, ValidationContext context) {
@@ -94,8 +121,8 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
     public static final String DIRECTORY_PROP_NAME = "Directory";
-    public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder().name("Compression codec").required(false)
-            .allowableValues(BZip2Codec.class.getName(), DefaultCodec.class.getName(), GzipCodec.class.getName(), Lz4Codec.class.getName(), SnappyCodec.class.getName()).build();
+    public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder().name("Compression codec").required(true)
+            .allowableValues(CompressionType.values()).defaultValue(CompressionType.NONE.toString()).build();
     public static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder().name("Kerberos Principal").required(false)
             .description("Kerberos principal to authenticate as. Requires nifi.kerberos.krb5.file to be set " + "in your nifi.properties").addValidator(Validator.VALID)
@@ -324,10 +351,10 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
      * the Hadoop Configuration
      * @return CompressionCodec or null
      */
-    protected CompressionCodec getCompressionCodec(ProcessContext context, Configuration configuration) {
-        CompressionCodec codec = null;
+    protected org.apache.hadoop.io.compress.CompressionCodec getCompressionCodec(ProcessContext context, Configuration configuration) {
+        org.apache.hadoop.io.compress.CompressionCodec codec = null;
         if (context.getProperty(COMPRESSION_CODEC).isSet()) {
-            String compressionClassname = context.getProperty(COMPRESSION_CODEC).getValue();
+            String compressionClassname = CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).getValue()).toString();
             CompressionCodecFactory ccf = new CompressionCodecFactory(configuration);
             codec = ccf.getCodecByClassName(compressionClassname);
         }
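
As an illustration (a hedged sketch, not part of the diff) of what the new lookup
does end to end — the helper name and its argument are hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    import org.apache.nifi.processors.hadoop.AbstractHadoopProcessor.CompressionType;

    class CodecLookupSketch {
        // enumName is the stored property value, e.g. "GZIP".
        static CompressionCodec lookup(String enumName) {
            // toString() maps GZIP -> "org.apache.hadoop.io.compress.GzipCodec", etc.
            String className = CompressionType.valueOf(enumName).toString();
            return new CompressionCodecFactory(new Configuration()).getCodecByClassName(className);
        }
    }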
http://git-wip-us.apache.org/repos/asf/nifi/blob/c72fb201/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
index 186a290..1f55164 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
@@ -22,6 +22,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -152,7 +153,8 @@ public class CreateHadoopSequenceFile extends AbstractHadoopProcessor {
             sequenceFileWriter = new SequenceFileWriterImpl();
         }
         String value = context.getProperty(COMPRESSION_TYPE).getValue();
-        CompressionType compressionType = value == null ? CompressionType.valueOf(DEFAULT_COMPRESSION_TYPE) : CompressionType.valueOf(value);
+        SequenceFile.CompressionType compressionType = value == null ?
+                SequenceFile.CompressionType.valueOf(DEFAULT_COMPRESSION_TYPE) : SequenceFile.CompressionType.valueOf(value);
         final String fileName = flowFile.getAttribute(CoreAttributes.FILENAME.key()) + ".sf";
         flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), fileName);
         try {
http://git-wip-us.apache.org/repos/asf/nifi/blob/c72fb201/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
index cac61b0..de776d4 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
@@ -33,18 +33,20 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
@@ -332,6 +334,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
     protected void processBatchOfFiles(final List<Path> files, final ProcessContext context, final ProcessSession session) {
         // process the batch of files
         InputStream stream = null;
+        CompressionCodec codec = null;
         Configuration conf = getConfiguration();
         FileSystem hdfs = getFileSystem();
         final boolean keepSourceFiles = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
@@ -339,19 +342,36 @@ public class GetHDFS extends AbstractHadoopProcessor {
         int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : conf.getInt(BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT);
         final Path rootDir = new Path(context.getProperty(DIRECTORY).getValue());
-        final CompressionCodec codec = getCompressionCodec(context, conf);
+
+        final CompressionType compressionType = CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).getValue());
+        final boolean inferCompressionCodec = compressionType == CompressionType.AUTOMATIC;
+        if (inferCompressionCodec || compressionType != CompressionType.NONE) {
+            codec = getCompressionCodec(context, getConfiguration());
+        }
+        final CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory(conf);
         for (final Path file : files) {
             try {
                 if (!hdfs.exists(file)) {
                     continue; // if file is no longer there then move on
                 }
-                final String filename = file.getName();
+                final String originalFilename = file.getName();
                 final String relativePath = getPathDifference(rootDir, file);
                 stream = hdfs.open(file, bufferSize);
+
+                final String outputFilename;
+                // Check if we should infer compression codec
+                if (inferCompressionCodec) {
+                    codec = compressionCodecFactory.getCodec(file);
+                }
+                // Check if compression codec is defined (inferred or otherwise)
                 if (codec != null) {
                     stream = codec.createInputStream(stream);
+                    outputFilename = StringUtils.removeEnd(originalFilename, codec.getDefaultExtension());
+                } else {
+                    outputFilename = originalFilename;
                 }
+
                 FlowFile flowFile = session.create();
                 final StopWatch stopWatch = new StopWatch(true);
@@ -361,7 +381,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
                 final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
                 flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePath);
-                flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), filename);
+                flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), outputFilename);
                 if (!keepSourceFiles && !hdfs.delete(file, false)) {
                     getLogger().warn("Could not remove {} from HDFS. Not ingesting this file ...",
@@ -370,7 +390,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
                     continue;
                 }
-                final String transitUri = (filename.startsWith("/")) ? "hdfs:/" + filename : "hdfs://" + filename;
+                final String transitUri = (originalFilename.startsWith("/")) ? "hdfs:/" + originalFilename : "hdfs://" + originalFilename;
                 session.getProvenanceReporter().receive(flowFile, transitUri);
                 session.transfer(flowFile, REL_SUCCESS);
                 getLogger().info("retrieved {} from HDFS {} in {} milliseconds at a rate of {}",
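
To make the filename handling above concrete, a small hedged sketch of the
extension-stripping step (the values are hypothetical; StringUtils is from
commons-lang, as imported in the diff):

    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.io.compress.GzipCodec;

    class OutputNameSketch {
        static String outputName(String originalFilename) {
            // GzipCodec#getDefaultExtension() returns ".gz", so
            // "randombytes-1.gz" -> "randombytes-1"; a name that does not end
            // with the extension is returned unchanged.
            return StringUtils.removeEnd(originalFilename, new GzipCodec().getDefaultExtension());
        }
    }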
http://git-wip-us.apache.org/repos/asf/nifi/blob/c72fb201/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 419b1de..901159b 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -219,13 +219,14 @@ public class PutHDFS extends AbstractHadoopProcessor {
         final CompressionCodec codec = getCompressionCodec(context, configuration);
+        final String filename = codec != null
+                ? flowFile.getAttribute(CoreAttributes.FILENAME.key()) + codec.getDefaultExtension()
+                : flowFile.getAttribute(CoreAttributes.FILENAME.key());
+
         Path tempDotCopyFile = null;
         try {
-            final Path tempCopyFile;
-            final Path copyFile;
-
-            tempCopyFile = new Path(configuredRootDirPath, "." + flowFile.getAttribute(CoreAttributes.FILENAME.key()));
-            copyFile = new Path(configuredRootDirPath, flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+            final Path tempCopyFile = new Path(configuredRootDirPath, "." + filename);
+            final Path copyFile = new Path(configuredRootDirPath, filename);
             // Create destination directory if it does not exist
             try {
@@ -327,8 +328,8 @@ public class PutHDFS extends AbstractHadoopProcessor {
         getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}",
                 new Object[]{flowFile, copyFile, millis, dataRate});
-        final String filename = copyFile.toString();
-        final String transitUri = (filename.startsWith("/")) ? "hdfs:/" + filename : "hdfs://" + filename;
+        final String outputPath = copyFile.toString();
+        final String transitUri = (outputPath.startsWith("/")) ? "hdfs:/" + outputPath : "hdfs://" + outputPath;
         session.getProvenanceReporter().send(flowFile, transitUri);
         session.transfer(flowFile, REL_SUCCESS);
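
Conversely, on the write side PutHDFS now appends the codec's default extension;
a minimal sketch of that naming step (assuming a Gzip codec and a hypothetical
FlowFile filename):

    import org.apache.hadoop.io.compress.GzipCodec;

    class PutNameSketch {
        static String compressedName(String flowFileName) {
            // "randombytes-1" + ".gz" -> "randombytes-1.gz"
            return flowFileName + new GzipCodec().getDefaultExtension();
        }
    }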
http://git-wip-us.apache.org/repos/asf/nifi/blob/c72fb201/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
index b0dd17f..e8714dd 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
@@ -33,8 +33,6 @@ import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.GzipCodec;
import org.junit.Assert;
import org.junit.Test;
@@ -108,19 +106,6 @@ public class GetHDFSTest {
         for (ValidationResult vr : results) {
             Assert.assertTrue(vr.toString().contains("is invalid because Minimum File Age cannot be greater than Maximum File Age"));
         }
-
-        results = new HashSet<>();
-        runner.setProperty(GetHDFS.DIRECTORY, "/target");
-        runner.setProperty(GetHDFS.COMPRESSION_CODEC, CompressionCodec.class.getName());
-        runner.enqueue(new byte[0]);
-        pc = runner.getProcessContext();
-        if (pc instanceof MockProcessContext) {
-            results = ((MockProcessContext) pc).validate();
-        }
-        Assert.assertEquals(1, results.size());
-        for (ValidationResult vr : results) {
-            Assert.assertTrue(vr.toString().contains("is invalid because Given value not found in allowed set"));
-        }
     }
@Test
@@ -138,18 +123,56 @@ public class GetHDFSTest {
}
@Test
-    public void testGetFilesWithCompression() throws IOException {
+    public void testAutomaticDecompression() throws IOException {
         TestRunner runner = TestRunners.newTestRunner(GetHDFS.class);
         runner.setProperty(PutHDFS.DIRECTORY, "src/test/resources/testdata");
         runner.setProperty(GetHDFS.FILE_FILTER_REGEX, "random.*.gz");
-        runner.setProperty(GetHDFS.COMPRESSION_CODEC, GzipCodec.class.getName());
         runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
+        runner.setProperty(GetHDFS.COMPRESSION_CODEC, "AUTOMATIC");
         runner.run();
+
         List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetHDFS.REL_SUCCESS);
         assertEquals(1, flowFiles.size());
+
         MockFlowFile flowFile = flowFiles.get(0);
-        assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).startsWith("randombytes-1.gz"));
+        assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("randombytes-1"));
         InputStream expected = getClass().getResourceAsStream("/testdata/randombytes-1");
         flowFile.assertContentEquals(expected);
     }
+
+    @Test
+    public void testInferCompressionCodecDisabled() throws IOException {
+        TestRunner runner = TestRunners.newTestRunner(GetHDFS.class);
+        runner.setProperty(PutHDFS.DIRECTORY, "src/test/resources/testdata");
+        runner.setProperty(GetHDFS.FILE_FILTER_REGEX, "random.*.gz");
+        runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
+        runner.setProperty(GetHDFS.COMPRESSION_CODEC, "NONE");
+        runner.run();
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetHDFS.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+
+        MockFlowFile flowFile = flowFiles.get(0);
+        assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("randombytes-1.gz"));
+        InputStream expected = getClass().getResourceAsStream("/testdata/randombytes-1.gz");
+        flowFile.assertContentEquals(expected);
+    }
+
+    @Test
+    public void testFileExtensionNotACompressionCodec() throws IOException {
+        TestRunner runner = TestRunners.newTestRunner(GetHDFS.class);
+        runner.setProperty(PutHDFS.DIRECTORY, "src/test/resources/testdata");
+        runner.setProperty(GetHDFS.FILE_FILTER_REGEX, ".*.zip");
+        runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
+        runner.setProperty(GetHDFS.COMPRESSION_CODEC, "AUTOMATIC");
+        runner.run();
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetHDFS.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+
+        MockFlowFile flowFile = flowFiles.get(0);
+        assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("13545423550275052.zip"));
+        InputStream expected = getClass().getResourceAsStream("/testdata/13545423550275052.zip");
+        flowFile.assertContentEquals(expected);
+    }
}