Repository: nifi
Updated Branches:
  refs/heads/master ee18ead16 -> 09f926003


NIFI-5293 - add merge.uuid to MergeRecord processor

This closes #2783

Signed-off-by: zenfenan <zenfe...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/09f92600
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/09f92600
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/09f92600

Branch: refs/heads/master
Commit: 09f926003521ce33eb441edb2fe6c5942059e98b
Parents: ee18ead
Author: Pierre Villard <pierre.villard...@gmail.com>
Authored: Mon Jun 11 14:14:48 2018 +0200
Committer: zenfenan <sivaprasanna...@gmail.com>
Committed: Tue Jun 12 19:16:17 2018 +0530

----------------------------------------------------------------------
 .../org/apache/nifi/processors/standard/MergeRecord.java  |  2 ++
 .../apache/nifi/processors/standard/merge/RecordBin.java  |  1 +
 .../apache/nifi/processors/standard/TestMergeRecord.java  | 10 +++++++---
 3 files changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/09f92600/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
index 3b27529..56227f2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
@@ -89,6 +89,7 @@ import org.apache.nifi.serialization.record.RecordSchema;
     @WritesAttribute(attribute = "merge.count", description = "The number of FlowFiles that were merged into this bundle"),
     @WritesAttribute(attribute = "merge.bin.age", description = "The age of the bin, in milliseconds, when it was merged and output. Effectively "
         + "this is the greatest amount of time that any FlowFile in this bundle remained waiting in this processor before it was output"),
+    @WritesAttribute(attribute = "merge.uuid", description = "UUID of the merged FlowFile that will be added to the original FlowFiles attributes"),
     @WritesAttribute(attribute = "<Attributes from Record Writer>", description = "Any Attribute that the configured Record Writer returns will be added to the FlowFile.")
 })
 @SeeAlso({MergeContent.class, SplitRecord.class, PartitionRecord.class})
@@ -100,6 +101,7 @@ public class MergeRecord extends AbstractSessionFactoryProcessor {
 
     public static final String MERGE_COUNT_ATTRIBUTE = "merge.count";
     public static final String MERGE_BIN_AGE_ATTRIBUTE = "merge.bin.age";
+    public static final String MERGE_UUID_ATTRIBUTE = "merge.uuid";
 
     public static final AllowableValue MERGE_STRATEGY_BIN_PACK = new AllowableValue(
         "Bin-Packing Algorithm",

http://git-wip-us.apache.org/repos/asf/nifi/blob/09f92600/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
index ad60f6a..6dc4247 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
@@ -393,6 +393,7 @@ public class RecordBin {
             attributes.put(MERGE_BIN_AGE_ATTRIBUTE, Long.toString(getBinAge()));
 
             merged = session.putAllAttributes(merged, attributes);
+            flowFiles.stream().forEach(ff -> session.putAttribute(ff, "merge.uuid", merged.getAttribute(CoreAttributes.UUID.key())));
 
             session.getProvenanceReporter().join(flowFiles, merged, "Records Merged due to: " + completionReason);
             session.transfer(merged, MergeRecord.REL_MERGED);

http://git-wip-us.apache.org/repos/asf/nifi/blob/09f92600/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
index 5ee4dd9..261f981 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.record.CommaSeparatedRecordReader;
 import org.apache.nifi.serialization.record.MockRecordWriter;
@@ -68,9 +69,12 @@ public class TestMergeRecord {
         runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
         runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 2);
 
-        final MockFlowFile mff = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
+        final MockFlowFile mff = runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED).get(0);
         mff.assertAttributeEquals("record.count", "2");
         mff.assertContentEquals("header\nJohn,35\nJane,34\n");
+
+        runner.getFlowFilesForRelationship(MergeRecord.REL_ORIGINAL).stream().forEach(
+                ff -> assertEquals(mff.getAttribute(CoreAttributes.UUID.key()), ff.getAttribute(MergeRecord.MERGE_UUID_ATTRIBUTE)));
     }
 
 
@@ -96,7 +100,7 @@ public class TestMergeRecord {
         runner.assertTransferCount(MergeRecord.REL_MERGED, 2);
         runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 4);
 
-        final List<MockFlowFile> mffs = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED);
+        final List<MockFlowFile> mffs = runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED);
         assertEquals(1L, mffs.stream()
             .filter(ff -> "2".equals(ff.getAttribute("record.count")))
             .filter(ff -> "header\nJohn,35\nJane,34\n".equals(new String(ff.toByteArray())))
@@ -159,7 +163,7 @@ public class TestMergeRecord {
         runner.assertTransferCount(MergeRecord.REL_MERGED, 2);
         runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 4);
 
-        final List<MockFlowFile> mffs = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED);
+        final List<MockFlowFile> mffs = runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED);
         assertEquals(1L, mffs.stream()
             .filter(ff -> "2".equals(ff.getAttribute("record.count")))
             .filter(ff -> "header\nJohn,35\nJane,34\n".equals(new String(ff.toByteArray())))

Reply via email to