Repository: nifi
Updated Branches:
  refs/heads/master 8615941c8 -> 26f80095b


NIFI-713: Infer Hadoop Compression Automatically

- Three main compression options (as sketched below):
        NONE            : no compression
        AUTOMATIC       : infers the codec from the file extension
        SPECIFIED       : an explicitly specified codec (e.g. SNAPPY, GZIP, BZIP, or LZ4)
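
  For reference, a minimal sketch (not part of this commit) of how the
  AUTOMATIC option resolves a codec: Hadoop's CompressionCodecFactory maps
  file extensions such as ".gz" and ".bz2" to codec classes.

        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.io.compress.CompressionCodec;
        import org.apache.hadoop.io.compress.CompressionCodecFactory;

        CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
        // Returns a GzipCodec instance for the ".gz" extension; returns null
        // for an unrecognized extension such as ".zip".
        CompressionCodec codec = factory.getCodec(new Path("/data/randombytes-1.gz"));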


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c72fb201
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c72fb201
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c72fb201

Branch: refs/heads/master
Commit: c72fb201d52113a16b176be3e0337b4303124244
Parents: 2bb7853
Author: ricky <[email protected]>
Authored: Mon Aug 31 14:38:15 2015 -0400
Committer: ricky <[email protected]>
Committed: Mon Aug 31 14:38:15 2015 -0400

----------------------------------------------------------------------
 .../hadoop/AbstractHadoopProcessor.java         | 39 +++++++++++--
 .../hadoop/CreateHadoopSequenceFile.java        |  4 +-
 .../apache/nifi/processors/hadoop/GetHDFS.java  | 32 +++++++++--
 .../apache/nifi/processors/hadoop/PutHDFS.java  | 15 ++---
 .../nifi/processors/hadoop/GetHDFSTest.java     | 59 ++++++++++++++------
 5 files changed, 111 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/c72fb201/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
index 8519d2c..0102b1f 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.BZip2Codec;
-import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.compress.GzipCodec;
@@ -59,6 +58,34 @@ import org.apache.nifi.util.Tuple;
  * This is a base class that is helpful when building processors interacting with HDFS.
  */
 public abstract class AbstractHadoopProcessor extends AbstractProcessor {
+    /**
+     * Compression Type Enum
+     */
+    public enum CompressionType {
+        NONE,
+        DEFAULT,
+        BZIP,
+        GZIP,
+        LZ4,
+        SNAPPY,
+        AUTOMATIC;
+
+        @Override
+        public String toString() {
+            switch (this) {
+                case NONE: return "NONE";
+                case DEFAULT: return DefaultCodec.class.getName();
+                case BZIP: return BZip2Codec.class.getName();
+                case GZIP: return GzipCodec.class.getName();
+                case LZ4: return Lz4Codec.class.getName();
+                case SNAPPY: return SnappyCodec.class.getName();
+                case AUTOMATIC: return "Automatically Detected";
+            }
+            return null;
+        }
+    }
+
+
     private static final Validator KERBEROS_CONFIG_VALIDATOR = new Validator() {
         @Override
         public ValidationResult validate(String subject, String input, ValidationContext context) {
@@ -94,8 +121,8 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
 
     public static final String DIRECTORY_PROP_NAME = "Directory";
 
-    public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder().name("Compression codec").required(false)
-            .allowableValues(BZip2Codec.class.getName(), DefaultCodec.class.getName(), GzipCodec.class.getName(), Lz4Codec.class.getName(), SnappyCodec.class.getName()).build();
+    public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder().name("Compression codec").required(true)
+            .allowableValues(CompressionType.values()).defaultValue(CompressionType.NONE.toString()).build();
 
     public static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder().name("Kerberos Principal").required(false)
             .description("Kerberos principal to authenticate as. Requires nifi.kerberos.krb5.file to be set " + "in your nifi.properties").addValidator(Validator.VALID)
@@ -324,10 +351,10 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
      *            the Hadoop Configuration
      * @return CompressionCodec or null
      */
-    protected CompressionCodec getCompressionCodec(ProcessContext context, Configuration configuration) {
-        CompressionCodec codec = null;
+    protected org.apache.hadoop.io.compress.CompressionCodec getCompressionCodec(ProcessContext context, Configuration configuration) {
+        org.apache.hadoop.io.compress.CompressionCodec codec = null;
         if (context.getProperty(COMPRESSION_CODEC).isSet()) {
-            String compressionClassname = context.getProperty(COMPRESSION_CODEC).getValue();
+            String compressionClassname = CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).getValue()).toString();
             CompressionCodecFactory ccf = new CompressionCodecFactory(configuration);
             codec = ccf.getCodecByClassName(compressionClassname);
         }

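Note on the hunk above: the enum's toString() doubles as the Hadoop codec class name, which is what lets getCompressionCodec() hand the selected value straight to CompressionCodecFactory. A minimal sketch (not from the diff; assumes a default Configuration):

    // CompressionType.GZIP.toString() yields "org.apache.hadoop.io.compress.GzipCodec"
    String className = AbstractHadoopProcessor.CompressionType.GZIP.toString();
    CompressionCodecFactory ccf = new CompressionCodecFactory(new Configuration());
    CompressionCodec codec = ccf.getCodecByClassName(className);  // a GzipCodec instance
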
http://git-wip-us.apache.org/repos/asf/nifi/blob/c72fb201/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
index 186a290..1f55164 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
@@ -22,6 +22,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -152,7 +153,8 @@ public class CreateHadoopSequenceFile extends AbstractHadoopProcessor {
                 sequenceFileWriter = new SequenceFileWriterImpl();
         }
         String value = context.getProperty(COMPRESSION_TYPE).getValue();
-        CompressionType compressionType = value == null ? CompressionType.valueOf(DEFAULT_COMPRESSION_TYPE) : CompressionType.valueOf(value);
+        SequenceFile.CompressionType compressionType = value == null ?
+          SequenceFile.CompressionType.valueOf(DEFAULT_COMPRESSION_TYPE) : SequenceFile.CompressionType.valueOf(value);
         final String fileName = flowFile.getAttribute(CoreAttributes.FILENAME.key()) + ".sf";
         flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), fileName);
         try {

http://git-wip-us.apache.org/repos/asf/nifi/blob/c72fb201/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
index cac61b0..de776d4 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
@@ -33,18 +33,20 @@ import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
@@ -332,6 +334,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
     protected void processBatchOfFiles(final List<Path> files, final ProcessContext context, final ProcessSession session) {
         // process the batch of files
         InputStream stream = null;
+        CompressionCodec codec = null;
         Configuration conf = getConfiguration();
         FileSystem hdfs = getFileSystem();
         final boolean keepSourceFiles = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
@@ -339,19 +342,36 @@ public class GetHDFS extends AbstractHadoopProcessor {
         int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : conf.getInt(BUFFER_SIZE_KEY,
                 BUFFER_SIZE_DEFAULT);
         final Path rootDir = new Path(context.getProperty(DIRECTORY).getValue());
-        final CompressionCodec codec = getCompressionCodec(context, conf);
+
+        final CompressionType compressionType = CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).toString());
+        final boolean inferCompressionCodec = compressionType == CompressionType.AUTOMATIC;
+        if (inferCompressionCodec || compressionType != CompressionType.NONE) {
+            codec = getCompressionCodec(context, getConfiguration());
+        }
+        final CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory(conf);
         for (final Path file : files) {
             try {
                 if (!hdfs.exists(file)) {
                     continue; // if file is no longer there then move on
                 }
-                final String filename = file.getName();
+                final String originalFilename = file.getName();
                 final String relativePath = getPathDifference(rootDir, file);
 
                 stream = hdfs.open(file, bufferSize);
+
+                final String outputFilename;
+                // Check if we should infer compression codec
+                if (inferCompressionCodec) {
+                    codec = compressionCodecFactory.getCodec(file);
+                }
+                // Check if compression codec is defined (inferred or otherwise)
                 if (codec != null) {
                     stream = codec.createInputStream(stream);
+                    outputFilename = StringUtils.removeEnd(originalFilename, codec.getDefaultExtension());
+                } else {
+                    outputFilename = originalFilename;
                 }
+
                 FlowFile flowFile = session.create();
 
                 final StopWatch stopWatch = new StopWatch(true);
@@ -361,7 +381,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
                 final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);

                 flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePath);
-                flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), filename);
+                flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), outputFilename);

                 if (!keepSourceFiles && !hdfs.delete(file, false)) {
                     getLogger().warn("Could not remove {} from HDFS. Not ingesting this file ...",
@@ -370,7 +390,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
                     continue;
                 }
 
-                final String transitUri = (filename.startsWith("/")) ? "hdfs:/" + filename : "hdfs://" + filename;
+                final String transitUri = (originalFilename.startsWith("/")) ? "hdfs:/" + originalFilename : "hdfs://" + originalFilename;
                 session.getProvenanceReporter().receive(flowFile, transitUri);
                 session.transfer(flowFile, REL_SUCCESS);
                 getLogger().info("retrieved {} from HDFS {} in {} milliseconds at a rate of {}",

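To illustrate the renaming above (a sketch, not from the diff): when a codec is resolved, its default extension is stripped from the incoming filename via commons-lang, so the decompressed FlowFile no longer carries a misleading ".gz" suffix:

    import org.apache.commons.lang.StringUtils;

    // GzipCodec.getDefaultExtension() is ".gz"; removeEnd() is a no-op when
    // the filename does not end with that suffix.
    String outputFilename = StringUtils.removeEnd("randombytes-1.gz", ".gz");  // "randombytes-1"
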
http://git-wip-us.apache.org/repos/asf/nifi/blob/c72fb201/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 419b1de..901159b 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -219,13 +219,14 @@ public class PutHDFS extends AbstractHadoopProcessor {
 
         final CompressionCodec codec = getCompressionCodec(context, configuration);

+        final String filename = codec != null
+                ? flowFile.getAttribute(CoreAttributes.FILENAME.key()) + codec.getDefaultExtension()
+                : flowFile.getAttribute(CoreAttributes.FILENAME.key());
+
         Path tempDotCopyFile = null;
         try {
-            final Path tempCopyFile;
-            final Path copyFile;
-
-            tempCopyFile = new Path(configuredRootDirPath, "." + flowFile.getAttribute(CoreAttributes.FILENAME.key()));
-            copyFile = new Path(configuredRootDirPath, flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+            final Path tempCopyFile = new Path(configuredRootDirPath, "." + filename);
+            final Path copyFile = new Path(configuredRootDirPath, filename);
 
             // Create destination directory if it does not exist
             try {
@@ -327,8 +328,8 @@ public class PutHDFS extends AbstractHadoopProcessor {
             getLogger().info("copied {} to HDFS at {} in {} milliseconds at a 
rate of {}",
                     new Object[]{flowFile, copyFile, millis, dataRate});
 
-            final String filename = copyFile.toString();
-            final String transitUri = (filename.startsWith("/")) ? "hdfs:/" + 
filename : "hdfs://" + filename;
+            final String outputPath = copyFile.toString();
+            final String transitUri = (outputPath.startsWith("/")) ? "hdfs:/" 
+ outputPath : "hdfs://" + outputPath;
             session.getProvenanceReporter().send(flowFile, transitUri);
             session.transfer(flowFile, REL_SUCCESS);
 

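Conversely in PutHDFS (again a sketch, not from the diff): when a codec is configured, its default extension is appended to the outgoing filename, so the compressed file on HDFS is self-describing:

    import org.apache.hadoop.io.compress.GzipCodec;

    // getDefaultExtension() includes the leading dot.
    String ext = new GzipCodec().getDefaultExtension();   // ".gz"
    String filename = "randombytes-1" + ext;              // "randombytes-1.gz"
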
http://git-wip-us.apache.org/repos/asf/nifi/blob/c72fb201/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
index b0dd17f..e8714dd 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
@@ -33,8 +33,6 @@ import org.apache.nifi.util.MockProcessContext;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.GzipCodec;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -108,19 +106,6 @@ public class GetHDFSTest {
         for (ValidationResult vr : results) {
             Assert.assertTrue(vr.toString().contains("is invalid because Minimum File Age cannot be greater than Maximum File Age"));
         }
-
-        results = new HashSet<>();
-        runner.setProperty(GetHDFS.DIRECTORY, "/target");
-        runner.setProperty(GetHDFS.COMPRESSION_CODEC, CompressionCodec.class.getName());
-        runner.enqueue(new byte[0]);
-        pc = runner.getProcessContext();
-        if (pc instanceof MockProcessContext) {
-            results = ((MockProcessContext) pc).validate();
-        }
-        Assert.assertEquals(1, results.size());
-        for (ValidationResult vr : results) {
-            Assert.assertTrue(vr.toString().contains("is invalid because Given value not found in allowed set"));
-        }
     }
 
     @Test
@@ -138,18 +123,56 @@ public class GetHDFSTest {
     }
 
     @Test
-    public void testGetFilesWithCompression() throws IOException {
+    public void testAutomaticDecompression() throws IOException {
         TestRunner runner = TestRunners.newTestRunner(GetHDFS.class);
         runner.setProperty(PutHDFS.DIRECTORY, "src/test/resources/testdata");
         runner.setProperty(GetHDFS.FILE_FILTER_REGEX, "random.*.gz");
-        runner.setProperty(GetHDFS.COMPRESSION_CODEC, GzipCodec.class.getName());
         runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
+        runner.setProperty(GetHDFS.COMPRESSION_CODEC, "AUTOMATIC");
         runner.run();
+
         List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(GetHDFS.REL_SUCCESS);
         assertEquals(1, flowFiles.size());
+
         MockFlowFile flowFile = flowFiles.get(0);
-        assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).startsWith("randombytes-1.gz"));
+        assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("randombytes-1"));
         InputStream expected = getClass().getResourceAsStream("/testdata/randombytes-1");
         flowFile.assertContentEquals(expected);
     }
+
+    @Test
+    public void testInferCompressionCodecDisabled() throws IOException {
+        TestRunner runner = TestRunners.newTestRunner(GetHDFS.class);
+        runner.setProperty(PutHDFS.DIRECTORY, "src/test/resources/testdata");
+        runner.setProperty(GetHDFS.FILE_FILTER_REGEX, "random.*.gz");
+        runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
+        runner.setProperty(GetHDFS.COMPRESSION_CODEC, "NONE");
+        runner.run();
+
+        List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(GetHDFS.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+
+        MockFlowFile flowFile = flowFiles.get(0);
+        assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("randombytes-1.gz"));
+        InputStream expected = getClass().getResourceAsStream("/testdata/randombytes-1.gz");
+        flowFile.assertContentEquals(expected);
+    }
+
+    @Test
+    public void testFileExtensionNotACompressionCodec() throws IOException {
+        TestRunner runner = TestRunners.newTestRunner(GetHDFS.class);
+        runner.setProperty(PutHDFS.DIRECTORY, "src/test/resources/testdata");
+        runner.setProperty(GetHDFS.FILE_FILTER_REGEX, ".*.zip");
+        runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
+        runner.setProperty(GetHDFS.COMPRESSION_CODEC, "AUTOMATIC");
+        runner.run();
+
+        List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(GetHDFS.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+
+        MockFlowFile flowFile = flowFiles.get(0);
+        assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("13545423550275052.zip"));
+        InputStream expected = getClass().getResourceAsStream("/testdata/13545423550275052.zip");
+        flowFile.assertContentEquals(expected);
+    }
 }
