NIFI-379 fixed what appears to be faulty edge-condition handling

Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/819b65f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/819b65f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/819b65f7

Branch: refs/heads/NIFI-632
Commit: 819b65f7e08ff5a836bd7c9fd29826539344abe9
Parents: b53948a
Author: joewitt <[email protected]>
Authored: Thu Jun 4 22:48:17 2015 -0400
Committer: joewitt <[email protected]>
Committed: Thu Jun 4 22:48:38 2015 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/BinFiles.java      | 23 ++++++++++----------
 .../processors/standard/TestMergeContent.java   | 20 +++++++++++++++++
 2 files changed, 31 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/819b65f7/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
index 399a12b..a7b4b28 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
@@ -171,17 +171,17 @@ public abstract class BinFiles extends 
AbstractSessionFactoryProcessor {
 
     @Override
     public final void onTrigger(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) throws ProcessException {
-        int binsAdded = binFlowFiles(context, sessionFactory);
-        getLogger().debug("Binned {} FlowFiles", new Object[]{binsAdded});
+        final int flowFilesBinned = binFlowFiles(context, sessionFactory);
+        getLogger().debug("Binned {} FlowFiles", new 
Object[]{flowFilesBinned});
 
         if (!isScheduled()) {
             return;
         }
 
-        binsAdded += migrateBins(context);
-
+        final int binsMigrated = migrateBins(context);
         final int binsProcessed = processBins(context, sessionFactory);
-        if (binsProcessed == 0 && binsAdded == 0) {
+        //If we accomplished nothing then let's yield
+        if (flowFilesBinned == 0 && binsMigrated == 0 && binsProcessed == 0) {
             context.yield();
         }
     }
@@ -203,7 +203,6 @@ public abstract class BinFiles extends 
AbstractSessionFactoryProcessor {
                 this.readyBins.add(bin);
             }
         }
-
         return added;
     }
 
@@ -251,16 +250,16 @@ public abstract class BinFiles extends 
AbstractSessionFactoryProcessor {
     }
 
     private int binFlowFiles(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) {
-        int binsAdded = 0;
-        while (binManager.getBinCount() < 
context.getProperty(MAX_BIN_COUNT).asInteger().intValue()) {
+        int flowFilesBinned = 0;
+        while (binManager.getBinCount() <= 
context.getProperty(MAX_BIN_COUNT).asInteger().intValue()) {
             if (!isScheduled()) {
-                return binsAdded;
+                break;
             }
 
             final ProcessSession session = sessionFactory.createSession();
             FlowFile flowFile = session.get();
             if (flowFile == null) {
-                return binsAdded;
+                break;
             }
 
             flowFile = this.preprocessFlowFile(context, session, flowFile);
@@ -276,10 +275,10 @@ public abstract class BinFiles extends 
AbstractSessionFactoryProcessor {
                 this.readyBins.add(bin);
             }
 
-            binsAdded++;
+            flowFilesBinned++;
         }
 
-        return binsAdded;
+        return flowFilesBinned;
     }
 
     @OnScheduled

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/819b65f7/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
index aad2593..65925f7 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
@@ -73,6 +73,26 @@ public class TestMergeContent {
         bundle.assertContentEquals("Hello, World!".getBytes("UTF-8"));
         bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), 
"application/plain-text");
     }
+    
+    @Test
+    public void testSimpleBinaryConcatSingleBin() throws IOException, 
InterruptedException {
+        final TestRunner runner = TestRunners.newTestRunner(new 
MergeContent());
+        runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
+        runner.setProperty(MergeContent.MERGE_FORMAT, 
MergeContent.MERGE_FORMAT_CONCAT);
+        runner.setProperty(MergeContent.MAX_BIN_COUNT, "1");
+
+        createFlowFiles(runner);
+        runner.run();
+
+        runner.assertQueueEmpty();
+        runner.assertTransferCount(MergeContent.REL_MERGED, 1);
+        runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
+        runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);
+
+        final MockFlowFile bundle = 
runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
+        bundle.assertContentEquals("Hello, World!".getBytes("UTF-8"));
+        bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), 
"application/plain-text");
+    }    
 
     @Test
     public void testSimpleBinaryConcatWithTextDelimiters() throws IOException, 
InterruptedException {

Reply via email to