http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java index 8c903ee..4985fe6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java @@ -24,6 +24,7 @@ import org.apache.nifi.attribute.expression.language.Query; import org.apache.nifi.attribute.expression.language.Query.Range; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.state.StateManager; import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.service.ControllerServiceNode; @@ -35,11 +36,13 @@ public class StandardSchedulingContext implements SchedulingContext { private final ProcessContext processContext; private final ControllerServiceProvider serviceProvider; private final ProcessorNode processorNode; + private final StateManager stateManager; - public StandardSchedulingContext(final ProcessContext processContext, final ControllerServiceProvider serviceProvider, final ProcessorNode processorNode) { + public StandardSchedulingContext(final ProcessContext processContext, final ControllerServiceProvider serviceProvider, final ProcessorNode processorNode, final StateManager 
stateManager) { this.processContext = processContext; this.serviceProvider = serviceProvider; this.processorNode = processorNode; + this.stateManager = stateManager; } @Override @@ -139,4 +142,9 @@ public class StandardSchedulingContext implements SchedulingContext { final List<Range> elRanges = Query.extractExpressionRanges(getProperty(property).getValue()); return (elRanges != null && !elRanges.isEmpty()); } + + @Override + public StateManager getStateManager() { + return stateManager; + } }
http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.components.state.StateProvider ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.components.state.StateProvider b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.components.state.StateProvider new file mode 100644 index 0000000..6786be9 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.components.state.StateProvider @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+org.apache.nifi.controller.state.providers.local.WriteAheadLocalStateProvider +org.apache.nifi.components.state.providers.zookeeper.ZooKeeperStateProvider \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java index 770bb47..543bfd0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java @@ -38,6 +38,7 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.components.state.StateManagerProvider; import org.apache.nifi.controller.Heartbeater; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ProcessorNode; @@ -72,12 +73,13 @@ public class TestStandardProcessScheduler { private StandardProcessScheduler scheduler = null; private ReportingTaskNode taskNode = null; private TestReportingTask reportingTask = null; + private StateManagerProvider stateMgrProvider = Mockito.mock(StateManagerProvider.class); @Before public void setup() throws InitializationException { 
System.setProperty("nifi.properties.file.path", "src/test/resources/nifi.properties"); this.refreshNiFiProperties(); - scheduler = new StandardProcessScheduler(Mockito.mock(Heartbeater.class), Mockito.mock(ControllerServiceProvider.class), null); + scheduler = new StandardProcessScheduler(Mockito.mock(Heartbeater.class), Mockito.mock(ControllerServiceProvider.class), null, stateMgrProvider); scheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, Mockito.mock(SchedulingAgent.class)); reportingTask = new TestReportingTask(); @@ -117,12 +119,12 @@ public class TestStandardProcessScheduler { public void testDisableControllerServiceWithProcessorTryingToStartUsingIt() throws InterruptedException { final Processor proc = new ServiceReferencingProcessor(); - final ControllerServiceProvider serviceProvider = new StandardControllerServiceProvider(scheduler, null); + final ControllerServiceProvider serviceProvider = new StandardControllerServiceProvider(scheduler, null, Mockito.mock(StateManagerProvider.class)); final ControllerServiceNode service = serviceProvider.createControllerService(NoStartServiceImpl.class.getName(), "service", true); final ProcessorNode procNode = new StandardProcessorNode(proc, UUID.randomUUID().toString(), new StandardValidationContextFactory(serviceProvider), scheduler, serviceProvider); - procNode.setProperty(ServiceReferencingProcessor.SERVICE_DESC.getName(), service.getIdentifier()); + procNode.setProperty(ServiceReferencingProcessor.SERVICE_DESC.getName(), service.getIdentifier(), true); scheduler.enableControllerService(service); scheduler.startProcessor(procNode); @@ -198,7 +200,7 @@ public class TestStandardProcessScheduler { @Test public void validateServiceEnablementLogicHappensOnlyOnce() throws Exception { final ProcessScheduler scheduler = createScheduler(); - StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null); + StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(scheduler, null, stateMgrProvider); final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(), "1", false); assertFalse(serviceNode.isActive()); @@ -237,7 +239,7 @@ public class TestStandardProcessScheduler { @Test public void validateDisabledServiceCantBeDisabled() throws Exception { final ProcessScheduler scheduler = createScheduler(); - StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null); + StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null, stateMgrProvider); final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(), "1", false); SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation(); @@ -275,7 +277,7 @@ public class TestStandardProcessScheduler { @Test public void validateEnabledServiceCanOnlyBeDisabledOnce() throws Exception { final ProcessScheduler scheduler = createScheduler(); - StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null); + StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null, stateMgrProvider); final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(), "1", false); SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation(); @@ -309,7 +311,7 @@ public class TestStandardProcessScheduler { @Test public void validateDisablingOfTheFailedService() throws Exception { final ProcessScheduler scheduler = createScheduler(); - StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null); + StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null, stateMgrProvider); final ControllerServiceNode serviceNode = 
provider.createControllerService(FailingService.class.getName(), "1", false); scheduler.enableControllerService(serviceNode); @@ -340,7 +342,7 @@ public class TestStandardProcessScheduler { @Test public void validateEnabledDisableMultiThread() throws Exception { final ProcessScheduler scheduler = createScheduler(); - StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null); + StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null, stateMgrProvider); ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 200; i++) { final ControllerServiceNode serviceNode = provider @@ -383,7 +385,7 @@ public class TestStandardProcessScheduler { @Test public void validateNeverEnablingServiceCanStillBeDisabled() throws Exception { final ProcessScheduler scheduler = createScheduler(); - StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null); + StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null, stateMgrProvider); final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(), "1", false); LongEnablingService ts = (LongEnablingService) serviceNode.getControllerServiceImplementation(); @@ -408,7 +410,7 @@ public class TestStandardProcessScheduler { @Test public void validateLongEnablingServiceCanStillBeDisabled() throws Exception { final ProcessScheduler scheduler = createScheduler(); - StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null); + StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null, stateMgrProvider); final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(), "1", false); LongEnablingService ts = (LongEnablingService) serviceNode.getControllerServiceImplementation(); @@ -505,6 +507,6 
@@ public class TestStandardProcessScheduler { } private ProcessScheduler createScheduler() { - return new StandardProcessScheduler(mock(Heartbeater.class), null, null); + return new StandardProcessScheduler(mock(Heartbeater.class), null, null, stateMgrProvider); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java index ed9af31..b9c0f7f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.controller.service; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateManagerProvider; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.StandardFlowServiceTest; import org.apache.nifi.nar.ExtensionManager; @@ -25,6 +27,7 @@ import org.apache.nifi.util.NiFiProperties; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.Mockito; public class StandardControllerServiceProviderTest { @@ -43,7 +46,28 @@ public class StandardControllerServiceProviderTest { public void setup() throws Exception { String id = "id"; String clazz = 
"org.apache.nifi.controller.service.util.TestControllerService"; - ControllerServiceProvider provider = new StandardControllerServiceProvider(null, null); + ControllerServiceProvider provider = new StandardControllerServiceProvider(null, null, new StateManagerProvider() { + @Override + public StateManager getStateManager(final String componentId) { + return Mockito.mock(StateManager.class); + } + + @Override + public void shutdown() { + } + + @Override + public void enableClusterProvider() { + } + + @Override + public void disableClusterProvider() { + } + + @Override + public void onComponentRemoved(String componentId) { + } + }); ControllerServiceNode node = provider.createControllerService(clazz, id, true); proxied = node.getProxiedControllerService(); implementation = node.getControllerServiceImplementation(); @@ -68,4 +92,4 @@ public class StandardControllerServiceProviderTest { public void testCallImplementationInitialized() throws InitializationException { implementation.initialize(null); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java index eb0fa13..ca63ba4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java @@ -28,6 +28,8 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateManagerProvider; import org.apache.nifi.controller.Heartbeater; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ProcessorNode; @@ -49,6 +51,28 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; public class TestStandardControllerServiceProvider { + private static StateManagerProvider stateManagerProvider = new StateManagerProvider() { + @Override + public StateManager getStateManager(String componentId) { + return Mockito.mock(StateManager.class); + } + + @Override + public void shutdown() { + } + + @Override + public void enableClusterProvider() { + } + + @Override + public void disableClusterProvider() { + } + + @Override + public void onComponentRemoved(String componentId) { + } + }; @BeforeClass public static void setNiFiProps() { @@ -57,13 +81,13 @@ public class TestStandardControllerServiceProvider { private ProcessScheduler createScheduler() { final Heartbeater heartbeater = Mockito.mock(Heartbeater.class); - return new StandardProcessScheduler(heartbeater, null, null); + return new StandardProcessScheduler(heartbeater, null, null, stateManagerProvider); } @Test public void testDisableControllerService() { final ProcessScheduler scheduler = createScheduler(); - final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null); + final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null, stateManagerProvider); final ControllerServiceNode serviceNode = provider.createControllerService(ServiceB.class.getName(), "B", false); 
provider.enableControllerService(serviceNode); @@ -73,12 +97,12 @@ public class TestStandardControllerServiceProvider { @Test(timeout=10000) public void testEnableDisableWithReference() { final ProcessScheduler scheduler = createScheduler(); - final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null); + final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null, stateManagerProvider); final ControllerServiceNode serviceNodeB = provider.createControllerService(ServiceB.class.getName(), "B", false); final ControllerServiceNode serviceNodeA = provider.createControllerService(ServiceA.class.getName(), "A", false); - serviceNodeA.setProperty(ServiceA.OTHER_SERVICE.getName(), "B"); + serviceNodeA.setProperty(ServiceA.OTHER_SERVICE.getName(), "B", true); try { provider.enableControllerService(serviceNodeA); @@ -126,7 +150,7 @@ public class TestStandardControllerServiceProvider { } public void testEnableReferencingServicesGraph(ProcessScheduler scheduler) { - final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null); + final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null, stateManagerProvider); // build a graph of controller services with dependencies as such: // @@ -145,10 +169,10 @@ public class TestStandardControllerServiceProvider { final ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3", false); final ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4", false); - serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); - serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4"); - serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); - serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4"); + 
serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2", true); + serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4", true); + serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2", true); + serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4", true); provider.enableControllerService(serviceNode4); provider.enableReferencingServices(serviceNode4); @@ -169,7 +193,7 @@ public class TestStandardControllerServiceProvider { @Test(timeout=10000) public void testStartStopReferencingComponents() { final ProcessScheduler scheduler = createScheduler(); - final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null); + final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null, stateManagerProvider); // build a graph of reporting tasks and controller services with dependencies as such: // @@ -213,20 +237,20 @@ public class TestStandardControllerServiceProvider { final ProcessorNode procNodeA = new StandardProcessorNode(new DummyProcessor(), id1, new StandardValidationContextFactory(provider), scheduler, provider); procNodeA.getProcessor().initialize(new StandardProcessorInitializationContext(id1, null, provider)); - procNodeA.setProperty(DummyProcessor.SERVICE.getName(), "1"); + procNodeA.setProperty(DummyProcessor.SERVICE.getName(), "1", true); procNodeA.setProcessGroup(mockProcessGroup); final String id2 = UUID.randomUUID().toString(); final ProcessorNode procNodeB = new StandardProcessorNode(new DummyProcessor(), id2, new StandardValidationContextFactory(provider), scheduler, provider); procNodeB.getProcessor().initialize(new StandardProcessorInitializationContext(id2, null, provider)); - procNodeB.setProperty(DummyProcessor.SERVICE.getName(), "3"); + procNodeB.setProperty(DummyProcessor.SERVICE.getName(), "3", true); procNodeB.setProcessGroup(mockProcessGroup); - serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); - 
serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4"); - serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); - serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4"); + serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2", true); + serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4", true); + serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2", true); + serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4", true); provider.enableControllerService(serviceNode4); provider.enableReferencingServices(serviceNode4); @@ -280,11 +304,11 @@ public class TestStandardControllerServiceProvider { @Test public void testOrderingOfServices() { - final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(null, null); + final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(null, null, stateManagerProvider); final ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1", false); final ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceB.class.getName(), "2", false); - serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); + serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2", true); final Map<String, ControllerServiceNode> nodeMap = new LinkedHashMap<>(); nodeMap.put("1", serviceNode1); @@ -314,7 +338,7 @@ public class TestStandardControllerServiceProvider { // add circular dependency on self. nodeMap.clear(); - serviceNode1.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "1"); + serviceNode1.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "1", true); nodeMap.put("1", serviceNode1); nodeMap.put("2", serviceNode2); @@ -341,8 +365,8 @@ public class TestStandardControllerServiceProvider { // like that. 
nodeMap.clear(); final ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3", false); - serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "3"); - serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "1"); + serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "3", true); + serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "1", true); nodeMap.put("1", serviceNode1); nodeMap.put("3", serviceNode3); branches = StandardControllerServiceProvider.determineEnablingOrder(nodeMap); @@ -364,10 +388,10 @@ public class TestStandardControllerServiceProvider { // Add multiple completely disparate branches. nodeMap.clear(); - serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); + serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2", true); final ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4", false); final ControllerServiceNode serviceNode5 = provider.createControllerService(ServiceB.class.getName(), "5", false); - serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "4"); + serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "4", true); nodeMap.put("1", serviceNode1); nodeMap.put("2", serviceNode2); nodeMap.put("3", serviceNode3); @@ -398,8 +422,8 @@ public class TestStandardControllerServiceProvider { // create 2 branches both dependent on the same service nodeMap.clear(); - serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); - serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); + serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2", true); + serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2", true); nodeMap.put("1", serviceNode1); nodeMap.put("2", serviceNode2); nodeMap.put("3", serviceNode3); @@ -426,7 +450,7 @@ public class TestStandardControllerServiceProvider { final ProcessorNode procNode = new StandardProcessorNode(new DummyProcessor(), 
UUID.randomUUID().toString(), new StandardValidationContextFactory(serviceProvider), scheduler, serviceProvider); - final ProcessGroup group = new StandardProcessGroup(UUID.randomUUID().toString(), serviceProvider, scheduler, null, null); + final ProcessGroup group = new StandardProcessGroup(UUID.randomUUID().toString(), serviceProvider, scheduler, null, null, null); group.addProcessor(procNode); procNode.setProcessGroup(group); @@ -436,7 +460,7 @@ public class TestStandardControllerServiceProvider { @Test public void testEnableReferencingComponents() { final ProcessScheduler scheduler = createScheduler(); - final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(null, null); + final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(null, null, stateManagerProvider); final ControllerServiceNode serviceNode = provider.createControllerService(ServiceA.class.getName(), "1", false); final ProcessorNode procNode = createProcessor(scheduler, provider); http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/AbstractTestStateProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/AbstractTestStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/AbstractTestStateProvider.java new file mode 100644 index 0000000..d724be0 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/AbstractTestStateProvider.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.state.providers; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.components.state.StateProvider; +import org.junit.Test; + + +/** + * <p> + * Abstract class that provides a suite of test for State Providers. Each State Provider implementation can simply extend this class, + * implement the getProvider() method, and have the entire suite of tests run against the provider. + * </p> + * + * <p> + * It is recommended that implementations create a new provider in a method annotated with @Before and cleanup in a method annotated with @After. 
+ * </p> + */ +public abstract class AbstractTestStateProvider { + protected final String componentId = "111111111-1111-1111-1111-111111111111"; + + + @Test + public void testSetAndGet() throws IOException { + getProvider().setState(Collections.singletonMap("testSetAndGet", "value"), componentId); + assertEquals("value", getProvider().getState(componentId).get("testSetAndGet")); + } + + @Test + public void testReplaceSuccessful() throws IOException { + final String key = "testReplaceSuccessful"; + final StateProvider provider = getProvider(); + + StateMap map = provider.getState(componentId); + assertNotNull(map); + assertEquals(-1, map.getVersion()); + + assertNotNull(map.toMap()); + assertTrue(map.toMap().isEmpty()); + provider.setState(Collections.singletonMap(key, "value1"), componentId); + + map = provider.getState(componentId); + assertNotNull(map); + assertEquals(0, map.getVersion()); + assertEquals("value1", map.get(key)); + assertEquals("value1", map.toMap().get(key)); + + final Map<String, String> newMap = new HashMap<>(map.toMap()); + newMap.put(key, "value2"); + assertTrue(provider.replace(map, newMap, componentId)); + + map = provider.getState(componentId); + assertEquals("value2", map.get(key)); + assertEquals(1L, map.getVersion()); + } + + @Test + public void testReplaceWithWrongVersion() throws IOException { + final String key = "testReplaceWithWrongVersion"; + final StateProvider provider = getProvider(); + provider.setState(Collections.singletonMap(key, "value1"), componentId); + + StateMap stateMap = provider.getState(componentId); + assertNotNull(stateMap); + assertEquals("value1", stateMap.get(key)); + assertEquals(0, stateMap.getVersion()); + + provider.setState(Collections.singletonMap(key, "intermediate value"), componentId); + + assertFalse(provider.replace(stateMap, Collections.singletonMap(key, "value2"), componentId)); + stateMap = provider.getState(componentId); + assertEquals(key, stateMap.toMap().keySet().iterator().next()); + 
assertEquals(1, stateMap.toMap().size()); + assertEquals("intermediate value", stateMap.get(key)); + assertEquals(1, stateMap.getVersion()); + } + + + @Test + public void testToMap() throws IOException { + final String key = "testKeySet"; + final StateProvider provider = getProvider(); + Map<String, String> map = provider.getState(componentId).toMap(); + assertNotNull(map); + assertTrue(map.isEmpty()); + + provider.setState(Collections.singletonMap(key, "value"), componentId); + map = provider.getState(componentId).toMap(); + assertNotNull(map); + assertEquals(1, map.size()); + assertEquals("value", map.get(key)); + + provider.setState(Collections.<String, String> emptyMap(), componentId); + + final StateMap stateMap = provider.getState(componentId); + map = stateMap.toMap(); + assertNotNull(map); + assertTrue(map.isEmpty()); + assertEquals(1, stateMap.getVersion()); + } + + @Test + public void testClear() throws IOException { + final StateProvider provider = getProvider(); + StateMap stateMap = provider.getState(componentId); + assertNotNull(stateMap); + assertEquals(-1L, stateMap.getVersion()); + assertTrue(stateMap.toMap().isEmpty()); + + provider.setState(Collections.singletonMap("testClear", "value"), componentId); + + stateMap = provider.getState(componentId); + assertNotNull(stateMap); + assertEquals(0, stateMap.getVersion()); + assertEquals("value", stateMap.get("testClear")); + + provider.clear(componentId); + + stateMap = provider.getState(componentId); + assertNotNull(stateMap); + assertEquals(1L, stateMap.getVersion()); + assertTrue(stateMap.toMap().isEmpty()); + } + + + protected abstract StateProvider getProvider(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java new file mode 100644 index 0000000..9f7c4c9 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller.state.providers.local; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import javax.net.ssl.SSLContext; + +import org.apache.nifi.attribute.expression.language.StandardPropertyValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.state.StateProvider; +import org.apache.nifi.components.state.StateProviderInitializationContext; +import org.apache.nifi.controller.state.StateMapUpdate; +import org.apache.nifi.controller.state.providers.AbstractTestStateProvider; +import org.junit.After; +import org.junit.Before; +import org.wali.WriteAheadRepository; + +public class TestWriteAheadLocalStateProvider extends AbstractTestStateProvider { + private StateProvider provider; + private WriteAheadRepository<StateMapUpdate> wal; + + @Before + public void setup() throws IOException { + provider = new WriteAheadLocalStateProvider(); + + final Map<PropertyDescriptor, PropertyValue> properties = new HashMap<>(); + properties.put(WriteAheadLocalStateProvider.PATH, new StandardPropertyValue("target/local-state-provider/" + UUID.randomUUID().toString(), null)); + + provider.initialize(new StateProviderInitializationContext() { + @Override + public String getIdentifier() { + return "Unit Test Provider Initialization Context"; + } + + @Override + public Map<PropertyDescriptor, PropertyValue> getProperties() { + return Collections.unmodifiableMap(properties); + } + + @Override + public PropertyValue getProperty(final PropertyDescriptor property) { + final PropertyValue prop = properties.get(property); + if (prop == null) { + return new StandardPropertyValue(null, null); + } + return prop; + } + + @Override + public SSLContext getSSLContext() { + return null; + } + }); + } + + @After + public void cleanup() throws IOException { + 
provider.onComponentRemoved(componentId); + + if (wal != null) { + wal.shutdown(); + } + } + + @Override + protected StateProvider getProvider() { + return provider; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java new file mode 100644 index 0000000..b841285 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller.state.providers.zookeeper; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import javax.net.ssl.SSLContext; + +import org.apache.curator.test.TestingServer; +import org.apache.nifi.attribute.expression.language.StandardPropertyValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.state.StateProvider; +import org.apache.nifi.components.state.StateProviderInitializationContext; +import org.apache.nifi.controller.state.providers.AbstractTestStateProvider; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; + +public class TestZooKeeperStateProvider extends AbstractTestStateProvider { + + private StateProvider provider; + private static TestingServer zkServer; + + @BeforeClass + public static void createZooKeeper() throws Exception { + zkServer = new TestingServer(2181, true); + zkServer.start(); + } + + @AfterClass + public static void tearDown() throws IOException { + if (zkServer != null) { + zkServer.stop(); + zkServer.close(); + } + } + + @Before + public void setup() throws Exception { + final Map<PropertyDescriptor, PropertyValue> properties = new HashMap<>(); + properties.put(ZooKeeperStateProvider.CONNECTION_STRING, new StandardPropertyValue(zkServer.getConnectString(), null)); + properties.put(ZooKeeperStateProvider.SESSION_TIMEOUT, new StandardPropertyValue("3 secs", null)); + properties.put(ZooKeeperStateProvider.ROOT_NODE, new StandardPropertyValue("/nifi/team1/testing", null)); + + provider = new ZooKeeperStateProvider(); + provider.initialize(new StateProviderInitializationContext() { + @Override + public String getIdentifier() { + return "Unit Test Provider Initialization Context"; + } + + @Override + public Map<PropertyDescriptor, PropertyValue> getProperties() { + return 
Collections.unmodifiableMap(properties); + } + + @Override + public PropertyValue getProperty(final PropertyDescriptor property) { + final PropertyValue prop = properties.get(property); + if (prop == null) { + return new StandardPropertyValue(null, null); + } + return prop; + } + + @Override + public SSLContext getSSLContext() { + return null; + } + }); + + provider.enable(); + } + + @After + public void clear() throws IOException { + getProvider().onComponentRemoved(componentId); + getProvider().disable(); + getProvider().shutdown(); + } + + @Override + protected StateProvider getProvider() { + return provider; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties index 00e1274..4a665d9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties @@ -33,6 +33,20 @@ nifi.nar.library.directory=${nifi.nar.library.directory} nifi.nar.working.directory=${nifi.nar.working.directory} nifi.documentation.working.directory=${nifi.documentation.working.directory} +#################### +# State Management # +#################### +nifi.state.management.configuration.file=${nifi.state.management.configuration.file} +# The ID of the local state provider +nifi.state.management.provider.local=${nifi.state.management.provider.local} +# The ID of the cluster-wide state provider. This will be ignored if NiFi is not clustered but must be populated if running in a cluster. 
+nifi.state.management.provider.cluster=${nifi.state.management.provider.cluster} +# Specifies whether or not this instance of NiFi should run an embedded ZooKeeper server +nifi.state.management.embedded.zookeeper.start=${nifi.state.management.embedded.zookeeper.start} +# Properties file that provides the ZooKeeper properties to use if <nifi.state.management.embedded.zookeeper.start> is set to true +nifi.state.management.embedded.zookeeper.properties=${nifi.state.management.embedded.zookeeper.properties} + + # H2 Settings nifi.database.directory=${nifi.database.directory} nifi.h2.url.append=${nifi.h2.url.append} http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml new file mode 100644 index 0000000..ba5259c --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml @@ -0,0 +1,52 @@ +<?xml version="1.0" encoding="UTF-8" standalone="yes"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<!-- + This file provides a mechanism for defining and configuring the State Providers + that should be used for storing state locally and across a NiFi cluster. In order + to use a specific provider, it must be configured here and its identifier + must be specified in the nifi.properties file. +--> +<stateManagement> + <!-- + State Provider that stores state locally in a configurable directory. This Provider requires the following properties: + + Directory - the directory to store components' state in. If the directory being used is a sub-directory of the NiFi installation, it + is important that the directory be copied over to the new version when upgrading NiFi. + --> + <local-provider> + <id>local-provider</id> + <class>org.apache.nifi.controller.state.providers.local.WriteAheadLocalStateProvider</class> + <property name="Directory">./state</property> + </local-provider> + + <!-- + State Provider that is used to store state in ZooKeeper. This Provider requires the following properties: + + Root Node - the root node in ZooKeeper where state should be stored. The default is '/nifi', but it is advisable to change this to a different value if not using + the embedded ZooKeeper server and if multiple NiFi instances may all be using the same ZooKeeper Server. + + Connect String - A comma-separated list of host:port pairs to connect to ZooKeeper. For example, localhost:2181,localhost:5555,127.0.0.1:6666 + + Session Timeout - Specifies how long this instance of NiFi is allowed to be disconnected from ZooKeeper before creating a new ZooKeeper Session. 
Default value is "3 seconds" + --> + <cluster-provider> + <id>zk-provider</id> + <class>org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider</class> + <property name="Connect String"></property> + <property name="Root Node">/nifi</property> + <property name="Session Timeout">30 seconds</property> + </cluster-provider> +</stateManagement> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/zookeeper.properties ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/zookeeper.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/zookeeper.properties new file mode 100644 index 0000000..55afc94 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/zookeeper.properties @@ -0,0 +1,30 @@ +# +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +# +# + +clientPort=2181 +initLimit=10 +autopurge.purgeInterval=24 +syncLimit=5 +tickTime=2000 +dataDir=./state/zookeeper +autopurge.snapRetainCount=30 +server.1= \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java index 7d0ffab..ef7a61c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java @@ -133,7 +133,7 @@ public class ClusterManagerServerProtocol implements ServerProtocol { continue; } - dos.writeUTF(nodeInfo.getHostname()); + dos.writeUTF(nodeInfo.getSiteToSiteHostname()); dos.writeInt(nodeInfo.getSiteToSitePort()); dos.writeBoolean(nodeInfo.isSiteToSiteSecure()); dos.writeInt(nodeInfo.getTotalFlowFiles()); http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java index 486e6d7..0676668 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java @@ -241,9 +241,9 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro final String propName = entry.getKey(); final String propVal = entry.getValue(); if (isNotNull(propName) && propVal == null) { - controllerService.removeProperty(propName); + controllerService.removeProperty(propName, true); } else if (isNotNull(propName)) { - controllerService.setProperty(propName, propVal); + controllerService.setProperty(propName, propVal, true); } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java index e52a476..81d882c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java @@ -171,9 +171,9 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO { final 
String propName = entry.getKey(); final String propVal = entry.getValue(); if (isNotNull(propName) && propVal == null) { - processor.removeProperty(propName); + processor.removeProperty(propName, true); } else if (isNotNull(propName)) { - processor.setProperty(propName, propVal); + processor.setProperty(propName, propVal, true); } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardReportingTaskDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardReportingTaskDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardReportingTaskDAO.java index ffe606a..38d793d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardReportingTaskDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardReportingTaskDAO.java @@ -285,9 +285,9 @@ public class StandardReportingTaskDAO extends ComponentDAO implements ReportingT final String propName = entry.getKey(); final String propVal = entry.getValue(); if (isNotNull(propName) && propVal == null) { - reportingTask.removeProperty(propName); + reportingTask.removeProperty(propName, true); } else if (isNotNull(propName)) { - reportingTask.setProperty(propName, propVal); + reportingTask.setProperty(propName, propVal, true); } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/resources/access-control/nifi.properties ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/resources/access-control/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/resources/access-control/nifi.properties index 10db651..8c54f24 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/resources/access-control/nifi.properties +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/resources/access-control/nifi.properties @@ -29,6 +29,13 @@ nifi.ui.autorefresh.interval=30 sec nifi.nar.library.directory=target/test-classes/access-control/lib nifi.nar.working.directory=target/test-classes/access-control/nar +nifi.state.management.configuration.file=target/test-classes/access-control/state-management.xml +nifi.state.management.embedded.zookeeper.start=false +nifi.state.management.embedded.zookeeper.properties= +nifi.state.management.embedded.zookeeper.max.instances=3 +nifi.state.management.provider.local=local-provider +nifi.state.management.provider.cluster= + # H2 Settings nifi.database.directory=target/test-classes/database_repository nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/resources/access-control/state-management.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/resources/access-control/state-management.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/resources/access-control/state-management.xml new file mode 100644 index 0000000..1714c7d --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/resources/access-control/state-management.xml @@ -0,0 +1,38 @@ 
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<!-- + This file lists the State Providers to use for storing component state. In order + to use a specific provider, it must be configured here and its identifier + must be specified in the nifi.properties file. +--> +<stateManagement> + <!-- + This file provides a mechanism for defining and configuring the State Providers + that should be used for storing state locally and across a NiFi cluster. + --> + + <!-- + State Provider that stores state locally in a configurable directory. This Provider requires the following properties: + + Directory - the directory to store components' state in. If the directory being used is a sub-directory of the NiFi installation, it + is important that the directory be copied over to the new version when upgrading NiFi. 
+ --> + <local-provider> + <id>local-provider</id> + <class>org.apache.nifi.controller.state.providers.local.WriteAheadLocalStateProvider</class> + <property name="Directory">target/test-classes/access-control/state-management</property> + </local-provider> +</stateManagement> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java b/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java index 3a6ac79..e49a8be 100644 --- a/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java +++ b/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java @@ -55,6 +55,7 @@ import ca.uhn.hl7v2.model.Structure; import ca.uhn.hl7v2.model.Type; import ca.uhn.hl7v2.model.Varies; import ca.uhn.hl7v2.parser.PipeParser; +import ca.uhn.hl7v2.validation.ValidationContext; import ca.uhn.hl7v2.validation.impl.ValidationContextFactory; @SideEffectFree @@ -120,7 +121,7 @@ public class ExtractHL7Attributes extends AbstractProcessor { @SuppressWarnings("resource") final HapiContext hapiContext = new DefaultHapiContext(); - hapiContext.setValidationContext(ValidationContextFactory.noValidation()); + hapiContext.setValidationContext((ValidationContext) ValidationContextFactory.noValidation()); final PipeParser parser = hapiContext.getPipeParser(); final String hl7Text = new String(buffer, charset); 
http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java b/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java index 26e8bb6..ba90370 100644 --- a/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java +++ b/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java @@ -161,7 +161,7 @@ public class RouteHL7 extends AbstractProcessor { @SuppressWarnings("resource") final HapiContext hapiContext = new DefaultHapiContext(); - hapiContext.setValidationContext(ValidationContextFactory.noValidation()); + hapiContext.setValidationContext((ca.uhn.hl7v2.validation.ValidationContext) ValidationContextFactory.noValidation()); final PipeParser parser = hapiContext.getPipeParser(); final String hl7Text = new String(buffer, charset); http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java index 08f5aa7..802f889 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java +++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java @@ -166,8 +166,7 @@ public class EmbeddedKafka { ServerConfig configuration = new ServerConfig(); configuration.readFrom(quorumConfiguration); - FileTxnSnapLog txnLog = new FileTxnSnapLog(new File(configuration.getDataLogDir()), - new File(configuration.getDataDir())); + FileTxnSnapLog txnLog = new FileTxnSnapLog(new File(configuration.getDataLogDir()), new File(configuration.getDataDir())); zkServer.setTxnLogFactory(txnLog); zkServer.setTickTime(configuration.getTickTime()); http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java index e592483..20969c9 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java @@ -19,14 +19,12 @@ package org.apache.nifi.processors.standard; import java.io.File; import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; +import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -34,9 
+32,13 @@ import java.util.Properties; import java.util.Set; import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange; import org.apache.nifi.annotation.notification.PrimaryNodeState; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateMap; import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; import org.apache.nifi.distributed.cache.client.Serializer; @@ -50,7 +52,6 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processors.standard.util.EntityListing; import org.apache.nifi.processors.standard.util.ListableEntity; -import org.codehaus.jackson.JsonGenerationException; import org.codehaus.jackson.JsonNode; import org.codehaus.jackson.JsonParseException; import org.codehaus.jackson.map.JsonMappingException; @@ -157,9 +158,13 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab .build(); - private volatile Long lastListingTime = null; private volatile Set<String> latestIdentifiersListed = new HashSet<>(); + private volatile Long lastListingTime = null; private volatile boolean electedPrimaryNode = false; + private volatile boolean resetListing = false; + + static final String TIMESTAMP = "timestamp"; + static final String IDENTIFIER_PREFIX = "id"; protected File getPersistenceFile() { return new File("conf/state/" + getIdentifier()); @@ -177,6 +182,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab if (isListingResetNecessary(descriptor)) { lastListingTime = null; // clear lastListingTime so that we have to fetch new time latestIdentifiersListed = new 
HashSet<>(); + resetListing = true; } } @@ -187,10 +193,6 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab return relationships; } - protected String getKey(final String directory) { - return getIdentifier() + ".lastListingTime." + directory; - } - @OnPrimaryNodeStateChange public void onPrimaryNodeChange(final PrimaryNodeState newState) { if (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE) { @@ -198,159 +200,146 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab } } - private EntityListing deserialize(final String serializedState) throws JsonParseException, JsonMappingException, IOException { - final ObjectMapper mapper = new ObjectMapper(); - final JsonNode jsonNode = mapper.readTree(serializedState); - return mapper.readValue(jsonNode, EntityListing.class); - } - - - private Long getMinTimestamp(final String directory, final DistributedMapCacheClient client) throws IOException { - // Determine the timestamp for the last file that we've listed. - Long minTimestamp = lastListingTime; - if (minTimestamp == null || electedPrimaryNode) { - // We haven't yet restored any state from local or distributed state - or it's been at least a minute since - // we have performed a listing. In this case, - // First, attempt to get timestamp from distributed cache service. 
- if (client != null) { - try { - final StringSerDe serde = new StringSerDe(); - final String serializedState = client.get(getKey(directory), serde, serde); - if (serializedState == null || serializedState.isEmpty()) { - minTimestamp = null; - this.latestIdentifiersListed = Collections.emptySet(); - } else { - final EntityListing listing = deserialize(serializedState); - this.lastListingTime = listing.getLatestTimestamp().getTime(); - minTimestamp = listing.getLatestTimestamp().getTime(); - this.latestIdentifiersListed = new HashSet<>(listing.getMatchingIdentifiers()); - } - - this.lastListingTime = minTimestamp; - electedPrimaryNode = false; // no requirement to pull an update from the distributed cache anymore. - } catch (final IOException ioe) { - throw ioe; - } - } + @OnScheduled + public final void updateState(final ProcessContext context) throws IOException { + final String path = getPath(context); + final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class); - // Check the persistence file. We want to use the latest timestamp that we have so that - // we don't duplicate data. + // Check if state already exists for this path. If so, we have already migrated the state. + final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER); + if (stateMap.getVersion() == -1L) { try { - final File persistenceFile = getPersistenceFile(); - if (persistenceFile.exists()) { - try (final FileInputStream fis = new FileInputStream(persistenceFile)) { - final Properties props = new Properties(); - props.load(fis); - - // get the local timestamp for this directory, if it exists. 
- final String locallyPersistedValue = props.getProperty(directory); - if (locallyPersistedValue != null) { - final EntityListing listing = deserialize(locallyPersistedValue); - final long localTimestamp = listing.getLatestTimestamp().getTime(); - - // If distributed state doesn't have an entry or the local entry is later than the distributed state, - // update the distributed state so that we are in sync. - if (client != null && (minTimestamp == null || localTimestamp > minTimestamp)) { - minTimestamp = localTimestamp; - - // Our local persistence file shows a later time than the Distributed service. - // Update the distributed service to match our local state. - try { - final StringSerDe serde = new StringSerDe(); - client.put(getKey(directory), locallyPersistedValue, serde, serde); - } catch (final IOException ioe) { - getLogger().warn("Local timestamp for {} is {}, which is later than Distributed state but failed to update Distributed " - + "state due to {}. If a new node performs Listing, data duplication may occur", - new Object[] {directory, locallyPersistedValue, ioe}); - } - } - } - } - } + // Migrate state from the old way of managing state (distributed cache service and local file) + // to the new mechanism (State Manager). + migrateState(path, client, context.getStateManager()); } catch (final IOException ioe) { - getLogger().warn("Failed to recover local state due to {}. Assuming that the state from the distributed cache is correct.", ioe); + throw new IOException("Failed to properly migrate state to State Manager", ioe); } } - return minTimestamp; - } - - - private String serializeState(final List<T> entities) throws JsonGenerationException, JsonMappingException, IOException { - // we need to keep track of all files that we pulled in that had a modification time equal to - // lastListingTime so that we can avoid pulling those files in again. 
We can't just ignore any files - // that have a mod time equal to that timestamp because more files may come in with the same timestamp - // later in the same millisecond. - if (entities.isEmpty()) { - return null; - } else { - final List<T> sortedEntities = new ArrayList<>(entities); - Collections.sort(sortedEntities, new Comparator<ListableEntity>() { - @Override - public int compare(final ListableEntity o1, final ListableEntity o2) { - return Long.compare(o1.getTimestamp(), o2.getTimestamp()); - } - }); - - final long latestListingModTime = sortedEntities.get(sortedEntities.size() - 1).getTimestamp(); - final Set<String> idsWithTimestampEqualToListingTime = new HashSet<>(); - for (int i = sortedEntities.size() - 1; i >= 0; i--) { - final ListableEntity entity = sortedEntities.get(i); - if (entity.getTimestamp() == latestListingModTime) { - idsWithTimestampEqualToListingTime.add(entity.getIdentifier()); - } - } - - this.latestIdentifiersListed = idsWithTimestampEqualToListingTime; + // delete the local file, since it is no longer needed + final File localFile = new File(path); + if (localFile.exists() && !localFile.delete()) { + getLogger().warn("Migrated state but failed to delete local persistence file"); + } - final EntityListing listing = new EntityListing(); - listing.setLatestTimestamp(new Date(latestListingModTime)); - final Set<String> ids = new HashSet<>(); - for (final String id : idsWithTimestampEqualToListingTime) { - ids.add(id); + // remove entry from Distributed cache server + if (client != null) { + try { + client.remove(path, new StringSerDe()); + } catch (final IOException ioe) { + getLogger().warn("Failed to remove entry from Distributed Cache Service. 
However, the state has already been migrated to use the new " + + "State Management service, so the Distributed Cache Service is no longer needed."); } - listing.setMatchingIdentifiers(ids); + } - final ObjectMapper mapper = new ObjectMapper(); - final String serializedState = mapper.writerWithType(EntityListing.class).writeValueAsString(listing); - return serializedState; + if (resetListing) { + context.getStateManager().clear(Scope.CLUSTER); + resetListing = false; } } - protected void persistLocalState(final String path, final String serializedState) throws IOException { - // we need to keep track of all files that we pulled in that had a modification time equal to - // lastListingTime so that we can avoid pulling those files in again. We can't just ignore any files - // that have a mod time equal to that timestamp because more files may come in with the same timestamp - // later in the same millisecond. - final File persistenceFile = getPersistenceFile(); - final File dir = persistenceFile.getParentFile(); - if (!dir.exists() && !dir.mkdirs()) { - throw new IOException("Could not create directory " + dir.getAbsolutePath() + " in order to save local state"); + /** + * This processor used to use the DistributedMapCacheClient in order to store cluster-wide state, before the introduction of + * the StateManager. 
This method will migrate state from that DistributedMapCacheClient, or from a local file, to the StateManager, + * if any state already exists + * + * @param path the path to migrate state for + * @param client the DistributedMapCacheClient that is capable of obtaining the current state + * @param stateManager the StateManager to use in order to store the new state + * @throws IOException if unable to retrieve or store the state + */ + private void migrateState(final String path, final DistributedMapCacheClient client, final StateManager stateManager) throws IOException { + Long minTimestamp = null; + final Set<String> latestIdentifiersListed = new HashSet<>(); + + // Retrieve state from Distributed Cache Client + if (client != null) { + final StringSerDe serde = new StringSerDe(); + final String serializedState = client.get(getKey(path), serde, serde); + if (serializedState != null && !serializedState.isEmpty()) { + final EntityListing listing = deserialize(serializedState); + minTimestamp = listing.getLatestTimestamp().getTime(); + latestIdentifiersListed.addAll(listing.getMatchingIdentifiers()); + } } - final Properties props = new Properties(); + // Retrieve state from locally persisted file + final File persistenceFile = getPersistenceFile(); if (persistenceFile.exists()) { + final Properties props = new Properties(); + try (final FileInputStream fis = new FileInputStream(persistenceFile)) { props.load(fis); } + + final String locallyPersistedValue = props.getProperty(path); + if (locallyPersistedValue != null) { + final EntityListing listing = deserialize(locallyPersistedValue); + final long localTimestamp = listing.getLatestTimestamp().getTime(); + if (minTimestamp == null || localTimestamp > minTimestamp) { + minTimestamp = localTimestamp; + latestIdentifiersListed.clear(); + latestIdentifiersListed.addAll(listing.getMatchingIdentifiers()); + } + } } - props.setProperty(path, serializedState); + if (minTimestamp != null) { + persist(minTimestamp, 
latestIdentifiersListed, stateManager); + } + } - try (final FileOutputStream fos = new FileOutputStream(persistenceFile)) { - props.store(fos, null); + private void persist(final long timestamp, final Collection<String> identifiers, final StateManager stateManager) throws IOException { + final Map<String, String> updatedState = new HashMap<>(identifiers.size() + 1); + updatedState.put(TIMESTAMP, String.valueOf(timestamp)); + int counter = 0; + for (final String identifier : identifiers) { + final String index = String.valueOf(++counter); + updatedState.put(IDENTIFIER_PREFIX + "." + index, identifier); } + stateManager.setState(updatedState, Scope.CLUSTER); + } + + protected String getKey(final String directory) { + return getIdentifier() + ".lastListingTime." + directory; + } + + private EntityListing deserialize(final String serializedState) throws JsonParseException, JsonMappingException, IOException { + final ObjectMapper mapper = new ObjectMapper(); + final JsonNode jsonNode = mapper.readTree(serializedState); + return mapper.readValue(jsonNode, EntityListing.class); } @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - final String path = getPath(context); - final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class); - - final Long minTimestamp; + Long minTimestamp = lastListingTime; try { - minTimestamp = getMinTimestamp(path, client); + // We need to fetch the state from the cluster if we don't yet know the last listing time, + // or if we were just elected the primary node + if (this.lastListingTime == null || electedPrimaryNode) { + final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER); + final Map<String, String> stateValues = stateMap.toMap(); + final String timestamp = stateValues.get(TIMESTAMP); + + if (timestamp == null) { + minTimestamp = 0L; + latestIdentifiersListed.clear(); + } 
else { + minTimestamp = this.lastListingTime = Long.parseLong(timestamp); + latestIdentifiersListed.clear(); + for (final Map.Entry<String, String> entry : stateValues.entrySet()) { + final String key = entry.getKey(); + final String value = entry.getValue(); + if (TIMESTAMP.equals(key)) { + continue; + } + + latestIdentifiersListed.add(value); + } + } + } } catch (final IOException ioe) { getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service. Will not perform listing until this is accomplished."); context.yield(); @@ -403,32 +392,19 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab // previously Primary Node left off. // We also store the state locally so that if the node is restarted, and the node cannot contact // the distributed state cache, the node can continue to run (if it is primary node). - String serializedState = null; + final Set<String> identifiers = new HashSet<>(entityList.size()); try { - serializedState = serializeState(entityList); - } catch (final Exception e) { - getLogger().error("Failed to serialize state due to {}", new Object[] {e}); - } - - if (serializedState != null) { - // Save our state locally. - try { - persistLocalState(path, serializedState); - } catch (final IOException ioe) { - getLogger().warn("Unable to save state locally. If the node is restarted now, data may be duplicated. Failure is due to {}", ioe); - } - - // Attempt to save state to remote server. - if (client != null) { - try { - client.put(getKey(path), serializedState, new StringSerDe(), new StringSerDe()); - } catch (final IOException ioe) { - getLogger().warn("Unable to communicate with distributed cache server due to {}. 
Persisting state locally instead.", ioe); - } + for (final T entity : entityList) { + identifiers.add(entity.getIdentifier()); } + persist(latestListingTimestamp, identifiers, context.getStateManager()); + } catch (final IOException ioe) { + getLogger().warn("Unable to save state due to {}. If NiFi restarted before state is saved, or " + + "if another node begins executing this Processor, data duplication may occur.", ioe); } lastListingTime = latestListingTimestamp; + latestIdentifiersListed = identifiers; } else { getLogger().debug("There is no data to list. Yielding."); context.yield();
