Repository: nifi
Updated Branches:
  refs/heads/master 2800df30d -> 5a7e6c6ac


NIFI-4064 Make sure that Funnels with queued incoming FlowFiles, but no 
outgoing connections yield rather than continually check to run.

Signed-off-by: Pierre Villard <[email protected]>

This closes #1914.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5a7e6c6a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5a7e6c6a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5a7e6c6a

Branch: refs/heads/master
Commit: 5a7e6c6ac121f24228ffaecd29b46a08d2a1a6d3
Parents: 2800df3
Author: Wesley-Lawrence <[email protected]>
Authored: Wed Jun 14 11:57:07 2017 -0400
Committer: Pierre Villard <[email protected]>
Committed: Sat Jun 17 15:50:40 2017 +0200

----------------------------------------------------------------------
 .../tasks/ContinuallyRunConnectableTask.java    |  7 +-
 .../scheduling/DummyScheduleState.java          | 25 +++++++
 .../TestContinuallyRunConnectableTask.java      | 72 ++++++++++++++++++++
 3 files changed, 101 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/5a7e6c6a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java
index b856f11..daf21a5 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java
@@ -67,10 +67,11 @@ public class ContinuallyRunConnectableTask implements 
Callable<Boolean> {
         // 4. There is a connection for each relationship.
         final boolean triggerWhenEmpty = connectable.isTriggerWhenEmpty();
         boolean flowFilesQueued = true;
+        boolean funnelWithoutConnections = false;
         boolean relationshipAvailable = true;
         final boolean shouldRun = (connectable.getYieldExpiration() < 
System.currentTimeMillis())
                 && (triggerWhenEmpty || (flowFilesQueued = 
Connectables.flowFilesQueued(connectable)))
-                && (connectable.getConnectableType() != ConnectableType.FUNNEL 
|| !connectable.getConnections().isEmpty())
+                && (connectable.getConnectableType() != ConnectableType.FUNNEL 
|| !(funnelWithoutConnections = connectable.getConnections().isEmpty()))
             && (connectable.getRelationships().isEmpty() || 
(relationshipAvailable = Connectables.anyRelationshipAvailable(connectable)));
 
         if (shouldRun) {
@@ -100,8 +101,8 @@ public class ContinuallyRunConnectableTask implements 
Callable<Boolean> {
 
                 scheduleState.decrementActiveThreadCount();
             }
-        } else if (!flowFilesQueued || !relationshipAvailable) {
-            // Either there are no FlowFiles queued, or the relationship is 
not available (i.e., backpressure is applied).
+        } else if (!flowFilesQueued || funnelWithoutConnections || 
!relationshipAvailable) {
+            // Either there are no FlowFiles queued, it's a funnel without 
outgoing connections, or the relationship is not available (i.e., backpressure 
is applied).
             // We will yield for just a bit.
             return true;
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/5a7e6c6a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/DummyScheduleState.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/DummyScheduleState.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/DummyScheduleState.java
new file mode 100644
index 0000000..f70e188
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/DummyScheduleState.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.scheduling;
+
+public class DummyScheduleState extends ScheduleState {
+
+    public DummyScheduleState(boolean isScheduled) {
+        setScheduled(isScheduled);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/5a7e6c6a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestContinuallyRunConnectableTask.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestContinuallyRunConnectableTask.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestContinuallyRunConnectableTask.java
new file mode 100644
index 0000000..5133981
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestContinuallyRunConnectableTask.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.tasks;
+
+import org.apache.nifi.connectable.ConnectableType;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.connectable.Funnel;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.scheduling.DummyScheduleState;
+import org.apache.nifi.controller.scheduling.ProcessContextFactory;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+
+public class TestContinuallyRunConnectableTask {
+
+    @Test
+    public void funnelsShouldYieldWhenNoOutboundConnections() {
+
+        // Incoming connection with FlowFile
+        final FlowFileQueue flowFileQueueNonEmpty = 
Mockito.mock(FlowFileQueue.class);
+        
Mockito.when(flowFileQueueNonEmpty.isActiveQueueEmpty()).thenReturn(false);
+
+        final Connection connectionNonEmpty = Mockito.mock(Connection.class);
+        
Mockito.when(connectionNonEmpty.getFlowFileQueue()).thenReturn(flowFileQueueNonEmpty);
+
+        // Create a Funnel with an inbound connection, and no outbound 
connections
+        final Funnel testFunnelNoOutbound = Mockito.mock(Funnel.class);
+        
Mockito.when(testFunnelNoOutbound.getIncomingConnections()).thenReturn(Collections.singletonList(connectionNonEmpty));
+        
Mockito.when(testFunnelNoOutbound.getConnections()).thenReturn(Collections.emptySet());
+
+        // Set the Funnel to be yielding up to 5 seconds ago
+        
Mockito.when(testFunnelNoOutbound.getYieldExpiration()).thenReturn(System.currentTimeMillis()
 - 5000);
+        // Set the Funnel 'isTriggeredWhenEmpty' to false (same as what 
'StandardFunnel' returns)
+        
Mockito.when(testFunnelNoOutbound.isTriggerWhenEmpty()).thenReturn(false);
+        // Set the Funnel connection type
+        
Mockito.when(testFunnelNoOutbound.getConnectableType()).thenReturn(ConnectableType.FUNNEL);
+        // Set the Funnel relationships to Anonymous (same as what 
'StandardFunnel' returns)
+        
Mockito.when(testFunnelNoOutbound.getRelationships()).thenReturn(Collections.singletonList(Relationship.ANONYMOUS));
+
+        // Create Mock 'ProcessContextFactory', and 'ProcessContext'
+        final ProcessContextFactory pcf = 
Mockito.mock(ProcessContextFactory.class);
+        Mockito.when(pcf.newProcessContext(Mockito.any(), 
Mockito.any())).thenReturn(null);
+
+        final ProcessContext pc = Mockito.mock(ProcessContext.class);
+
+        // Create ContinuallyRunConnectableTask
+        ContinuallyRunConnectableTask crct = new 
ContinuallyRunConnectableTask(pcf, testFunnelNoOutbound, new 
DummyScheduleState(true), pc);
+
+        // We should yield since this Funnel has no outbound connections.
+        Assert.assertTrue("Didn't yield when a Funnel has no outbound 
connections.", crct.call());
+    }
+
+}

Reply via email to