Repository: nifi
Updated Branches:
  refs/heads/master a6e94de0b -> 844dbe4ed


NIFI-4156: Fixed fragment.count in SplitText to equal emitted flow files

Signed-off-by: Pierre Villard <[email protected]>

This closes #2014.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/844dbe4e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/844dbe4e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/844dbe4e

Branch: refs/heads/master
Commit: 844dbe4edbc1cc0d24abe394cd276b9925627240
Parents: a6e94de
Author: Matt Burgess <[email protected]>
Authored: Mon Jul 17 09:23:55 2017 -0400
Committer: Pierre Villard <[email protected]>
Committed: Tue Jul 18 11:31:16 2017 +0200

----------------------------------------------------------------------
 .../nifi/processors/standard/SplitText.java     | 14 ++++++---
 .../nifi/processors/standard/TestSplitText.java | 33 +++++++++++++++++++-
 2 files changed, 42 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/844dbe4e/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
index 5738632..95accf4 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
@@ -298,7 +298,7 @@ public class SplitText extends AbstractProcessor {
         if ((computedSplitsInfo.size() == 0) && (headerFlowFile != null)) {
             FlowFile splitFlowFile = processSession.clone(sourceFlowFile, 0, 
headerFlowFile.getSize() - headerCrlfLength);
             splitFlowFile = this.updateAttributes(processSession, 
splitFlowFile, 0, splitFlowFile.getSize(),
-                    fragmentId, fragmentIndex++, 0, 
sourceFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
+                    fragmentId, fragmentIndex++, 
sourceFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
             splitFlowFiles.add(splitFlowFile);
         } else {
             for (SplitInfo computedSplitInfo : computedSplitsInfo) {
@@ -318,10 +318,14 @@ public class SplitText extends AbstractProcessor {
                     }
 
                     splitFlowFile = this.updateAttributes(processSession, 
splitFlowFile, computedSplitInfo.lineCount, splitFlowFile.getSize(), 
fragmentId, fragmentIndex++,
-                            computedSplitsInfo.size(), 
sourceFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
+                            
sourceFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
                     splitFlowFiles.add(splitFlowFile);
                 }
             }
+            // Update fragment.count with real split count (i.e. don't count files for which there was no clone)
+            for (FlowFile splitFlowFile : splitFlowFiles) {
+                splitFlowFile = processSession.putAttribute(splitFlowFile, FRAGMENT_COUNT, String.valueOf(fragmentIndex - 1)); // -1 because the index starts at 1 (see above)
+            }
         }
 
         getLogger().info("Split " + sourceFlowFile + " into " + splitFlowFiles.size() + " flow files" + (headerFlowFile != null ? " containing headers." : "."));
@@ -356,13 +360,12 @@ public class SplitText extends AbstractProcessor {
     }
 
     private FlowFile updateAttributes(ProcessSession processSession, FlowFile 
splitFlowFile, long splitLineCount, long splitFlowFileSize,
-            String splitId, int splitIndex, int splitCount, String 
origFileName) {
+            String splitId, int splitIndex, String origFileName) {
         Map<String, String> attributes = new HashMap<>();
         attributes.put(SPLIT_LINE_COUNT, String.valueOf(splitLineCount));
         attributes.put(FRAGMENT_SIZE, String.valueOf(splitFlowFile.getSize()));
         attributes.put(FRAGMENT_ID, splitId);
         attributes.put(FRAGMENT_INDEX, String.valueOf(splitIndex));
-        attributes.put(FRAGMENT_COUNT, String.valueOf(splitCount));
         attributes.put(SEGMENT_ORIGINAL_FILENAME, origFileName);
         return processSession.putAllAttributes(splitFlowFile, attributes);
     }
@@ -424,6 +427,7 @@ public class SplitText extends AbstractProcessor {
             SplitInfo remainderSplitInfo, long startingLength) throws 
IOException {
         long length = 0;
         long trailingCrlfLength = 0;
+        long trailingLineCount = 0;
         long actualLineCount = 0;
         OffsetInfo offsetInfo = null;
         SplitInfo splitInfo = null;
@@ -440,6 +444,7 @@ public class SplitText extends AbstractProcessor {
 
             if (offsetInfo.getLength() == offsetInfo.getCrlfLength()) {
                 trailingCrlfLength += offsetInfo.getCrlfLength();
+                trailingLineCount++;
             } else if (offsetInfo.getLength() > offsetInfo.getCrlfLength()) {
                 trailingCrlfLength = 0; // non-empty line came in, thus 
resetting counter
             }
@@ -465,6 +470,7 @@ public class SplitText extends AbstractProcessor {
             if (length - trailingCrlfLength >= lastCrlfLength) {
                 trailingCrlfLength += lastCrlfLength; // trim CRLF from the 
last line
             }
+            actualLineCount -= trailingLineCount;
             splitInfo = new SplitInfo(startOffset, length, length - 
trailingCrlfLength, actualLineCount, remaningOffsetInfo);
         }
         return splitInfo;

http://git-wip-us.apache.org/repos/asf/nifi/blob/844dbe4e/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java
index 8e4c881..f42b1d3 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java
@@ -771,7 +771,38 @@ public class TestSplitText {
         splitRunner.assertTransferCount(SplitText.REL_FAILURE, 0);
 
         final List<MockFlowFile> splits = 
splitRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
-        splits.get(0).assertContentEquals("1\n2");
+        MockFlowFile split0 = splits.get(0);
+        split0.assertContentEquals("1\n2");
+        split0.assertAttributeEquals(SplitText.FRAGMENT_INDEX, "1");
+        split0.assertAttributeEquals(SplitText.FRAGMENT_COUNT, "1");
+        split0.assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "2");
+    }
+
+    @Test
+    public void testFragmentCountIsActualFlowFileCount() {
+        final TestRunner splitRunner = TestRunners.newTestRunner(new 
SplitText());
+        splitRunner.setProperty(SplitText.HEADER_LINE_COUNT, "0");
+        splitRunner.setProperty(SplitText.LINE_SPLIT_COUNT, "1");
+        splitRunner.setProperty(SplitText.REMOVE_TRAILING_NEWLINES, "true");
+
+        splitRunner.enqueue("1\n2\n\n\n\n\n\n\n\n");
+
+        splitRunner.run();
+        splitRunner.assertTransferCount(SplitText.REL_SPLITS, 2);
+        splitRunner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
+        splitRunner.assertTransferCount(SplitText.REL_FAILURE, 0);
+
+        final List<MockFlowFile> splits = 
splitRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
+        MockFlowFile split0 = splits.get(0);
+        split0.assertContentEquals("1");
+        split0.assertAttributeEquals(SplitText.FRAGMENT_INDEX, "1");
+        split0.assertAttributeEquals(SplitText.FRAGMENT_COUNT, "2");
+        split0.assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "1");
+        MockFlowFile split1 = splits.get(1);
+        split1.assertContentEquals("2");
+        split1.assertAttributeEquals(SplitText.FRAGMENT_INDEX, "2");
+        split1.assertAttributeEquals(SplitText.FRAGMENT_COUNT, "2");
+        split1.assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "1");
     }
 
 

Reply via email to