Repository: nifi
Updated Branches:
  refs/heads/master 76b0065a6 -> be0949570


NIFI-5824: Added a unit test to TestFlowController to ensure that the 
ProcessScheduler that FlowController creates is properly initialized. Also 
updated the properties file used by TestFlowController to use a 
VolatileContentRepository instead of FileSystemRepository, and fixed 
EventDrivenWorkerQueue to return if calls to poll() are interrupted (via 
Thread.interrupt). Making these minor fixes resulted in the unit test 
TestFlowController running in 2 seconds instead of 30+ seconds on my machine.

This closes #3173.

Signed-off-by: Bryan Bende <bbe...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/be094957
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/be094957
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/be094957

Branch: refs/heads/master
Commit: be0949570a66f672e128ac97c936df546c7d2521
Parents: 76b0065
Author: Mark Payne <marka...@hotmail.com>
Authored: Thu Nov 15 14:26:36 2018 -0500
Committer: Bryan Bende <bbe...@apache.org>
Committed: Thu Nov 15 15:01:56 2018 -0500

----------------------------------------------------------------------
 .../nifi/controller/EventDrivenWorkerQueue.java | 12 ++++++-----
 .../scheduling/StandardProcessScheduler.java    |  9 +++++++--
 .../nifi/controller/TestFlowController.java     | 21 ++++++++++++++++++++
 .../flowcontrollertest.nifi.properties          |  3 +++
 4 files changed, 38 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/be094957/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java
index f36a459..25e8a86 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java
@@ -16,6 +16,11 @@
  */
 package org.apache.nifi.controller;
 
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.util.Connectables;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -25,11 +30,6 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.nifi.connectable.Connectable;
-import org.apache.nifi.connectable.Connection;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.util.Connectables;
-
 public class EventDrivenWorkerQueue implements WorkerQueue {
 
     private final Object workMonitor = new Object();
@@ -69,6 +69,8 @@ public class EventDrivenWorkerQueue implements WorkerQueue {
                     try {
                         workMonitor.wait(timeLeft);
                     } catch (final InterruptedException ignored) {
+                        Thread.currentThread().interrupt();
+                        return null;
                     }
                 } else {
                     // Decrement the amount of work there is to do for this 
worker.

http://git-wip-us.apache.org/repos/asf/nifi/blob/be094957/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index 2902f6a..2ff3307 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -35,6 +35,7 @@ import org.apache.nifi.controller.SchedulingAgentCallback;
 import org.apache.nifi.controller.StandardProcessorNode;
 import org.apache.nifi.controller.exception.ProcessorInstantiationException;
 import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.logging.ComponentLog;
@@ -103,6 +104,10 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
         frameworkTaskExecutor = new FlowEngine(4, "Framework Task Thread");
     }
 
+    public ControllerServiceProvider getControllerServiceProvider() {
+        return flowController.getControllerServiceProvider();
+    }
+
     private StateManager getStateManager(final String componentId) {
         return stateManagerProvider.getStateManager(componentId);
     }
@@ -293,7 +298,7 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
     public synchronized CompletableFuture<Void> startProcessor(final 
ProcessorNode procNode, final boolean failIfStopping) {
         final LifecycleState lifecycleState = 
getLifecycleState(requireNonNull(procNode), true);
 
-        final StandardProcessContext processContext = new 
StandardProcessContext(procNode, flowController.getControllerServiceProvider(),
+        final StandardProcessContext processContext = new 
StandardProcessContext(procNode, getControllerServiceProvider(),
             this.encryptor, getStateManager(procNode.getIdentifier()), 
lifecycleState::isTerminated);
 
         final CompletableFuture<Void> future = new CompletableFuture<>();
@@ -333,7 +338,7 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
     public synchronized CompletableFuture<Void> stopProcessor(final 
ProcessorNode procNode) {
         final LifecycleState lifecycleState = getLifecycleState(procNode, 
false);
 
-        StandardProcessContext processContext = new 
StandardProcessContext(procNode, flowController.getControllerServiceProvider(),
+        StandardProcessContext processContext = new 
StandardProcessContext(procNode, getControllerServiceProvider(),
             this.encryptor, getStateManager(procNode.getIdentifier()), 
lifecycleState::isTerminated);
 
         LOG.info("Stopping {}", procNode);

http://git-wip-us.apache.org/repos/asf/nifi/blob/be094957/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
index e3c91b2..651ce9c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
@@ -34,8 +34,10 @@ import 
org.apache.nifi.controller.exception.ProcessorInstantiationException;
 import org.apache.nifi.controller.flow.FlowManager;
 import 
org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
 import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
 import org.apache.nifi.controller.serialization.FlowSynchronizer;
 import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.controller.service.mock.DummyProcessor;
 import org.apache.nifi.controller.service.mock.DummyReportingTask;
 import org.apache.nifi.controller.service.mock.ServiceA;
@@ -95,6 +97,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -126,6 +129,7 @@ public class TestFlowController {
         return StringEncryptor.createEncryptor(algorithm, provider, password);
     }
 
+
     @Before
     public void setup() {
 
@@ -337,6 +341,23 @@ public class TestFlowController {
         controller.synchronize(standardFlowSynchronizer, proposedDataFlow);
     }
 
+    /**
+     * StandardProcessScheduler is created by FlowController. The 
StandardProcessScheduler needs access to the Controller Service Provider,
+     * but the Controller Service Provider needs the ProcessScheduler in its 
constructor. So the StandardProcessScheduler obtains the Controller Service
+     * Provider by making a call back to 
FlowController.getControllerServiceProvider. This test exists to ensure that we 
always have access to the
+     * Controller Service Provider in the Process Scheduler, and that we don't 
inadvertently start storing away the result of calling
+     * FlowController.getControllerServiceProvider() before the service 
provider has been fully initialized.
+     */
+    @Test
+    public void testProcessSchedulerHasAccessToControllerServiceProvider() {
+        final StandardProcessScheduler scheduler = 
controller.getProcessScheduler();
+        assertNotNull(scheduler);
+
+        final ControllerServiceProvider serviceProvider = 
scheduler.getControllerServiceProvider();
+        assertNotNull(serviceProvider);
+        assertSame(serviceProvider, controller.getControllerServiceProvider());
+    }
+
     @Test
     public void 
testSynchronizeFlowWhenCurrentAuthorizationsAreEmptyAndProposedAreNot() {
         final FlowSynchronizer standardFlowSynchronizer = new 
StandardFlowSynchronizer(

http://git-wip-us.apache.org/repos/asf/nifi/blob/be094957/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/flowcontrollertest.nifi.properties
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/flowcontrollertest.nifi.properties
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/flowcontrollertest.nifi.properties
index d9aa4d2..a4c1a4a 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/flowcontrollertest.nifi.properties
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/flowcontrollertest.nifi.properties
@@ -48,6 +48,9 @@ nifi.swap.out.period=5 sec
 nifi.swap.out.threads=4
 
 # Content Repository
+nifi.content.repository.implementation=org.apache.nifi.controller.repository.VolatileContentRepository
+nifi.volatile.content.repository.max.size=1 KB
+nifi.volatile.content.repository.block.size=1 KB
 nifi.content.claim.max.appendable.size=10 MB
 nifi.content.claim.max.flow.files=100
 
nifi.content.repository.directory.default=./target/flowcontrollertest/content_repository

Reply via email to