This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new ce09b93  NIFI-6139: Add fragment attributes to PartitionRecord
ce09b93 is described below

commit ce09b93ef1d104d65968054b97c2c9b2c5eaef3e
Author: Matthew Burgess <[email protected]>
AuthorDate: Thu Mar 21 14:53:07 2019 -0400

    NIFI-6139: Add fragment attributes to PartitionRecord
    
    This closes #3382.
    
    Signed-off-by: Koji Kawamura <[email protected]>
---
 .../apache/nifi/processors/standard/PartitionRecord.java   | 14 ++++++++++++++
 .../nifi/processors/standard/TestPartitionRecord.java      |  4 ++++
 2 files changed, 18 insertions(+)

diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java
index 43e1e4b..16209c8 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java
@@ -29,6 +29,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -48,6 +49,7 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
@@ -84,6 +86,11 @@ import 
org.apache.nifi.serialization.record.util.DataTypeUtils;
 @WritesAttributes({
     @WritesAttribute(attribute="record.count", description="The number of 
records in an outgoing FlowFile"),
     @WritesAttribute(attribute="mime.type", description="The MIME Type that 
the configured Record Writer indicates is appropriate"),
+    @WritesAttribute(attribute = "fragment.identifier", description = "All 
partitioned FlowFiles produced from the same parent FlowFile will have the same 
randomly "
+            + "generated UUID added for this attribute"),
+    @WritesAttribute(attribute = "fragment.index", description = "A one-up 
number that indicates the ordering of the partitioned FlowFiles that were 
created from a single parent FlowFile"),
+    @WritesAttribute(attribute = "fragment.count", description = "The number 
of partitioned FlowFiles generated from the parent FlowFile"),
+    @WritesAttribute(attribute = "segment.original.filename ", description = 
"The filename of the parent FlowFile"),
     @WritesAttribute(attribute="<dynamic property name>",
         description = "For each dynamic property that is added, an attribute 
may be added to the FlowFile. See the description for Dynamic Properties for 
more information.")
 })
@@ -232,6 +239,8 @@ public class PartitionRecord extends AbstractProcessor {
             }
 
             // For each RecordSetWriter, finish the record set and close the 
writer.
+            int fragmentIndex = 0;
+            final String fragmentId = UUID.randomUUID().toString();
             for (final Map.Entry<RecordValueMap, RecordSetWriter> entry : 
writerMap.entrySet()) {
                 final RecordValueMap valueMap = entry.getKey();
                 final RecordSetWriter writer = entry.getValue();
@@ -244,11 +253,16 @@ public class PartitionRecord extends AbstractProcessor {
                 attributes.putAll(writeResult.getAttributes());
                 attributes.put("record.count", 
String.valueOf(writeResult.getRecordCount()));
                 attributes.put(CoreAttributes.MIME_TYPE.key(), 
writer.getMimeType());
+                attributes.put(FragmentAttributes.FRAGMENT_INDEX.key(), 
String.valueOf(fragmentIndex));
+                attributes.put(FragmentAttributes.FRAGMENT_ID.key(), 
fragmentId);
+                attributes.put(FragmentAttributes.FRAGMENT_COUNT.key(), 
String.valueOf(writerMap.size()));
+                
attributes.put(FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key(), 
flowFile.getAttribute(CoreAttributes.FILENAME.key()));
 
                 FlowFile childFlowFile = valueMap.getFlowFile();
                 childFlowFile = session.putAllAttributes(childFlowFile, 
attributes);
 
                 session.adjustCounter("Record Processed", 
writeResult.getRecordCount(), false);
+                fragmentIndex++;
             }
 
         } catch (final Exception e) {
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPartitionRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPartitionRecord.java
index 99e6370..32546c5 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPartitionRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPartitionRecord.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.stream.IntStream;
 
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.record.MockRecordParser;
@@ -80,6 +81,9 @@ public class TestPartitionRecord {
 
         assertEquals(3L, out.stream().filter(ff -> 
ff.getAttribute("record.count").equals("1")).count());
         assertEquals(1L, out.stream().filter(ff -> 
ff.getAttribute("record.count").equals("2")).count());
+        out.forEach(ff -> ff.assertAttributeEquals("fragment.count", "4"));
+        IntStream.of(1, 3).forEach((i) -> 
out.get(i).assertAttributeEquals("fragment.id", 
out.get(0).getAttribute("fragment.id")));
+        IntStream.of(0, 3).forEach((i) -> 
out.get(i).assertAttributeEquals("fragment.index", String.valueOf(i)));
 
         out.stream().filter(ff -> 
ff.getAttribute("record.count").equals("2")).forEach(ff -> 
ff.assertContentEquals("Jake,49,\nJake,14,\n"));
 

Reply via email to