Repository: nifi
Updated Branches:
  refs/heads/master 5d1a4f343 -> ee9bd9408


NIFI-2632: Added fragment attributes to SplitJson and SplitXml

Signed-off-by: Yolanda M. Davis <[email protected]>

This closes #919


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ee9bd940
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ee9bd940
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ee9bd940

Branch: refs/heads/master
Commit: ee9bd94082cb27694e94b6bd5907b52a641e86d9
Parents: 5d1a4f3
Author: Matt Burgess <[email protected]>
Authored: Tue Aug 23 09:28:49 2016 -0400
Committer: Yolanda M. Davis <[email protected]>
Committed: Tue Aug 23 16:30:57 2016 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/SplitJson.java     | 45 +++++++++----
 .../nifi/processors/standard/SplitXml.java      | 70 +++++++++++---------
 .../nifi/processors/standard/TestSplitJson.java | 19 +++++-
 .../nifi/processors/standard/TestSplitXml.java  | 19 +++++-
 4 files changed, 105 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/ee9bd940/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
index 58b3adc..caa5a0b 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java
@@ -16,8 +16,6 @@
  */
 package org.apache.nifi.processors.standard;
 
-import java.io.IOException;
-import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -25,6 +23,8 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.lang3.StringUtils;
@@ -33,18 +33,20 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 
 import com.jayway.jsonpath.DocumentContext;
@@ -61,6 +63,16 @@ import com.jayway.jsonpath.PathNotFoundException;
         + "Each generated FlowFile is comprised of an element of the specified 
array and transferred to relationship 'split,' "
         + "with the original file transferred to the 'original' relationship. 
If the specified JsonPath is not found or "
         + "does not evaluate to an array element, the original file is routed 
