Repository: nifi
Updated Branches:
  refs/heads/master 423b333b7 -> f120952ab


NIFI-1669: Ensure that we take into account the number of 'ready bins' when 
determining whether or not we have reached our Max Bin threshold for BinFiles

Signed-off-by: Aldrin Piri <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f120952a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f120952a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f120952a

Branch: refs/heads/master
Commit: f120952ab79d844b92c9d7e1014068065250635f
Parents: 423b333
Author: Mark Payne <[email protected]>
Authored: Tue Mar 22 18:30:37 2016 -0400
Committer: Aldrin Piri <[email protected]>
Committed: Tue Mar 22 20:49:39 2016 -0400

----------------------------------------------------------------------
 .../apache/nifi/processor/util/bin/BinFiles.java    | 16 +++++++++++++---
 1 file changed, 13 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/f120952a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
index 7c90342..46891a4 100644
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
+++ 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
@@ -169,8 +169,18 @@ public abstract class BinFiles extends 
AbstractSessionFactoryProcessor {
 
     @Override
     public final void onTrigger(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) throws ProcessException {
-        final int flowFilesBinned = binFlowFiles(context, sessionFactory);
-        getLogger().debug("Binned {} FlowFiles", new 
Object[]{flowFilesBinned});
+        final int totalBinCount = binManager.getBinCount() + readyBins.size();
+        final int maxBinCount = context.getProperty(MAX_BIN_COUNT).asInteger();
+        final int flowFilesBinned;
+
+        if (totalBinCount < maxBinCount) {
+            flowFilesBinned = binFlowFiles(context, sessionFactory);
+            getLogger().debug("Binned {} FlowFiles", new Object[] 
{flowFilesBinned});
+        } else {
+            flowFilesBinned = 0;
+            getLogger().debug("Will not bin any FlowFiles because {} bins 
already exist;"
+                + "will wait until bins have been emptied before any more are 
created", new Object[] {totalBinCount});
+        }
 
         if (!isScheduled()) {
             return;
@@ -194,7 +204,7 @@ public abstract class BinFiles extends 
AbstractSessionFactoryProcessor {
         // if we have created all of the bins that are allowed, go ahead and 
remove the oldest one. If we don't do
         // this, then we will simply wait for it to expire because we can't 
get any more FlowFiles into the
         // bins. So we may as well expire it now.
-        if (added == 0 && binManager.getBinCount() >= 
context.getProperty(MAX_BIN_COUNT).asInteger()) {
+        if (added == 0 && (readyBins.size() + binManager.getBinCount()) >= 
context.getProperty(MAX_BIN_COUNT).asInteger()) {
             final Bin bin = binManager.removeOldestBin();
             if (bin != null) {
                 added++;

Reply via email to