Repository: nifi Updated Branches: refs/heads/master 07d4d7005 -> 5a8b2cf7f
NIFI-1606: Run the onComponentRemoved logic of state providers in a background thread Signed-off-by: joewitt <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5a8b2cf7 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5a8b2cf7 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5a8b2cf7 Branch: refs/heads/master Commit: 5a8b2cf7f1c8df13720a74dd7f755343208c030f Parents: 07d4d70 Author: Mark Payne <[email protected]> Authored: Tue Mar 8 13:21:00 2016 -0500 Committer: joewitt <[email protected]> Committed: Sun Mar 13 14:08:24 2016 -0400 ---------------------------------------------------------------------- .../nifi/groups/StandardProcessGroup.java | 17 +++++++++++----- .../TestStandardControllerServiceProvider.java | 6 +++--- .../providers/AbstractTestStateProvider.java | 21 ++++++++++++++++++++ 3 files changed, 36 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/5a8b2cf7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 17f9149..4646d55 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -38,6 +38,7 @@ import org.apache.nifi.annotation.lifecycle.OnRemoved; import org.apache.nifi.annotation.lifecycle.OnShutdown; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateManagerProvider; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.connectable.Connection; @@ -46,12 +47,12 @@ import org.apache.nifi.connectable.LocalPort; import org.apache.nifi.connectable.Port; import org.apache.nifi.connectable.Position; import org.apache.nifi.controller.FlowController; -import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.Snippet; import org.apache.nifi.controller.exception.ComponentLifeCycleException; import org.apache.nifi.controller.label.Label; +import org.apache.nifi.controller.scheduling.StandardProcessScheduler; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.encrypt.StringEncryptor; @@ -73,7 +74,7 @@ public final class StandardProcessGroup implements ProcessGroup { private final AtomicReference<Position> position; private final AtomicReference<String> comments; - private final ProcessScheduler scheduler; + private final StandardProcessScheduler scheduler; private final ControllerServiceProvider controllerServiceProvider; private final FlowController flowController; @@ -93,8 +94,8 @@ public final class StandardProcessGroup implements ProcessGroup { private static final Logger LOG = LoggerFactory.getLogger(StandardProcessGroup.class); - public StandardProcessGroup(final String id, final ControllerServiceProvider serviceProvider, final ProcessScheduler scheduler, final NiFiProperties nifiProps, final StringEncryptor encryptor, - final FlowController flowController) { + public StandardProcessGroup(final String id, final ControllerServiceProvider serviceProvider, final StandardProcessScheduler scheduler, + final NiFiProperties nifiProps, final StringEncryptor encryptor, final FlowController flowController) { this.id = id; this.controllerServiceProvider = serviceProvider; this.parent = new AtomicReference<>(); @@ -729,7 +730,13 @@ public final class StandardProcessGroup implements ProcessGroup { processors.remove(id); LogRepositoryFactory.getRepository(processor.getIdentifier()).removeAllObservers(); - flowController.getStateManagerProvider().onComponentRemoved(processor.getIdentifier()); + final StateManagerProvider stateManagerProvider = flowController.getStateManagerProvider(); + scheduler.submitFrameworkTask(new Runnable() { + @Override + public void run() { + stateManagerProvider.onComponentRemoved(processor.getIdentifier()); + } + }); // must copy to avoid a concurrent modification final Set<Connection> copy = new HashSet<>(processor.getConnections()); http://git-wip-us.apache.org/repos/asf/nifi/blob/5a8b2cf7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java index 11b73a8..5abefda 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java @@ -75,7 +75,7 @@ public class TestStandardControllerServiceProvider { System.setProperty("nifi.properties.file.path", "src/test/resources/nifi.properties"); } - private ProcessScheduler createScheduler() { + private StandardProcessScheduler createScheduler() { final Heartbeater heartbeater = Mockito.mock(Heartbeater.class); return new StandardProcessScheduler(heartbeater, null, null, stateManagerProvider); } @@ -331,7 +331,7 @@ public class TestStandardControllerServiceProvider { assertTrue(ordered.get(1) == serviceNode3); } - private ProcessorNode createProcessor(final ProcessScheduler scheduler, final ControllerServiceProvider serviceProvider) { + private ProcessorNode createProcessor(final StandardProcessScheduler scheduler, final ControllerServiceProvider serviceProvider) { final ProcessorNode procNode = new StandardProcessorNode(new DummyProcessor(), UUID.randomUUID().toString(), new StandardValidationContextFactory(serviceProvider), scheduler, serviceProvider); @@ -344,7 +344,7 @@ public class TestStandardControllerServiceProvider { @Test public void testEnableReferencingComponents() { - final ProcessScheduler scheduler = createScheduler(); + final StandardProcessScheduler scheduler = createScheduler(); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(null, null, stateManagerProvider); final ControllerServiceNode serviceNode = provider.createControllerService(ServiceA.class.getName(), "1", false); http://git-wip-us.apache.org/repos/asf/nifi/blob/5a8b2cf7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/AbstractTestStateProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/AbstractTestStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/AbstractTestStateProvider.java index 1cd1f37..80037fe 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/AbstractTestStateProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/AbstractTestStateProvider.java @@ -188,5 +188,26 @@ public abstract class AbstractTestStateProvider { assertFalse(replaced); } + @Test + public void testOnComponentRemoved() throws IOException, InterruptedException { + final StateProvider provider = getProvider(); + final Map<String, String> newValue = new HashMap<>(); + newValue.put("value", "value"); + + provider.setState(newValue, componentId); + final StateMap stateMap = provider.getState(componentId); + assertEquals(0L, stateMap.getVersion()); + + provider.onComponentRemoved(componentId); + + // wait for the background process to complete + Thread.sleep(1000L); + + final StateMap stateMapAfterRemoval = provider.getState(componentId); + + // version should be -1 because the state has been removed entirely. + assertEquals(-1L, stateMapAfterRemoval.getVersion()); + } + protected abstract StateProvider getProvider(); }
