NIFI-4828: Fix MergeContent to process all ready bins

Before this fix, MergeContent processed only the first ready bin on each
invocation, even when multiple bins were ready.

Two unit tests marked with @Ignore had been failing because of this;
they are now re-enabled.

This closes #2444.

Signed-off-by: Mark Payne <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/63098baa
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/63098baa
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/63098baa

Branch: refs/heads/HDF-3.1-maint
Commit: 63098baa11f4048469413e15d3e7bdf27857b369
Parents: 678dfc1
Author: Koji Kawamura <[email protected]>
Authored: Tue Jan 30 15:21:00 2018 +0900
Committer: Mark Payne <[email protected]>
Committed: Wed Feb 7 15:50:33 2018 -0500

----------------------------------------------------------------------
 .../nifi/processor/util/bin/BinFiles.java       | 60 ++++++++++----------
 .../nifi/processor/util/bin/BinManager.java     | 47 ++++++---------
 .../processors/standard/TestMergeContent.java   |  7 +--
 3 files changed, 47 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/63098baa/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
index 7f79b70..643aae4 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
@@ -217,48 +217,45 @@ public abstract class BinFiles extends 
AbstractSessionFactoryProcessor {
     }
 
     private int processBins(final ProcessContext context) {
-        final Bin bin = readyBins.poll();
-        if (bin == null) {
-            return 0;
-        }
-
-        final List<Bin> bins = new ArrayList<>();
-        bins.add(bin);
-
         final ComponentLog logger = getLogger();
+        int processedBins = 0;
+        Bin bin;
+        while ((bin = readyBins.poll()) != null) {
+            boolean binAlreadyCommitted;
+            try {
+                binAlreadyCommitted = this.processBin(bin, context);
+            } catch (final ProcessException e) {
+                logger.error("Failed to process bundle of {} files due to {}", 
new Object[] {bin.getContents().size(), e});
+
+                final ProcessSession binSession = bin.getSession();
+                for (final FlowFile flowFile : bin.getContents()) {
+                    binSession.transfer(flowFile, REL_FAILURE);
+                }
+                binSession.commit();
+                continue;
+            } catch (final Exception e) {
+                logger.error("Failed to process bundle of {} files due to {}; 
rolling back sessions", new Object[] {bin.getContents().size(), e});
 
-        boolean binAlreadyCommitted = false;
-        try {
-            binAlreadyCommitted = this.processBin(bin, context);
-        } catch (final ProcessException e) {
-            logger.error("Failed to process bundle of {} files due to {}", new 
Object[] {bin.getContents().size(), e});
-
-            final ProcessSession binSession = bin.getSession();
-            for (final FlowFile flowFile : bin.getContents()) {
-                binSession.transfer(flowFile, REL_FAILURE);
+                bin.getSession().rollback();
+                continue;
             }
-            binSession.commit();
-            return 1;
-        } catch (final Exception e) {
-            logger.error("Failed to process bundle of {} files due to {}; 
rolling back sessions", new Object[] {bin.getContents().size(), e});
 
-            bin.getSession().rollback();
-            return 1;
-        }
+            // If this bin's session has been committed, move on.
+            if (!binAlreadyCommitted) {
+                final ProcessSession binSession = bin.getSession();
+                binSession.transfer(bin.getContents(), REL_ORIGINAL);
+                binSession.commit();
+            }
 
-        // If this bin's session has been committed, move on.
-        if (!binAlreadyCommitted) {
-            final ProcessSession binSession = bin.getSession();
-            binSession.transfer(bin.getContents(), REL_ORIGINAL);
-            binSession.commit();
+            processedBins++;
         }
 
-        return 1;
+        return processedBins;
     }
 
     private int binFlowFiles(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) {
         int flowFilesBinned = 0;
-        while (binManager.getBinCount() <= 
context.getProperty(MAX_BIN_COUNT).asInteger().intValue()) {
+        while (binManager.getBinCount() <= 
context.getProperty(MAX_BIN_COUNT).asInteger()) {
             if (!isScheduled()) {
                 break;
             }
@@ -290,6 +287,7 @@ public abstract class BinFiles extends 
AbstractSessionFactoryProcessor {
                     bin.offer(flowFile, session);
                     this.readyBins.add(bin);
                 }
+                flowFilesBinned += entry.getValue().size();
             }
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/63098baa/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
index d6a8567..e6cec78 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java
@@ -175,40 +175,25 @@ public class BinManager {
                     continue;
                 }
 
-                final List<Bin> currentBins = groupBinMap.get(groupIdentifier);
-                if (currentBins == null) { // this is a new group we need to 
register
-                    final List<Bin> bins = new ArrayList<>();
-                    final Bin bin = new Bin(sessionFactory.createSession(), 
minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(),
-                        maxEntries.get(), fileCountAttribute.get());
-                    bins.add(bin);
-                    groupBinMap.put(groupIdentifier, bins);
-                    binCount++;
-
-                    final boolean added = bin.offer(flowFile, session);
-                    if (!added) {
-                        unbinned.add(flowFile);
-                    }
-                    continue;
-                } else {
-                    for (final Bin bin : currentBins) {
-                        final boolean accepted = bin.offer(flowFile, session);
-                        if (accepted) {
-                            continue flowFileLoop;
-                        }
-                    }
-
-                    //if we've reached this point then we couldn't fit it into 
any existing bins - gotta make a new one
-                    final Bin bin = new Bin(sessionFactory.createSession(), 
minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(),
-                        maxEntries.get(), fileCountAttribute.get());
-                    currentBins.add(bin);
-                    binCount++;
-                    final boolean added = bin.offer(flowFile, session);
-                    if (!added) {
-                        unbinned.add(flowFile);
+                final List<Bin> currentBins = 
groupBinMap.computeIfAbsent(groupIdentifier, k -> new ArrayList<>());
+                for (final Bin bin : currentBins) {
+                    final boolean accepted = bin.offer(flowFile, session);
+                    if (accepted) {
+                        continue flowFileLoop;
                     }
+                }
 
-                    continue;
+                // if we've reached this point then the groupIdentifier was a 
brand new one,
+                // or we couldn't fit it into any existing bins - gotta make a 
new one
+                final Bin bin = new Bin(sessionFactory.createSession(), 
minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(),
+                    maxEntries.get(), fileCountAttribute.get());
+                currentBins.add(bin);
+                binCount++;
+                final boolean added = bin.offer(flowFile, session);
+                if (!added) {
+                    unbinned.add(flowFile);
                 }
+
             }
         } finally {
             wLock.unlock();

http://git-wip-us.apache.org/repos/asf/nifi/blob/63098baa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
index d461712..3d8e015 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
@@ -57,7 +57,6 @@ import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Assert;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
 public class TestMergeContent {
@@ -911,7 +910,6 @@ public class TestMergeContent {
         assembled.assertContentEquals("A Man A Plan A Canal 
Panama".getBytes("UTF-8"));
     }
 
-    @Ignore("this test appears to be faulty")
     @Test
     public void testDefragmentMultipleMingledSegments() throws IOException {
         final TestRunner runner = TestRunners.newTestRunner(new 
MergeContent());
@@ -941,7 +939,7 @@ public class TestMergeContent {
         attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "4");
         runner.enqueue("Panama".getBytes("UTF-8"), attributes);
 
-        runner.run(2);
+        runner.run(1);
 
         runner.assertTransferCount(MergeContent.REL_MERGED, 2);
         final MockFlowFile assembled = 
runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
@@ -1007,7 +1005,6 @@ public class TestMergeContent {
         assembled.assertContentEquals("A Man A Plan A Canal 
Panama".getBytes("UTF-8"));
     }
 
-    @Ignore("This test appears to be a fail...is retuning 1 instead of 
2...needs work")
     @Test
     public void testMergeBasedOnCorrelation() throws IOException, 
InterruptedException {
         final TestRunner runner = TestRunners.newTestRunner(new 
MergeContent());
@@ -1028,7 +1025,7 @@ public class TestMergeContent {
         attributes.put("attr", "b");
         runner.enqueue("Panama".getBytes("UTF-8"), attributes);
 
-        runner.run(2);
+        runner.run(1);
 
         runner.assertTransferCount(MergeContent.REL_MERGED, 2);
 

Reply via email to