to 'failure' and no files are generated.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "fragment.identifier",
+                description = "All split FlowFiles produced from the same 
parent FlowFile will have the same randomly generated UUID added for this 
attribute"),
+        @WritesAttribute(attribute = "fragment.index",
+                description = "A one-up number that indicates the ordering of 
the split FlowFiles that were created from a single parent FlowFile"),
+        @WritesAttribute(attribute = "fragment.count",
+                description = "The number of split FlowFiles generated from 
the parent FlowFile"),
+        @WritesAttribute(attribute = "segment.original.filename", description 
= "The filename of the parent FlowFile")
+})
+
 public class SplitJson extends AbstractJsonPathProcessor {
 
     public static final PropertyDescriptor ARRAY_JSON_PATH_EXPRESSION = new 
PropertyDescriptor.Builder()
@@ -184,20 +196,29 @@ public class SplitJson extends AbstractJsonPathProcessor {
         }
 
         List resultList = (List) jsonPathResult;
+        AtomicInteger jsonLineCount = new AtomicInteger(0);
 
-        for (final Object resultSegment : resultList) {
+        final String fragmentIdentifier = UUID.randomUUID().toString();
+        for (int i = 0; i < resultList.size(); i++) {
+            Object resultSegment = resultList.get(i);
             FlowFile split = processSession.create(original);
-            split = processSession.write(split, new OutputStreamCallback() {
-                @Override
-                public void process(OutputStream out) throws IOException {
-                    String resultSegmentContent = 
getResultRepresentation(resultSegment, nullDefaultValue);
-                    
out.write(resultSegmentContent.getBytes(StandardCharsets.UTF_8));
-                }
-            });
+            split = processSession.write(split, (out) -> {
+                        String resultSegmentContent = 
getResultRepresentation(resultSegment, nullDefaultValue);
+                        
out.write(resultSegmentContent.getBytes(StandardCharsets.UTF_8));
+                    }
+            );
+            split = processSession.putAttribute(split, "fragment.identifier", 
fragmentIdentifier);
+            split = processSession.putAttribute(split, "fragment.index", 
Integer.toString(i));
+            split = processSession.putAttribute(split, 
"segment.original.filename", split.getAttribute(CoreAttributes.FILENAME.key()));
             segments.add(split);
+            jsonLineCount.incrementAndGet();
         }
 
-        processSession.transfer(segments, REL_SPLIT);
+        segments.forEach((segment) -> {
+            segment = processSession.putAttribute(segment, "fragment.count", 
Integer.toString(jsonLineCount.get()));
+            processSession.transfer(segment, REL_SPLIT);
+        });
+
         processSession.transfer(original, REL_ORIGINAL);
         logger.info("Split {} into {} FlowFiles", new Object[]{original, 
segments.size()});
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ee9bd940/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java
index 4764ea8..0f0032a 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitXml.java
@@ -16,9 +16,7 @@
  */
 package org.apache.nifi.processors.standard;
 
-import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -27,7 +25,9 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.xml.parsers.ParserConfigurationException;
 import javax.xml.parsers.SAXParser;
@@ -39,18 +39,19 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.util.XmlElementNotifier;
 import org.apache.nifi.stream.io.BufferedInputStream;
@@ -69,6 +70,15 @@ import org.xml.sax.XMLReader;
 @Tags({"xml", "split"})
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @CapabilityDescription("Splits an XML File into multiple separate FlowFiles, 
each comprising a child or descendant of the original root element")
+@WritesAttributes({
+        @WritesAttribute(attribute = "fragment.identifier",
+                description = "All split FlowFiles produced from the same 
parent FlowFile will have the same randomly generated UUID added for this 
attribute"),
+        @WritesAttribute(attribute = "fragment.index",
+                description = "A one-up number that indicates the ordering of 
the split FlowFiles that were created from a single parent FlowFile"),
+        @WritesAttribute(attribute = "fragment.count",
+                description = "The number of split FlowFiles generated from 
the parent FlowFile"),
+        @WritesAttribute(attribute = "segment.original.filename", description 
= "The filename of the parent FlowFile")
+})
 public class SplitXml extends AbstractProcessor {
 
     public static final PropertyDescriptor SPLIT_DEPTH = new 
PropertyDescriptor.Builder()
@@ -146,35 +156,29 @@ public class SplitXml extends AbstractProcessor {
         final ComponentLog logger = getLogger();
 
         final List<FlowFile> splits = new ArrayList<>();
-        final XmlSplitterSaxParser parser = new XmlSplitterSaxParser(new 
XmlElementNotifier() {
-            @Override
-            public void onXmlElementFound(final String xmlTree) {
-                FlowFile split = session.create(original);
-                split = session.write(split, new OutputStreamCallback() {
-                    @Override
-                    public void process(final OutputStream out) throws 
IOException {
-                        out.write(xmlTree.getBytes("UTF-8"));
-                    }
-                });
-                splits.add(split);
-            }
+        final String fragmentIdentifier = UUID.randomUUID().toString();
+        final AtomicInteger numberOfRecords = new AtomicInteger(0);
+        final XmlSplitterSaxParser parser = new XmlSplitterSaxParser(xmlTree 
-> {
+            FlowFile split = session.create(original);
+            split = session.write(split, out -> 
out.write(xmlTree.getBytes("UTF-8")));
+            split = session.putAttribute(split, "fragment.identifier", 
fragmentIdentifier);
+            split = session.putAttribute(split, "fragment.index", 
Integer.toString(numberOfRecords.getAndIncrement()));
+            split = session.putAttribute(split, "segment.original.filename", 
split.getAttribute(CoreAttributes.FILENAME.key()));
+            splits.add(split);
         }, depth);
 
         final AtomicBoolean failed = new AtomicBoolean(false);
-        session.read(original, new InputStreamCallback() {
-            @Override
-            public void process(final InputStream rawIn) throws IOException {
-                try (final InputStream in = new BufferedInputStream(rawIn)) {
-                    SAXParser saxParser = null;
-                    try {
-                        saxParser = saxParserFactory.newSAXParser();
-                        final XMLReader reader = saxParser.getXMLReader();
-                        reader.setContentHandler(parser);
-                        reader.parse(new InputSource(in));
-                    } catch (final ParserConfigurationException | SAXException 
e) {
-                        logger.error("Unable to parse {} due to {}", new 
Object[]{original, e});
-                        failed.set(true);
-                    }
+        session.read(original, rawIn -> {
+            try (final InputStream in = new BufferedInputStream(rawIn)) {
+                SAXParser saxParser = null;
+                try {
+                    saxParser = saxParserFactory.newSAXParser();
+                    final XMLReader reader = saxParser.getXMLReader();
+                    reader.setContentHandler(parser);
+                    reader.parse(new InputSource(in));
+                } catch (final ParserConfigurationException | SAXException e) {
+                    logger.error("Unable to parse {} due to {}", new 
Object[]{original, e});
+                    failed.set(true);
                 }
             }
         });
@@ -183,7 +187,11 @@ public class SplitXml extends AbstractProcessor {
             session.transfer(original, REL_FAILURE);
             session.remove(splits);
         } else {
-            session.transfer(splits, REL_SPLIT);
+            splits.forEach((split) -> {
+                split = session.putAttribute(split, "fragment.count", 
Integer.toString(numberOfRecords.get()));
+                session.transfer(split, REL_SPLIT);
+            });
+
             session.transfer(original, REL_ORIGINAL);
             logger.info("Split {} into {} FlowFiles", new Object[]{original, 
splits.size()});
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ee9bd940/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitJson.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitJson.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitJson.java
index fc07386..4e0d999 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitJson.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitJson.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.processors.standard;
 
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.io.OutputStreamCallback;
@@ -32,6 +33,7 @@ import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.HashMap;
 
 public class TestSplitJson {
 
@@ -110,13 +112,26 @@ public class TestSplitJson {
         final TestRunner testRunner = TestRunners.newTestRunner(new 
SplitJson());
         testRunner.setProperty(SplitJson.ARRAY_JSON_PATH_EXPRESSION, 
"$[*].name");
 
-        testRunner.enqueue(JSON_SNIPPET);
+        testRunner.enqueue(JSON_SNIPPET, new HashMap<String, String>() {
+            {
+                put(CoreAttributes.FILENAME.key(), "test.json");
+            }
+        });
         testRunner.run();
 
         testRunner.assertTransferCount(SplitJson.REL_ORIGINAL, 1);
         testRunner.assertTransferCount(SplitJson.REL_SPLIT, 7);
         
testRunner.getFlowFilesForRelationship(SplitJson.REL_ORIGINAL).get(0).assertContentEquals(JSON_SNIPPET);
-        
testRunner.getFlowFilesForRelationship(SplitJson.REL_SPLIT).get(0).assertContentEquals("{\"first\":\"Shaffer\",\"last\":\"Pearson\"}");
+        MockFlowFile flowFile = 
testRunner.getFlowFilesForRelationship(SplitJson.REL_SPLIT).get(0);
+        
flowFile.assertContentEquals("{\"first\":\"Shaffer\",\"last\":\"Pearson\"}");
+        flowFile.assertAttributeEquals("fragment.count", "7");
+        flowFile.assertAttributeEquals("fragment.index", "0");
+        flowFile.assertAttributeEquals("segment.original.filename", 
"test.json");
+
+        flowFile = 
testRunner.getFlowFilesForRelationship(SplitJson.REL_SPLIT).get(6);
+        flowFile.assertAttributeEquals("fragment.count", "7");
+        flowFile.assertAttributeEquals("fragment.index", "6");
+        flowFile.assertAttributeEquals("segment.original.filename", 
"test.json");
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/nifi/blob/ee9bd940/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitXml.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitXml.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitXml.java
index 1815b3b..2157ab8 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitXml.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitXml.java
@@ -19,11 +19,14 @@ package org.apache.nifi.processors.standard;
 import java.io.IOException;
 import java.io.StringReader;
 import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 
 import javax.xml.parsers.SAXParser;
 import javax.xml.parsers.SAXParserFactory;
 
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -57,13 +60,23 @@ public class TestSplitXml {
     @Test
     public void testDepthOf1() throws Exception {
         final TestRunner runner = TestRunners.newTestRunner(new SplitXml());
-        runner.enqueue(Paths.get("src/test/resources/TestXml/xml-bundle-1"));
+        runner.enqueue(Paths.get("src/test/resources/TestXml/xml-bundle-1"), 
new HashMap<String, String>() {
+            {
+                put(CoreAttributes.FILENAME.key(), "test.xml");
+            }
+        });
         runner.run();
         runner.assertTransferCount(SplitXml.REL_ORIGINAL, 1);
         runner.assertTransferCount(SplitXml.REL_SPLIT, 6);
 
         
parseFlowFiles(runner.getFlowFilesForRelationship(SplitXml.REL_ORIGINAL));
         parseFlowFiles(runner.getFlowFilesForRelationship(SplitXml.REL_SPLIT));
+        Arrays.asList(0, 1, 2, 3, 4, 5).forEach((index) -> {
+            final MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(SplitXml.REL_SPLIT).get(index);
+            flowFile.assertAttributeEquals("fragment.index", 
Integer.toString(index));
+            flowFile.assertAttributeEquals("fragment.count", "6");
+            flowFile.assertAttributeEquals("segment.original.filename", 
"test.xml");
+        });
     }
 
     @Test
@@ -82,7 +95,7 @@ public class TestSplitXml {
     @Test
     public void testDepthOf3() throws Exception {
         final TestRunner runner = TestRunners.newTestRunner(new SplitXml());
-        runner.setProperty(SplitXml.SPLIT_DEPTH, "2");
+        runner.setProperty(SplitXml.SPLIT_DEPTH, "3");
         runner.enqueue(Paths.get("src/test/resources/TestXml/xml-bundle-1"));
         runner.run();
         runner.assertTransferCount(SplitXml.REL_ORIGINAL, 1);
@@ -98,7 +111,7 @@ public class TestSplitXml {
         // declarations are handled correctly.
         factory = SAXParserFactory.newInstance();
         factory.setNamespaceAware(true);
-        saxParser = factory.newSAXParser( );
+        saxParser = factory.newSAXParser();
 
         final TestRunner runner = TestRunners.newTestRunner(new SplitXml());
         runner.setProperty(SplitXml.SPLIT_DEPTH, "3");

Reply via email to