Repository: nifi
Updated Branches:
  refs/heads/master 8d8a9cba7 -> b32c70c41


NIFI-1929: Improvements for PutHDFS attribute handling

This closes #486.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b32c70c4
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b32c70c4
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b32c70c4

Branch: refs/heads/master
Commit: b32c70c419d794a84e2ff602c0b16d511ec3e8be
Parents: 8d8a9cb
Author: Matt Burgess <[email protected]>
Authored: Wed Jun 1 13:16:10 2016 -0400
Committer: Pierre Villard <[email protected]>
Committed: Mon Jun 6 11:58:55 2016 +0200

----------------------------------------------------------------------
 .../apache/nifi/processors/hadoop/PutHDFS.java  | 15 +++++++-
 .../nifi/processors/hadoop/PutHDFSTest.java     | 38 ++++++++++++++++++++
 2 files changed, 52 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/b32c70c4/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 7c97478..f05c2c7 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -24,7 +24,9 @@ import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -64,7 +66,11 @@ import java.util.concurrent.TimeUnit;
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"hadoop", "HDFS", "put", "copy", "filesystem"})
 @CapabilityDescription("Write FlowFile data to Hadoop Distributed File System (HDFS)")
-@WritesAttribute(attribute = "filename", description = "The name of the file written to HDFS comes from the value of this attribute.")
+@ReadsAttribute(attribute = "filename", description = "The name of the file written to HDFS comes from the value of this attribute.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "filename", description = "The name of the file written to HDFS is stored in this attribute."),
+        @WritesAttribute(attribute = "absolute.hdfs.path", description = "The absolute path to the file on HDFS is stored in this attribute.")
+})
 @SeeAlso(GetHDFS.class)
 public class PutHDFS extends AbstractHadoopProcessor {
 
@@ -75,6 +81,8 @@ public class PutHDFS extends AbstractHadoopProcessor {
     public static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
     public static final int BUFFER_SIZE_DEFAULT = 4096;
 
+    public static final String ABSOLUTE_HDFS_PATH_ATTRIBUTE = "absolute.hdfs.path";
+
     // relationships
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
@@ -329,8 +337,13 @@ public class PutHDFS extends AbstractHadoopProcessor {
                     new Object[]{flowFile, copyFile, millis, dataRate});
 
             final String outputPath = copyFile.toString();
+            final String newFilename = copyFile.getName();
+            final String hdfsPath = copyFile.getParent().toString();
+            flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), newFilename);
+            flowFile = session.putAttribute(flowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
             final String transitUri = (outputPath.startsWith("/")) ? "hdfs:/" + outputPath : "hdfs://" + outputPath;
             session.getProvenanceReporter().send(flowFile, transitUri);
+
             session.transfer(flowFile, REL_SUCCESS);
 
         } catch (final Throwable t) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/b32c70c4/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
index 76970ed..48e0f89 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
@@ -214,7 +214,45 @@ public class PutHDFSTest {
                 .getFlowFilesForRelationship(new Relationship.Builder().name("failure").build());
         assertTrue(failedFlowFiles.isEmpty());
 
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+        MockFlowFile flowFile = flowFiles.get(0);
         assertTrue(fs.exists(new Path("target/test-classes/randombytes-1")));
+        assertEquals("randombytes-1", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+        assertEquals("target/test-classes", flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
+    }
+
+    @Test
+    public void testPutFileWithCompression() throws IOException {
+        // Refer to comment in the BeforeClass method for an explanation
+        assumeTrue(isNotWindows());
+
+        PutHDFS proc = new TestablePutHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes");
+        runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
+        runner.setProperty(PutHDFS.COMPRESSION_CODEC, "GZIP");
+        runner.setValidateExpressionUsage(false);
+        try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1");) {
+            Map<String, String> attributes = new HashMap<String, String>();
+            attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
+            runner.enqueue(fis, attributes);
+            runner.run();
+        }
+
+        Configuration config = new Configuration();
+        FileSystem fs = FileSystem.get(config);
+
+        List<MockFlowFile> failedFlowFiles = runner
+                .getFlowFilesForRelationship(new Relationship.Builder().name("failure").build());
+        assertTrue(failedFlowFiles.isEmpty());
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+        MockFlowFile flowFile = flowFiles.get(0);
+        assertTrue(fs.exists(new Path("target/test-classes/randombytes-1.gz")));
+        assertEquals("randombytes-1.gz", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+        assertEquals("target/test-classes", flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
     }
 
     @Test

Reply via email to