Repository: nifi
Updated Branches:
  refs/heads/master c056ede6c -> 8b2c5b724


NIFI-4950 Define MergeContent behavior when more than one FlowFile has the same fragment.index value

Signed-off-by: Mike Moser <[email protected]>

This closes #2557.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8b2c5b72
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8b2c5b72
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8b2c5b72

Branch: refs/heads/master
Commit: 8b2c5b72469dd9715c532164f288acae835a0fd6
Parents: c056ede
Author: Mark Bean <[email protected]>
Authored: Thu Mar 15 19:06:00 2018 -0400
Committer: Mike Moser <[email protected]>
Committed: Fri Mar 16 18:31:43 2018 +0000

----------------------------------------------------------------------
 .../org/apache/nifi/processor/util/bin/Bin.java | 19 ++++++++++
 .../nifi/processors/standard/MergeContent.java  |  3 +-
 .../processors/standard/TestMergeContent.java   | 37 ++++++++++++++++++++
 3 files changed, 58 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/8b2c5b72/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/Bin.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/Bin.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/Bin.java
index fdbc71f..b427e06 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/Bin.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/Bin.java
@@ -18,18 +18,27 @@ package org.apache.nifi.processor.util.bin;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
 import org.apache.nifi.processor.ProcessSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Note: {@code Bin} objects are NOT thread safe. If multiple threads access a 
{@code Bin}, the caller must synchronize
  * access.
  */
 public class Bin {
+    private static final Logger logger = LoggerFactory.getLogger(Bin.class);
+
+    public static final String FRAGMENT_INDEX_ATTRIBUTE = 
FragmentAttributes.FRAGMENT_INDEX.key();
+
     private final ProcessSession session;
     private final long creationMomentEpochNs;
     private final long minimumSizeBytes;
@@ -40,6 +49,7 @@ public class Bin {
     private final String fileCountAttribute;
 
     final List<FlowFile> binContents = new ArrayList<>();
+    private final Set<String> binIndexSet = new HashSet<>();
     long size;
     int successiveFailedOfferings = 0;
 
@@ -127,6 +137,7 @@ public class Bin {
             return false;
         }
 
+        // fileCountAttribute is non-null for defragment mode
         if (fileCountAttribute != null) {
             final String countValue = 
flowFile.getAttribute(fileCountAttribute);
             final Integer count = toInteger(countValue);
@@ -135,6 +146,14 @@ public class Bin {
                 this.maximumEntries = Math.min(count, currentMaxEntries);
                 this.minimumEntries = currentMaxEntries;
             }
+
+            final String index = 
flowFile.getAttribute(FRAGMENT_INDEX_ATTRIBUTE);
+            if (index == null || index.isEmpty() || !binIndexSet.add(index)) {
+                // Do not accept flowfile with duplicate fragment index value
+                logger.warn("Duplicate or missing value for '" + 
FRAGMENT_INDEX_ATTRIBUTE + "' in defragment mode. Flowfile {} not allowed in 
Bin", new Object[] { flowFile });
+                successiveFailedOfferings++;
+                return false;
+            }
         }
 
         size += flowFile.getSize();

http://git-wip-us.apache.org/repos/asf/nifi/blob/8b2c5b72/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
index 803b65d..cc9f6f7 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
@@ -110,7 +110,8 @@ import org.apache.nifi.util.FlowFilePackagerV3;
         + "attribute must be present on all FlowFiles when using the 
Defragment Merge Strategy and must be a unique (i.e., unique across all "
         + "FlowFiles that have the same value for the \"fragment.identifier\" 
attribute) integer "
         + "between 0 and the value of the fragment.count attribute. If two or 
more FlowFiles have the same value for the "
-        + "\"fragment.identifier\" attribute and the same value for the 
\"fragment.index\" attribute, the behavior of this Processor is undefined."),
+        + "\"fragment.identifier\" attribute and the same value for the 
\"fragment.index\" attribute, the first FlowFile processed will be "
+        + "accepted and subsequent FlowFiles will not be accepted into the 
Bin."),
     @ReadsAttribute(attribute = "fragment.count", description = "Applicable 
only if the <Merge Strategy> property is set to Defragment. This "
         + "attribute must be present on all FlowFiles with the same value for 
the fragment.identifier attribute. All FlowFiles in the same "
         + "bundle must have the same value for this attribute. The value of 
this attribute indicates how many FlowFiles should be expected "

http://git-wip-us.apache.org/repos/asf/nifi/blob/8b2c5b72/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
index 3d8e015..af448d6 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
@@ -852,6 +852,41 @@ public class TestMergeContent {
     }
 
     @Test
+    public void testDefragmentDuplicateFragement() throws IOException, 
InterruptedException {
+        final TestRunner runner = TestRunners.newTestRunner(new 
MergeContent());
+        runner.setProperty(MergeContent.MERGE_STRATEGY, 
MergeContent.MERGE_STRATEGY_DEFRAGMENT);
+        runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, "1");
+        attributes.put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "4");
+        attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "1");
+
+        runner.enqueue("A Man ".getBytes("UTF-8"), attributes);
+        attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "2");
+        runner.enqueue("A Plan ".getBytes("UTF-8"), attributes);
+        // enqueue a duplicate fragment
+        runner.enqueue("A Plan ".getBytes("UTF-8"), attributes);
+        attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "3");
+        runner.enqueue("A Canal ".getBytes("UTF-8"), attributes);
+        attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "4");
+        runner.enqueue("Panama".getBytes("UTF-8"), attributes);
+
+        runner.run(1, false);
+
+        runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
+        runner.assertTransferCount(MergeContent.REL_MERGED, 1);
+        final MockFlowFile assembled = 
runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
+        assembled.assertContentEquals("A Man A Plan A Canal 
Panama".getBytes("UTF-8"));
+
+        runner.clearTransferState();
+        Thread.sleep(1_100L);
+        runner.run();
+        runner.assertTransferCount(MergeContent.REL_FAILURE, 1);
+        runner.assertTransferCount(MergeContent.REL_MERGED, 0);
+    }
+
+    @Test
     public void testDefragmentWithTooFewFragments() throws IOException {
         final TestRunner runner = TestRunners.newTestRunner(new 
MergeContent());
         runner.setProperty(MergeContent.MERGE_STRATEGY, 
MergeContent.MERGE_STRATEGY_DEFRAGMENT);
@@ -1182,6 +1217,8 @@ public class TestMergeContent {
     private void createFlowFiles(final TestRunner testRunner) throws 
UnsupportedEncodingException {
         final Map<String, String> attributes = new HashMap<>();
         attributes.put(CoreAttributes.MIME_TYPE.key(), 
"application/plain-text");
+        // add 'fragment.index' attribute to ensure non-defragment mode 
operates correctly even when index is present
+        attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "1");
 
         testRunner.enqueue("Hello".getBytes("UTF-8"), attributes);
         testRunner.enqueue(", ".getBytes("UTF-8"), attributes);

Reply via email to