Repository: nifi Updated Branches: refs/heads/master 423b333b7 -> f120952ab
NIFI-1669: Ensure that we take into account the number of 'ready bins' when determining whether or not we have reached out Max Bin threshold for BinFiles Signed-off-by: Aldrin Piri <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f120952a Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f120952a Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f120952a Branch: refs/heads/master Commit: f120952ab79d844b92c9d7e1014068065250635f Parents: 423b333 Author: Mark Payne <[email protected]> Authored: Tue Mar 22 18:30:37 2016 -0400 Committer: Aldrin Piri <[email protected]> Committed: Tue Mar 22 20:49:39 2016 -0400 ---------------------------------------------------------------------- .../apache/nifi/processor/util/bin/BinFiles.java | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/f120952a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java index 7c90342..46891a4 100644 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java @@ -169,8 +169,18 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor { @Override public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { - final int flowFilesBinned = binFlowFiles(context, sessionFactory); - getLogger().debug("Binned {} FlowFiles", new Object[]{flowFilesBinned}); + final int totalBinCount = binManager.getBinCount() + readyBins.size(); + final int maxBinCount = context.getProperty(MAX_BIN_COUNT).asInteger(); + final int flowFilesBinned; + + if (totalBinCount < maxBinCount) { + flowFilesBinned = binFlowFiles(context, sessionFactory); + getLogger().debug("Binned {} FlowFiles", new Object[] {flowFilesBinned}); + } else { + flowFilesBinned = 0; + getLogger().debug("Will not bin any FlowFiles because {} bins already exist;" + + "will wait until bins have been emptied before any more are created", new Object[] {totalBinCount}); + } if (!isScheduled()) { return; @@ -194,7 +204,7 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor { // if we have created all of the bins that are allowed, go ahead and remove the oldest one. If we don't do // this, then we will simply wait for it to expire because we can't get any more FlowFiles into the // bins. So we may as well expire it now. - if (added == 0 && binManager.getBinCount() >= context.getProperty(MAX_BIN_COUNT).asInteger()) { + if (added == 0 && (readyBins.size() + binManager.getBinCount()) >= context.getProperty(MAX_BIN_COUNT).asInteger()) { final Bin bin = binManager.removeOldestBin(); if (bin != null) { added++;
