This is an automated email from the ASF dual-hosted git repository.

devriesb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new e26bacf  NIFI-6068, NIFI-6065: Updated StandardFunnel to avoid looping 
indefinitely and instead transfer no more than 10,000 FlowFiles before 
returning from onTrigger. Updated Local Ports to behavior in the same way. 
Updated Root Group Ports so that instead of blocking for up to 100 milliseconds 
for an incoming request, it blocks for up to 1 millisecond and if nothing is 
available yields for the 'bored yield duration'
e26bacf is described below

commit e26bacf69799fc98ce9b4674551ecb520488a6e6
Author: Mark Payne <[email protected]>
AuthorDate: Fri Feb 22 09:39:56 2019 -0500

    NIFI-6068, NIFI-6065: Updated StandardFunnel to avoid looping indefinitely 
and instead transfer no more than 10,000 FlowFiles before returning from 
onTrigger. Updated Local Ports to behavior in the same way. Updated Root Group 
Ports so that instead of blocking for up to 100 milliseconds for an incoming 
request, it blocks for up to 1 millisecond and if nothing is available yields 
for the 'bored yield duration'
    
    Signed-off-by: Brandon Devries <[email protected]>
    
    This closes #3328.
---
 .../org/apache/nifi/controller/StandardFunnel.java | 11 ++++++-
 .../org/apache/nifi/connectable/LocalPort.java     | 38 +++++++++++++++-------
 .../apache/nifi/remote/StandardRootGroupPort.java  |  5 +--
 3 files changed, 39 insertions(+), 15 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
index 96008a3..231ec42 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
@@ -370,14 +370,23 @@ public class StandardFunnel implements Funnel {
         readLock.lock();
         try {
             Set<Relationship> available = context.getAvailableRelationships();
+            int iterations = 0;
             while (!available.isEmpty()) {
-                final List<FlowFile> flowFiles = session.get(100);
+                final List<FlowFile> flowFiles = session.get(1000);
                 if (flowFiles.isEmpty()) {
                     break;
                 }
 
                 session.transfer(flowFiles, Relationship.ANONYMOUS);
                 session.commit();
+
+                // If there are fewer than 1,000 FlowFiles available to 
transfer, or if we
+                // have hit a cap of 10,000 FlowFiles, we want to stop. This 
prevents us from
+                // holding the Timer-Driven Thread for an excessive amount of 
time.
+                if (flowFiles.size() < 1000 || ++iterations >= 10) {
+                    break;
+                }
+
                 available = context.getAvailableRelationships();
             }
         } finally {
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java
index fcf6b2d..f4baa16 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java
@@ -16,16 +16,6 @@
  */
 package org.apache.nifi.connectable;
 
-import org.apache.nifi.connectable.ConnectableType;
-import org.apache.nifi.connectable.Connection;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.AbstractPort;
 import org.apache.nifi.controller.ProcessScheduler;
@@ -36,6 +26,14 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 /**
  * Provides a mechanism by which <code>FlowFile</code>s can be transferred 
into and out of a <code>ProcessGroup</code> to and/or from another 
<code>ProcessGroup</code> within the same instance of
  * NiFi.
@@ -73,9 +71,25 @@ public class LocalPort extends AbstractPort {
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
         readLock.lock();
         try {
-            final List<FlowFile> flowFiles = session.get(100);
-            if (!flowFiles.isEmpty()) {
+            Set<Relationship> available = context.getAvailableRelationships();
+            int iterations = 0;
+            while (!available.isEmpty()) {
+                final List<FlowFile> flowFiles = session.get(1000);
+                if (flowFiles.isEmpty()) {
+                    break;
+                }
+
                 session.transfer(flowFiles, Relationship.ANONYMOUS);
+                session.commit();
+
+                // If there are fewer than 1,000 FlowFiles available to 
transfer, or if we
+                // have hit a cap of 10,000 FlowFiles, we want to stop. This 
prevents us from
+                // holding the Timer-Driven Thread for an excessive amount of 
time.
+                if (flowFiles.size() < 1000 || ++iterations >= 10) {
+                    break;
+                }
+
+                available = context.getAvailableRelationships();
             }
         } finally {
             readLock.unlock();
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
index 27f9d9c..b418579 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
@@ -110,7 +110,7 @@ public class StandardRootGroupPort extends AbstractPort 
implements RootGroupPort
         this.identityMappings = 
IdentityMappingUtil.getIdentityMappings(nifiProperties);
         this.bulletinRepository = bulletinRepository;
         this.scheduler = scheduler;
-        setYieldPeriod("100 millis");
+        setYieldPeriod(nifiProperties.getBoredYieldDuration());
         eventReporter = new EventReporter() {
             private static final long serialVersionUID = 1L;
 
@@ -142,12 +142,13 @@ public class StandardRootGroupPort extends AbstractPort 
implements RootGroupPort
     public void onTrigger(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) {
         final FlowFileRequest flowFileRequest;
         try {
-            flowFileRequest = requestQueue.poll(100, TimeUnit.MILLISECONDS);
+            flowFileRequest = requestQueue.poll(1, TimeUnit.MILLISECONDS);
         } catch (final InterruptedException ie) {
             return;
         }
 
         if (flowFileRequest == null) {
+            context.yield();
             return;
         }
 

Reply via email to