Repository: nifi Updated Branches: refs/heads/master 2800df30d -> 5a7e6c6ac
NIFI-4064 Make sure that Funnels with queued incoming FlowFiles, but no outgoing connections yield rather than continually check to run. Signed-off-by: Pierre Villard <[email protected]> This closes #1914. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5a7e6c6a Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5a7e6c6a Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5a7e6c6a Branch: refs/heads/master Commit: 5a7e6c6ac121f24228ffaecd29b46a08d2a1a6d3 Parents: 2800df3 Author: Wesley-Lawrence <[email protected]> Authored: Wed Jun 14 11:57:07 2017 -0400 Committer: Pierre Villard <[email protected]> Committed: Sat Jun 17 15:50:40 2017 +0200 ---------------------------------------------------------------------- .../tasks/ContinuallyRunConnectableTask.java | 7 +- .../scheduling/DummyScheduleState.java | 25 +++++++ .../TestContinuallyRunConnectableTask.java | 72 ++++++++++++++++++++ 3 files changed, 101 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/5a7e6c6a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java index b856f11..daf21a5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java @@ -67,10 +67,11 @@ public class ContinuallyRunConnectableTask implements Callable<Boolean> { // 4. There is a connection for each relationship. final boolean triggerWhenEmpty = connectable.isTriggerWhenEmpty(); boolean flowFilesQueued = true; + boolean funnelWithoutConnections = false; boolean relationshipAvailable = true; final boolean shouldRun = (connectable.getYieldExpiration() < System.currentTimeMillis()) && (triggerWhenEmpty || (flowFilesQueued = Connectables.flowFilesQueued(connectable))) - && (connectable.getConnectableType() != ConnectableType.FUNNEL || !connectable.getConnections().isEmpty()) + && (connectable.getConnectableType() != ConnectableType.FUNNEL || !(funnelWithoutConnections = connectable.getConnections().isEmpty())) && (connectable.getRelationships().isEmpty() || (relationshipAvailable = Connectables.anyRelationshipAvailable(connectable))); if (shouldRun) { @@ -100,8 +101,8 @@ public class ContinuallyRunConnectableTask implements Callable<Boolean> { scheduleState.decrementActiveThreadCount(); } - } else if (!flowFilesQueued || !relationshipAvailable) { - // Either there are no FlowFiles queued, or the relationship is not available (i.e., backpressure is applied). + } else if (!flowFilesQueued || funnelWithoutConnections || !relationshipAvailable) { + // Either there are no FlowFiles queued, it's a funnel without outgoing connections, or the relationship is not available (i.e., backpressure is applied). // We will yield for just a bit. return true; } http://git-wip-us.apache.org/repos/asf/nifi/blob/5a7e6c6a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/DummyScheduleState.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/DummyScheduleState.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/DummyScheduleState.java new file mode 100644 index 0000000..f70e188 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/DummyScheduleState.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.scheduling; + +public class DummyScheduleState extends ScheduleState { + + public DummyScheduleState(boolean isScheduled) { + setScheduled(isScheduled); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/5a7e6c6a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestContinuallyRunConnectableTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestContinuallyRunConnectableTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestContinuallyRunConnectableTask.java new file mode 100644 index 0000000..5133981 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestContinuallyRunConnectableTask.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.tasks; + +import org.apache.nifi.connectable.ConnectableType; +import org.apache.nifi.connectable.Connection; +import org.apache.nifi.connectable.Funnel; +import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.scheduling.DummyScheduleState; +import org.apache.nifi.controller.scheduling.ProcessContextFactory; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.Collections; + +public class TestContinuallyRunConnectableTask { + + @Test + public void funnelsShouldYieldWhenNoOutboundConnections() { + + // Incoming connection with FlowFile + final FlowFileQueue flowFileQueueNonEmpty = Mockito.mock(FlowFileQueue.class); + Mockito.when(flowFileQueueNonEmpty.isActiveQueueEmpty()).thenReturn(false); + + final Connection connectionNonEmpty = Mockito.mock(Connection.class); + Mockito.when(connectionNonEmpty.getFlowFileQueue()).thenReturn(flowFileQueueNonEmpty); + + // Create a Funnel with an inbound connection, and no outbound connections + final Funnel testFunnelNoOutbound = Mockito.mock(Funnel.class); + Mockito.when(testFunnelNoOutbound.getIncomingConnections()).thenReturn(Collections.singletonList(connectionNonEmpty)); + Mockito.when(testFunnelNoOutbound.getConnections()).thenReturn(Collections.emptySet()); + + // Set the Funnel to be yielding up to 5 seconds ago + Mockito.when(testFunnelNoOutbound.getYieldExpiration()).thenReturn(System.currentTimeMillis() - 5000); + // Set the Funnel 'isTriggeredWhenEmpty' to false (same as what 'StandardFunnel' returns) + Mockito.when(testFunnelNoOutbound.isTriggerWhenEmpty()).thenReturn(false); + // Set the Funnel connection type + Mockito.when(testFunnelNoOutbound.getConnectableType()).thenReturn(ConnectableType.FUNNEL); + // Set the Funnel relationships to Anonymous (same as what 'StandardFunnel' returns) + Mockito.when(testFunnelNoOutbound.getRelationships()).thenReturn(Collections.singletonList(Relationship.ANONYMOUS)); + + // Create Mock 'ProcessContextFactory', and 'ProcessContext' + final ProcessContextFactory pcf = Mockito.mock(ProcessContextFactory.class); + Mockito.when(pcf.newProcessContext(Mockito.any(), Mockito.any())).thenReturn(null); + + final ProcessContext pc = Mockito.mock(ProcessContext.class); + + // Create ContinuallyRunConnectableTask + ContinuallyRunConnectableTask crct = new ContinuallyRunConnectableTask(pcf, testFunnelNoOutbound, new DummyScheduleState(true), pc); + + // We should yield since this Funnel has no outbound connections. + Assert.assertTrue("Didn't yield when a Funnel has no outbound connections.", crct.call()); + } + +}
