NIFI-408: Instead of yielding for 100 milliseconds when data is not available, the Root Group Port should poll with a maximum wait time of 100 milliseconds and, if no data is available, return without yielding.
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/d6408046 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/d6408046 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/d6408046 Branch: refs/heads/develop Commit: d6408046bca336e4a11a9c123eef38139f05af78 Parents: 12fa9e7 Author: Mark Payne <[email protected]> Authored: Mon Mar 23 15:11:41 2015 -0400 Committer: Mark Payne <[email protected]> Committed: Mon Mar 23 15:11:41 2015 -0400 ---------------------------------------------------------------------- .../org/apache/nifi/remote/StandardRootGroupPort.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d6408046/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java index 5fbd92f..021531f 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import 
java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; @@ -63,7 +64,6 @@ import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.reporting.Severity; import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.user.NiFiUser; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -126,9 +126,14 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort @Override public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) { - final FlowFileRequest flowFileRequest = requestQueue.poll(); + final FlowFileRequest flowFileRequest; + try { + flowFileRequest = requestQueue.poll(100, TimeUnit.MILLISECONDS); + } catch (final InterruptedException ie) { + return; + } + if ( flowFileRequest == null ) { - context.yield(); return; }
