This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch support/nifi-1.9.x in repository https://gitbox.apache.org/repos/asf/nifi.git
commit b3cc626c92bd3112f86a9ee046380c1744210cb8 Author: Mark Payne <[email protected]> AuthorDate: Fri Feb 22 09:39:56 2019 -0500 NIFI-6068, NIFI-6065: Updated StandardFunnel to avoid looping indefinitely and instead transfer no more than 10,000 FlowFiles before returning from onTrigger. Updated Local Ports to behave in the same way. Updated Root Group Ports so that instead of blocking for up to 100 milliseconds for an incoming request, it blocks for up to 1 millisecond and if nothing is available yields for the 'bored yield duration'. Signed-off-by: Brandon Devries <[email protected]> This closes #3328. --- .../org/apache/nifi/controller/StandardFunnel.java | 11 ++++++- .../org/apache/nifi/connectable/LocalPort.java | 38 +++++++++++++++------- .../apache/nifi/remote/StandardRootGroupPort.java | 5 +-- 3 files changed, 39 insertions(+), 15 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java index 96008a3..231ec42 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java @@ -370,14 +370,23 @@ public class StandardFunnel implements Funnel { readLock.lock(); try { Set<Relationship> available = context.getAvailableRelationships(); + int iterations = 0; while (!available.isEmpty()) { - final List<FlowFile> flowFiles = session.get(100); + final List<FlowFile> flowFiles = session.get(1000); if (flowFiles.isEmpty()) { break; } session.transfer(flowFiles, Relationship.ANONYMOUS); session.commit(); + + // If there are fewer than 1,000 FlowFiles available to transfer, or if we + // have hit a 
cap of 10,000 FlowFiles, we want to stop. This prevents us from + // holding the Timer-Driven Thread for an excessive amount of time. + if (flowFiles.size() < 1000 || ++iterations >= 10) { + break; + } + available = context.getAvailableRelationships(); } } finally { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java index fcf6b2d..f4baa16 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java @@ -16,16 +16,6 @@ */ package org.apache.nifi.connectable; -import org.apache.nifi.connectable.ConnectableType; -import org.apache.nifi.connectable.Connection; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Set; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.AbstractPort; import org.apache.nifi.controller.ProcessScheduler; @@ -36,6 +26,14 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.scheduling.SchedulingStrategy; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + /** * Provides a mechanism by which <code>FlowFile</code>s can be transferred into and out of a <code>ProcessGroup</code> to and/or from another 
<code>ProcessGroup</code> within the same instance of * NiFi. @@ -73,9 +71,25 @@ public class LocalPort extends AbstractPort { public void onTrigger(final ProcessContext context, final ProcessSession session) { readLock.lock(); try { - final List<FlowFile> flowFiles = session.get(100); - if (!flowFiles.isEmpty()) { + Set<Relationship> available = context.getAvailableRelationships(); + int iterations = 0; + while (!available.isEmpty()) { + final List<FlowFile> flowFiles = session.get(1000); + if (flowFiles.isEmpty()) { + break; + } + session.transfer(flowFiles, Relationship.ANONYMOUS); + session.commit(); + + // If there are fewer than 1,000 FlowFiles available to transfer, or if we + // have hit a cap of 10,000 FlowFiles, we want to stop. This prevents us from + // holding the Timer-Driven Thread for an excessive amount of time. + if (flowFiles.size() < 1000 || ++iterations >= 10) { + break; + } + + available = context.getAvailableRelationships(); } } finally { readLock.unlock(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java index 27f9d9c..b418579 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java @@ -110,7 +110,7 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort this.identityMappings = IdentityMappingUtil.getIdentityMappings(nifiProperties); this.bulletinRepository = bulletinRepository; this.scheduler = scheduler; - setYieldPeriod("100 millis"); + setYieldPeriod(nifiProperties.getBoredYieldDuration()); eventReporter = new EventReporter() { private static 
final long serialVersionUID = 1L; @@ -142,12 +142,13 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) { final FlowFileRequest flowFileRequest; try { - flowFileRequest = requestQueue.poll(100, TimeUnit.MILLISECONDS); + flowFileRequest = requestQueue.poll(1, TimeUnit.MILLISECONDS); } catch (final InterruptedException ie) { return; } if (flowFileRequest == null) { + context.yield(); return; }
