Repository: nifi Updated Branches: refs/heads/master a6e94de0b -> 844dbe4ed
NIFI-4156: Fixed fragment.count in SplitText to equal emitted flow files Signed-off-by: Pierre Villard <[email protected]> This closes #2014. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/844dbe4e Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/844dbe4e Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/844dbe4e Branch: refs/heads/master Commit: 844dbe4edbc1cc0d24abe394cd276b9925627240 Parents: a6e94de Author: Matt Burgess <[email protected]> Authored: Mon Jul 17 09:23:55 2017 -0400 Committer: Pierre Villard <[email protected]> Committed: Tue Jul 18 11:31:16 2017 +0200 ---------------------------------------------------------------------- .../nifi/processors/standard/SplitText.java | 14 ++++++--- .../nifi/processors/standard/TestSplitText.java | 33 +++++++++++++++++++- 2 files changed, 42 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/844dbe4e/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java index 5738632..95accf4 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java @@ -298,7 +298,7 @@ public class SplitText extends AbstractProcessor { if ((computedSplitsInfo.size() == 0) && (headerFlowFile != null)) { FlowFile splitFlowFile = processSession.clone(sourceFlowFile, 0, headerFlowFile.getSize() - headerCrlfLength); splitFlowFile = this.updateAttributes(processSession, splitFlowFile, 0, splitFlowFile.getSize(), - fragmentId, fragmentIndex++, 0, sourceFlowFile.getAttribute(CoreAttributes.FILENAME.key())); + fragmentId, fragmentIndex++, sourceFlowFile.getAttribute(CoreAttributes.FILENAME.key())); splitFlowFiles.add(splitFlowFile); } else { for (SplitInfo computedSplitInfo : computedSplitsInfo) { @@ -318,10 +318,14 @@ public class SplitText extends AbstractProcessor { } splitFlowFile = this.updateAttributes(processSession, splitFlowFile, computedSplitInfo.lineCount, splitFlowFile.getSize(), fragmentId, fragmentIndex++, - computedSplitsInfo.size(), sourceFlowFile.getAttribute(CoreAttributes.FILENAME.key())); + sourceFlowFile.getAttribute(CoreAttributes.FILENAME.key())); splitFlowFiles.add(splitFlowFile); } } + // Update fragment.count with real split count (i.e. don't count files for which there was no clone) + for (FlowFile splitFlowFile : splitFlowFiles) { + splitFlowFile = processSession.putAttribute(splitFlowFile, FRAGMENT_COUNT, String.valueOf(fragmentIndex - 1)); // -1 because the index starts at 1 (see above) + } } getLogger().info("Split " + sourceFlowFile + " into " + splitFlowFiles.size() + " flow files" + (headerFlowFile != null ? " containing headers." : ".")); @@ -356,13 +360,12 @@ public class SplitText extends AbstractProcessor { } private FlowFile updateAttributes(ProcessSession processSession, FlowFile splitFlowFile, long splitLineCount, long splitFlowFileSize, - String splitId, int splitIndex, int splitCount, String origFileName) { + String splitId, int splitIndex, String origFileName) { Map<String, String> attributes = new HashMap<>(); attributes.put(SPLIT_LINE_COUNT, String.valueOf(splitLineCount)); attributes.put(FRAGMENT_SIZE, String.valueOf(splitFlowFile.getSize())); attributes.put(FRAGMENT_ID, splitId); attributes.put(FRAGMENT_INDEX, String.valueOf(splitIndex)); - attributes.put(FRAGMENT_COUNT, String.valueOf(splitCount)); attributes.put(SEGMENT_ORIGINAL_FILENAME, origFileName); return processSession.putAllAttributes(splitFlowFile, attributes); } @@ -424,6 +427,7 @@ public class SplitText extends AbstractProcessor { SplitInfo remainderSplitInfo, long startingLength) throws IOException { long length = 0; long trailingCrlfLength = 0; + long trailingLineCount = 0; long actualLineCount = 0; OffsetInfo offsetInfo = null; SplitInfo splitInfo = null; @@ -440,6 +444,7 @@ public class SplitText extends AbstractProcessor { if (offsetInfo.getLength() == offsetInfo.getCrlfLength()) { trailingCrlfLength += offsetInfo.getCrlfLength(); + trailingLineCount++; } else if (offsetInfo.getLength() > offsetInfo.getCrlfLength()) { trailingCrlfLength = 0; // non-empty line came in, thus resetting counter } @@ -465,6 +470,7 @@ public class SplitText extends AbstractProcessor { if (length - trailingCrlfLength >= lastCrlfLength) { trailingCrlfLength += lastCrlfLength; // trim CRLF from the last line } + actualLineCount -= trailingLineCount; splitInfo = new SplitInfo(startOffset, length, length - trailingCrlfLength, actualLineCount, remaningOffsetInfo); } return splitInfo; http://git-wip-us.apache.org/repos/asf/nifi/blob/844dbe4e/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java index 8e4c881..f42b1d3 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java @@ -771,7 +771,38 @@ public class TestSplitText { splitRunner.assertTransferCount(SplitText.REL_FAILURE, 0); final List<MockFlowFile> splits = splitRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS); - splits.get(0).assertContentEquals("1\n2"); + MockFlowFile split0 = splits.get(0); + split0.assertContentEquals("1\n2"); + split0.assertAttributeEquals(SplitText.FRAGMENT_INDEX, "1"); + split0.assertAttributeEquals(SplitText.FRAGMENT_COUNT, "1"); + split0.assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "2"); + } + + @Test + public void testFragmentCountIsActualFlowFileCount() { + final TestRunner splitRunner = TestRunners.newTestRunner(new SplitText()); + splitRunner.setProperty(SplitText.HEADER_LINE_COUNT, "0"); + splitRunner.setProperty(SplitText.LINE_SPLIT_COUNT, "1"); + splitRunner.setProperty(SplitText.REMOVE_TRAILING_NEWLINES, "true"); + + splitRunner.enqueue("1\n2\n\n\n\n\n\n\n\n"); + + splitRunner.run(); + splitRunner.assertTransferCount(SplitText.REL_SPLITS, 2); + splitRunner.assertTransferCount(SplitText.REL_ORIGINAL, 1); + splitRunner.assertTransferCount(SplitText.REL_FAILURE, 0); + + final List<MockFlowFile> splits = splitRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS); + MockFlowFile split0 = splits.get(0); + split0.assertContentEquals("1"); + split0.assertAttributeEquals(SplitText.FRAGMENT_INDEX, "1"); + split0.assertAttributeEquals(SplitText.FRAGMENT_COUNT, "2"); + split0.assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "1"); + MockFlowFile split1 = splits.get(1); + split1.assertContentEquals("2"); + split1.assertAttributeEquals(SplitText.FRAGMENT_INDEX, "2"); + split1.assertAttributeEquals(SplitText.FRAGMENT_COUNT, "2"); + split1.assertAttributeEquals(SplitText.SPLIT_LINE_COUNT, "1"); }
