Repository: nifi
Updated Branches:
  refs/heads/master 07d4d7005 -> 5a8b2cf7f


NIFI-1606: Run the onComponentRemoved logic of state providers in a background thread

Signed-off-by: joewitt <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5a8b2cf7
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5a8b2cf7
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5a8b2cf7

Branch: refs/heads/master
Commit: 5a8b2cf7f1c8df13720a74dd7f755343208c030f
Parents: 07d4d70
Author: Mark Payne <[email protected]>
Authored: Tue Mar 8 13:21:00 2016 -0500
Committer: joewitt <[email protected]>
Committed: Sun Mar 13 14:08:24 2016 -0400

----------------------------------------------------------------------
 .../nifi/groups/StandardProcessGroup.java       | 17 +++++++++++-----
 .../TestStandardControllerServiceProvider.java  |  6 +++---
 .../providers/AbstractTestStateProvider.java    | 21 ++++++++++++++++++++
 3 files changed, 36 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/5a8b2cf7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 17f9149..4646d55 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -38,6 +38,7 @@ import org.apache.nifi.annotation.lifecycle.OnRemoved;
 import org.apache.nifi.annotation.lifecycle.OnShutdown;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateManagerProvider;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.connectable.Connection;
@@ -46,12 +47,12 @@ import org.apache.nifi.connectable.LocalPort;
 import org.apache.nifi.connectable.Port;
 import org.apache.nifi.connectable.Position;
 import org.apache.nifi.controller.FlowController;
-import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.Snippet;
 import org.apache.nifi.controller.exception.ComponentLifeCycleException;
 import org.apache.nifi.controller.label.Label;
+import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.encrypt.StringEncryptor;
@@ -73,7 +74,7 @@ public final class StandardProcessGroup implements ProcessGroup {
     private final AtomicReference<Position> position;
     private final AtomicReference<String> comments;
 
-    private final ProcessScheduler scheduler;
+    private final StandardProcessScheduler scheduler;
     private final ControllerServiceProvider controllerServiceProvider;
     private final FlowController flowController;
 
@@ -93,8 +94,8 @@ public final class StandardProcessGroup implements ProcessGroup {
 
     private static final Logger LOG = LoggerFactory.getLogger(StandardProcessGroup.class);
 
-    public StandardProcessGroup(final String id, final ControllerServiceProvider serviceProvider, final ProcessScheduler scheduler, final NiFiProperties nifiProps, final StringEncryptor encryptor,
-        final FlowController flowController) {
+    public StandardProcessGroup(final String id, final ControllerServiceProvider serviceProvider, final StandardProcessScheduler scheduler,
+        final NiFiProperties nifiProps, final StringEncryptor encryptor, final FlowController flowController) {
         this.id = id;
         this.controllerServiceProvider = serviceProvider;
         this.parent = new AtomicReference<>();
@@ -729,7 +730,13 @@ public final class StandardProcessGroup implements ProcessGroup {
             processors.remove(id);
             LogRepositoryFactory.getRepository(processor.getIdentifier()).removeAllObservers();
 
-            flowController.getStateManagerProvider().onComponentRemoved(processor.getIdentifier());
+            final StateManagerProvider stateManagerProvider = flowController.getStateManagerProvider();
+            scheduler.submitFrameworkTask(new Runnable() {
+                @Override
+                public void run() {
+                    stateManagerProvider.onComponentRemoved(processor.getIdentifier());
+                }
+            });
 
             // must copy to avoid a concurrent modification
             final Set<Connection> copy = new HashSet<>(processor.getConnections());

http://git-wip-us.apache.org/repos/asf/nifi/blob/5a8b2cf7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
index 11b73a8..5abefda 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
@@ -75,7 +75,7 @@ public class TestStandardControllerServiceProvider {
         System.setProperty("nifi.properties.file.path", "src/test/resources/nifi.properties");
     }
 
-    private ProcessScheduler createScheduler() {
+    private StandardProcessScheduler createScheduler() {
         final Heartbeater heartbeater = Mockito.mock(Heartbeater.class);
         return new StandardProcessScheduler(heartbeater, null, null, stateManagerProvider);
     }
@@ -331,7 +331,7 @@ public class TestStandardControllerServiceProvider {
         assertTrue(ordered.get(1) == serviceNode3);
     }
 
-    private ProcessorNode createProcessor(final ProcessScheduler scheduler, final ControllerServiceProvider serviceProvider) {
+    private ProcessorNode createProcessor(final StandardProcessScheduler scheduler, final ControllerServiceProvider serviceProvider) {
         final ProcessorNode procNode = new StandardProcessorNode(new DummyProcessor(), UUID.randomUUID().toString(),
                 new StandardValidationContextFactory(serviceProvider), scheduler, serviceProvider);
 
@@ -344,7 +344,7 @@ public class TestStandardControllerServiceProvider {
 
     @Test
     public void testEnableReferencingComponents() {
-        final ProcessScheduler scheduler = createScheduler();
+        final StandardProcessScheduler scheduler = createScheduler();
         final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(null, null, stateManagerProvider);
         final ControllerServiceNode serviceNode = provider.createControllerService(ServiceA.class.getName(), "1", false);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/5a8b2cf7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/AbstractTestStateProvider.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/AbstractTestStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/AbstractTestStateProvider.java
index 1cd1f37..80037fe 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/AbstractTestStateProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/AbstractTestStateProvider.java
@@ -188,5 +188,26 @@ public abstract class AbstractTestStateProvider {
         assertFalse(replaced);
     }
 
+    @Test
+    public void testOnComponentRemoved() throws IOException, InterruptedException {
+        final StateProvider provider = getProvider();
+        final Map<String, String> newValue = new HashMap<>();
+        newValue.put("value", "value");
+
+        provider.setState(newValue, componentId);
+        final StateMap stateMap = provider.getState(componentId);
+        assertEquals(0L, stateMap.getVersion());
+
+        provider.onComponentRemoved(componentId);
+
+        // wait for the background process to complete
+        Thread.sleep(1000L);
+
+        final StateMap stateMapAfterRemoval = provider.getState(componentId);
+
+        // version should be -1 because the state has been removed entirely.
+        assertEquals(-1L, stateMapAfterRemoval.getVersion());
+    }
+
     protected abstract StateProvider getProvider();
 }

Reply via email to