Repository: nifi
Updated Branches:
  refs/heads/master c056ede6c -> 8b2c5b724
NIFI-4950 Defining behavior for MergeContent when more than 1 FlowFile has the same fragment.index value Signed-off-by: Mike Moser <[email protected]> This closes #2557. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8b2c5b72 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8b2c5b72 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8b2c5b72 Branch: refs/heads/master Commit: 8b2c5b72469dd9715c532164f288acae835a0fd6 Parents: c056ede Author: Mark Bean <[email protected]> Authored: Thu Mar 15 19:06:00 2018 -0400 Committer: Mike Moser <[email protected]> Committed: Fri Mar 16 18:31:43 2018 +0000 ---------------------------------------------------------------------- .../org/apache/nifi/processor/util/bin/Bin.java | 19 ++++++++++ .../nifi/processors/standard/MergeContent.java | 3 +- .../processors/standard/TestMergeContent.java | 37 ++++++++++++++++++++ 3 files changed, 58 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/8b2c5b72/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/Bin.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/Bin.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/Bin.java index fdbc71f..b427e06 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/Bin.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/Bin.java @@ -18,18 +18,27 @@ package org.apache.nifi.processor.util.bin; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import 
java.util.List; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.FragmentAttributes; import org.apache.nifi.processor.ProcessSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Note: {@code Bin} objects are NOT thread safe. If multiple threads access a {@code Bin}, the caller must synchronize * access. */ public class Bin { + private static final Logger logger = LoggerFactory.getLogger(Bin.class); + + public static final String FRAGMENT_INDEX_ATTRIBUTE = FragmentAttributes.FRAGMENT_INDEX.key(); + private final ProcessSession session; private final long creationMomentEpochNs; private final long minimumSizeBytes; @@ -40,6 +49,7 @@ public class Bin { private final String fileCountAttribute; final List<FlowFile> binContents = new ArrayList<>(); + private final Set<String> binIndexSet = new HashSet<>(); long size; int successiveFailedOfferings = 0; @@ -127,6 +137,7 @@ public class Bin { return false; } + // fileCountAttribute is non-null for defragment mode if (fileCountAttribute != null) { final String countValue = flowFile.getAttribute(fileCountAttribute); final Integer count = toInteger(countValue); @@ -135,6 +146,14 @@ public class Bin { this.maximumEntries = Math.min(count, currentMaxEntries); this.minimumEntries = currentMaxEntries; } + + final String index = flowFile.getAttribute(FRAGMENT_INDEX_ATTRIBUTE); + if (index == null || index.isEmpty() || !binIndexSet.add(index)) { + // Do not accept flowfile with duplicate fragment index value + logger.warn("Duplicate or missing value for '" + FRAGMENT_INDEX_ATTRIBUTE + "' in defragment mode. 
Flowfile {} not allowed in Bin", new Object[] { flowFile }); + successiveFailedOfferings++; + return false; + } } size += flowFile.getSize(); http://git-wip-us.apache.org/repos/asf/nifi/blob/8b2c5b72/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java index 803b65d..cc9f6f7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java @@ -110,7 +110,8 @@ import org.apache.nifi.util.FlowFilePackagerV3; + "attribute must be present on all FlowFiles when using the Defragment Merge Strategy and must be a unique (i.e., unique across all " + "FlowFiles that have the same value for the \"fragment.identifier\" attribute) integer " + "between 0 and the value of the fragment.count attribute. If two or more FlowFiles have the same value for the " - + "\"fragment.identifier\" attribute and the same value for the \"fragment.index\" attribute, the behavior of this Processor is undefined."), + + "\"fragment.identifier\" attribute and the same value for the \"fragment.index\" attribute, the first FlowFile processed will be " + + "accepted and subsequent FlowFiles will not be accepted into the Bin."), @ReadsAttribute(attribute = "fragment.count", description = "Applicable only if the <Merge Strategy> property is set to Defragment. This " + "attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. 
All FlowFiles in the same " + "bundle must have the same value for this attribute. The value of this attribute indicates how many FlowFiles should be expected " http://git-wip-us.apache.org/repos/asf/nifi/blob/8b2c5b72/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java index 3d8e015..af448d6 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java @@ -852,6 +852,41 @@ public class TestMergeContent { } @Test + public void testDefragmentDuplicateFragement() throws IOException, InterruptedException { + final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); + runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT); + runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec"); + + final Map<String, String> attributes = new HashMap<>(); + attributes.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, "1"); + attributes.put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "4"); + attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "1"); + + runner.enqueue("A Man ".getBytes("UTF-8"), attributes); + attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "2"); + runner.enqueue("A Plan ".getBytes("UTF-8"), attributes); + // enqueue a duplicate fragment + runner.enqueue("A Plan ".getBytes("UTF-8"), attributes); + attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "3"); + runner.enqueue("A 
Canal ".getBytes("UTF-8"), attributes); + attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "4"); + runner.enqueue("Panama".getBytes("UTF-8"), attributes); + + runner.run(1, false); + + runner.assertTransferCount(MergeContent.REL_FAILURE, 0); + runner.assertTransferCount(MergeContent.REL_MERGED, 1); + final MockFlowFile assembled = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0); + assembled.assertContentEquals("A Man A Plan A Canal Panama".getBytes("UTF-8")); + + runner.clearTransferState(); + Thread.sleep(1_100L); + runner.run(); + runner.assertTransferCount(MergeContent.REL_FAILURE, 1); + runner.assertTransferCount(MergeContent.REL_MERGED, 0); + } + + @Test public void testDefragmentWithTooFewFragments() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT); @@ -1182,6 +1217,8 @@ public class TestMergeContent { private void createFlowFiles(final TestRunner testRunner) throws UnsupportedEncodingException { final Map<String, String> attributes = new HashMap<>(); attributes.put(CoreAttributes.MIME_TYPE.key(), "application/plain-text"); + // add 'fragment.index' attribute to ensure non-defragment mode operates correctly even when index is present + attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "1"); testRunner.enqueue("Hello".getBytes("UTF-8"), attributes); testRunner.enqueue(", ".getBytes("UTF-8"), attributes);
