This is an automated email from the ASF dual-hosted git repository.
devriesb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new e26bacf NIFI-6068, NIFI-6065: Updated StandardFunnel to avoid looping
indefinitely and instead transfer no more than 10,000 FlowFiles before
returning from onTrigger. Updated Local Ports to behavior in the same way.
Updated Root Group Ports so that instead of blocking for up to 100 milliseconds
for an incoming request, it blocks for up to 1 millisecond and if nothing is
available yields for the 'bored yield duration'
e26bacf is described below
commit e26bacf69799fc98ce9b4674551ecb520488a6e6
Author: Mark Payne <[email protected]>
AuthorDate: Fri Feb 22 09:39:56 2019 -0500
NIFI-6068, NIFI-6065: Updated StandardFunnel to avoid looping indefinitely
and instead transfer no more than 10,000 FlowFiles before returning from
onTrigger. Updated Local Ports to behavior in the same way. Updated Root Group
Ports so that instead of blocking for up to 100 milliseconds for an incoming
request, it blocks for up to 1 millisecond and if nothing is available yields
for the 'bored yield duration'
Signed-off-by: Brandon Devries <[email protected]>
This closes #3328.
---
.../org/apache/nifi/controller/StandardFunnel.java | 11 ++++++-
.../org/apache/nifi/connectable/LocalPort.java | 38 +++++++++++++++-------
.../apache/nifi/remote/StandardRootGroupPort.java | 5 +--
3 files changed, 39 insertions(+), 15 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
index 96008a3..231ec42 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
@@ -370,14 +370,23 @@ public class StandardFunnel implements Funnel {
readLock.lock();
try {
Set<Relationship> available = context.getAvailableRelationships();
+ int iterations = 0;
while (!available.isEmpty()) {
- final List<FlowFile> flowFiles = session.get(100);
+ final List<FlowFile> flowFiles = session.get(1000);
if (flowFiles.isEmpty()) {
break;
}
session.transfer(flowFiles, Relationship.ANONYMOUS);
session.commit();
+
+ // If there are fewer than 1,000 FlowFiles available to
transfer, or if we
+ // have hit a cap of 10,000 FlowFiles, we want to stop. This
prevents us from
+ // holding the Timer-Driven Thread for an excessive amount of
time.
+ if (flowFiles.size() < 1000 || ++iterations >= 10) {
+ break;
+ }
+
available = context.getAvailableRelationships();
}
} finally {
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java
index fcf6b2d..f4baa16 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java
@@ -16,16 +16,6 @@
*/
package org.apache.nifi.connectable;
-import org.apache.nifi.connectable.ConnectableType;
-import org.apache.nifi.connectable.Connection;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractPort;
import org.apache.nifi.controller.ProcessScheduler;
@@ -36,6 +26,14 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.scheduling.SchedulingStrategy;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
/**
* Provides a mechanism by which <code>FlowFile</code>s can be transferred
into and out of a <code>ProcessGroup</code> to and/or from another
<code>ProcessGroup</code> within the same instance of
* NiFi.
@@ -73,9 +71,25 @@ public class LocalPort extends AbstractPort {
public void onTrigger(final ProcessContext context, final ProcessSession
session) {
readLock.lock();
try {
- final List<FlowFile> flowFiles = session.get(100);
- if (!flowFiles.isEmpty()) {
+ Set<Relationship> available = context.getAvailableRelationships();
+ int iterations = 0;
+ while (!available.isEmpty()) {
+ final List<FlowFile> flowFiles = session.get(1000);
+ if (flowFiles.isEmpty()) {
+ break;
+ }
+
session.transfer(flowFiles, Relationship.ANONYMOUS);
+ session.commit();
+
+ // If there are fewer than 1,000 FlowFiles available to
transfer, or if we
+ // have hit a cap of 10,000 FlowFiles, we want to stop. This
prevents us from
+ // holding the Timer-Driven Thread for an excessive amount of
time.
+ if (flowFiles.size() < 1000 || ++iterations >= 10) {
+ break;
+ }
+
+ available = context.getAvailableRelationships();
}
} finally {
readLock.unlock();
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
index 27f9d9c..b418579 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
@@ -110,7 +110,7 @@ public class StandardRootGroupPort extends AbstractPort
implements RootGroupPort
this.identityMappings =
IdentityMappingUtil.getIdentityMappings(nifiProperties);
this.bulletinRepository = bulletinRepository;
this.scheduler = scheduler;
- setYieldPeriod("100 millis");
+ setYieldPeriod(nifiProperties.getBoredYieldDuration());
eventReporter = new EventReporter() {
private static final long serialVersionUID = 1L;
@@ -142,12 +142,13 @@ public class StandardRootGroupPort extends AbstractPort
implements RootGroupPort
public void onTrigger(final ProcessContext context, final
ProcessSessionFactory sessionFactory) {
final FlowFileRequest flowFileRequest;
try {
- flowFileRequest = requestQueue.poll(100, TimeUnit.MILLISECONDS);
+ flowFileRequest = requestQueue.poll(1, TimeUnit.MILLISECONDS);
} catch (final InterruptedException ie) {
return;
}
if (flowFileRequest == null) {
+ context.yield();
return;
}