http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
index 8c903ee..4985fe6 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
@@ -24,6 +24,7 @@ import org.apache.nifi.attribute.expression.language.Query;
 import org.apache.nifi.attribute.expression.language.Query.Range;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.controller.ControllerServiceLookup;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.service.ControllerServiceNode;
@@ -35,11 +36,13 @@ public class StandardSchedulingContext implements 
SchedulingContext {
     private final ProcessContext processContext;
     private final ControllerServiceProvider serviceProvider;
     private final ProcessorNode processorNode;
+    private final StateManager stateManager;
 
-    public StandardSchedulingContext(final ProcessContext processContext, 
final ControllerServiceProvider serviceProvider, final ProcessorNode 
processorNode) {
+    public StandardSchedulingContext(final ProcessContext processContext, 
final ControllerServiceProvider serviceProvider, final ProcessorNode 
processorNode, final StateManager stateManager) {
         this.processContext = processContext;
         this.serviceProvider = serviceProvider;
         this.processorNode = processorNode;
+        this.stateManager = stateManager;
     }
 
     @Override
@@ -139,4 +142,9 @@ public class StandardSchedulingContext implements 
SchedulingContext {
         final List<Range> elRanges = 
Query.extractExpressionRanges(getProperty(property).getValue());
         return (elRanges != null && !elRanges.isEmpty());
     }
+
+    @Override
+    public StateManager getStateManager() {
+        return stateManager;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.components.state.StateProvider
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.components.state.StateProvider
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.components.state.StateProvider
new file mode 100644
index 0000000..6786be9
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/META-INF/services/org.apache.nifi.components.state.StateProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.controller.state.providers.local.WriteAheadLocalStateProvider
+org.apache.nifi.components.state.providers.zookeeper.ZooKeeperStateProvider
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
index 770bb47..543bfd0 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
@@ -38,6 +38,7 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.components.state.StateManagerProvider;
 import org.apache.nifi.controller.Heartbeater;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.ProcessorNode;
@@ -72,12 +73,13 @@ public class TestStandardProcessScheduler {
     private StandardProcessScheduler scheduler = null;
     private ReportingTaskNode taskNode = null;
     private TestReportingTask reportingTask = null;
+    private StateManagerProvider stateMgrProvider = 
Mockito.mock(StateManagerProvider.class);
 
     @Before
     public void setup() throws InitializationException {
         System.setProperty("nifi.properties.file.path", 
"src/test/resources/nifi.properties");
         this.refreshNiFiProperties();
-        scheduler = new 
StandardProcessScheduler(Mockito.mock(Heartbeater.class), 
Mockito.mock(ControllerServiceProvider.class), null);
+        scheduler = new 
StandardProcessScheduler(Mockito.mock(Heartbeater.class), 
Mockito.mock(ControllerServiceProvider.class), null, stateMgrProvider);
         scheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, 
Mockito.mock(SchedulingAgent.class));
 
         reportingTask = new TestReportingTask();
@@ -117,12 +119,12 @@ public class TestStandardProcessScheduler {
     public void 
testDisableControllerServiceWithProcessorTryingToStartUsingIt() throws 
InterruptedException {
         final Processor proc = new ServiceReferencingProcessor();
 
-        final ControllerServiceProvider serviceProvider = new 
StandardControllerServiceProvider(scheduler, null);
+        final ControllerServiceProvider serviceProvider = new 
StandardControllerServiceProvider(scheduler, null, 
Mockito.mock(StateManagerProvider.class));
         final ControllerServiceNode service = 
serviceProvider.createControllerService(NoStartServiceImpl.class.getName(), 
"service", true);
         final ProcessorNode procNode = new StandardProcessorNode(proc, 
UUID.randomUUID().toString(),
                 new StandardValidationContextFactory(serviceProvider), 
scheduler, serviceProvider);
 
-        
procNode.setProperty(ServiceReferencingProcessor.SERVICE_DESC.getName(), 
service.getIdentifier());
+        
procNode.setProperty(ServiceReferencingProcessor.SERVICE_DESC.getName(), 
service.getIdentifier(), true);
 
         scheduler.enableControllerService(service);
         scheduler.startProcessor(procNode);
@@ -198,7 +200,7 @@ public class TestStandardProcessScheduler {
     @Test
     public void validateServiceEnablementLogicHappensOnlyOnce() throws 
Exception {
         final ProcessScheduler scheduler = createScheduler();
-        StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(scheduler, null);
+        StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(scheduler, null, stateMgrProvider);
         final ControllerServiceNode serviceNode = 
provider.createControllerService(SimpleTestService.class.getName(),
                 "1", false);
         assertFalse(serviceNode.isActive());
@@ -237,7 +239,7 @@ public class TestStandardProcessScheduler {
     @Test
     public void validateDisabledServiceCantBeDisabled() throws Exception {
         final ProcessScheduler scheduler = createScheduler();
-        StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(scheduler, null);
+        StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(scheduler, null, stateMgrProvider);
         final ControllerServiceNode serviceNode = 
provider.createControllerService(SimpleTestService.class.getName(),
                 "1", false);
         SimpleTestService ts = (SimpleTestService) 
serviceNode.getControllerServiceImplementation();
@@ -275,7 +277,7 @@ public class TestStandardProcessScheduler {
     @Test
     public void validateEnabledServiceCanOnlyBeDisabledOnce() throws Exception 
{
         final ProcessScheduler scheduler = createScheduler();
-        StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(scheduler, null);
+        StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(scheduler, null, stateMgrProvider);
         final ControllerServiceNode serviceNode = 
provider.createControllerService(SimpleTestService.class.getName(),
                 "1", false);
         SimpleTestService ts = (SimpleTestService) 
serviceNode.getControllerServiceImplementation();
@@ -309,7 +311,7 @@ public class TestStandardProcessScheduler {
     @Test
     public void validateDisablingOfTheFailedService() throws Exception {
         final ProcessScheduler scheduler = createScheduler();
-        StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(scheduler, null);
+        StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(scheduler, null, stateMgrProvider);
         final ControllerServiceNode serviceNode = 
provider.createControllerService(FailingService.class.getName(),
                 "1", false);
         scheduler.enableControllerService(serviceNode);
@@ -340,7 +342,7 @@ public class TestStandardProcessScheduler {
     @Test
     public void validateEnabledDisableMultiThread() throws Exception {
         final ProcessScheduler scheduler = createScheduler();
-        StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(scheduler, null);
+        StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(scheduler, null, stateMgrProvider);
         ExecutorService executor = Executors.newCachedThreadPool();
         for (int i = 0; i < 200; i++) {
             final ControllerServiceNode serviceNode = provider
@@ -383,7 +385,7 @@ public class TestStandardProcessScheduler {
     @Test
     public void validateNeverEnablingServiceCanStillBeDisabled() throws 
Exception {
         final ProcessScheduler scheduler = createScheduler();
-        StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(scheduler, null);
+        StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(scheduler, null, stateMgrProvider);
         final ControllerServiceNode serviceNode = 
provider.createControllerService(LongEnablingService.class.getName(),
                 "1", false);
         LongEnablingService ts = (LongEnablingService) 
serviceNode.getControllerServiceImplementation();
@@ -408,7 +410,7 @@ public class TestStandardProcessScheduler {
     @Test
     public void validateLongEnablingServiceCanStillBeDisabled() throws 
Exception {
         final ProcessScheduler scheduler = createScheduler();
-        StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(scheduler, null);
+        StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(scheduler, null, stateMgrProvider);
         final ControllerServiceNode serviceNode = 
provider.createControllerService(LongEnablingService.class.getName(),
                 "1", false);
         LongEnablingService ts = (LongEnablingService) 
serviceNode.getControllerServiceImplementation();
@@ -505,6 +507,6 @@ public class TestStandardProcessScheduler {
     }
 
     private ProcessScheduler createScheduler() {
-        return new StandardProcessScheduler(mock(Heartbeater.class), null, 
null);
+        return new StandardProcessScheduler(mock(Heartbeater.class), null, 
null, stateMgrProvider);
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java
index ed9af31..b9c0f7f 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.controller.service;
 
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateManagerProvider;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.StandardFlowServiceTest;
 import org.apache.nifi.nar.ExtensionManager;
@@ -25,6 +27,7 @@ import org.apache.nifi.util.NiFiProperties;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 public class StandardControllerServiceProviderTest {
 
@@ -43,7 +46,28 @@ public class StandardControllerServiceProviderTest {
     public void setup() throws Exception {
         String id = "id";
         String clazz = 
"org.apache.nifi.controller.service.util.TestControllerService";
-        ControllerServiceProvider provider = new 
StandardControllerServiceProvider(null, null);
+        ControllerServiceProvider provider = new 
StandardControllerServiceProvider(null, null, new StateManagerProvider() {
+            @Override
+            public StateManager getStateManager(final String componentId) {
+                return Mockito.mock(StateManager.class);
+            }
+
+            @Override
+            public void shutdown() {
+            }
+
+            @Override
+            public void enableClusterProvider() {
+            }
+
+            @Override
+            public void disableClusterProvider() {
+            }
+
+            @Override
+            public void onComponentRemoved(String componentId) {
+            }
+        });
         ControllerServiceNode node = provider.createControllerService(clazz, 
id, true);
         proxied = node.getProxiedControllerService();
         implementation = node.getControllerServiceImplementation();
@@ -68,4 +92,4 @@ public class StandardControllerServiceProviderTest {
     public void testCallImplementationInitialized() throws 
InitializationException {
         implementation.initialize(null);
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
index eb0fa13..ca63ba4 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
@@ -28,6 +28,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateManagerProvider;
 import org.apache.nifi.controller.Heartbeater;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.ProcessorNode;
@@ -49,6 +51,28 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 public class TestStandardControllerServiceProvider {
+    private static StateManagerProvider stateManagerProvider = new 
StateManagerProvider() {
+        @Override
+        public StateManager getStateManager(String componentId) {
+            return Mockito.mock(StateManager.class);
+        }
+
+        @Override
+        public void shutdown() {
+        }
+
+        @Override
+        public void enableClusterProvider() {
+        }
+
+        @Override
+        public void disableClusterProvider() {
+        }
+
+        @Override
+        public void onComponentRemoved(String componentId) {
+        }
+    };
 
     @BeforeClass
     public static void setNiFiProps() {
@@ -57,13 +81,13 @@ public class TestStandardControllerServiceProvider {
 
     private ProcessScheduler createScheduler() {
         final Heartbeater heartbeater = Mockito.mock(Heartbeater.class);
-        return new StandardProcessScheduler(heartbeater, null, null);
+        return new StandardProcessScheduler(heartbeater, null, null, 
stateManagerProvider);
     }
 
     @Test
     public void testDisableControllerService() {
         final ProcessScheduler scheduler = createScheduler();
-        final StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(scheduler, null);
+        final StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(scheduler, null, stateManagerProvider);
 
         final ControllerServiceNode serviceNode = 
provider.createControllerService(ServiceB.class.getName(), "B", false);
         provider.enableControllerService(serviceNode);
@@ -73,12 +97,12 @@ public class TestStandardControllerServiceProvider {
     @Test(timeout=10000)
     public void testEnableDisableWithReference() {
         final ProcessScheduler scheduler = createScheduler();
-        final StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(scheduler, null);
+        final StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(scheduler, null, stateManagerProvider);
 
         final ControllerServiceNode serviceNodeB = 
provider.createControllerService(ServiceB.class.getName(), "B", false);
         final ControllerServiceNode serviceNodeA = 
provider.createControllerService(ServiceA.class.getName(), "A", false);
 
-        serviceNodeA.setProperty(ServiceA.OTHER_SERVICE.getName(), "B");
+        serviceNodeA.setProperty(ServiceA.OTHER_SERVICE.getName(), "B", true);
 
         try {
             provider.enableControllerService(serviceNodeA);
@@ -126,7 +150,7 @@ public class TestStandardControllerServiceProvider {
     }
 
     public void testEnableReferencingServicesGraph(ProcessScheduler scheduler) 
{
-        final StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(scheduler, null);
+        final StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(scheduler, null, stateManagerProvider);
 
         // build a graph of controller services with dependencies as such:
         //
@@ -145,10 +169,10 @@ public class TestStandardControllerServiceProvider {
         final ControllerServiceNode serviceNode3 = 
provider.createControllerService(ServiceA.class.getName(), "3", false);
         final ControllerServiceNode serviceNode4 = 
provider.createControllerService(ServiceB.class.getName(), "4", false);
 
-        serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
-        serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4");
-        serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
-        serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4");
+        serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2", true);
+        serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4", true);
+        serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2", true);
+        serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4", 
true);
 
         provider.enableControllerService(serviceNode4);
         provider.enableReferencingServices(serviceNode4);
@@ -169,7 +193,7 @@ public class TestStandardControllerServiceProvider {
     @Test(timeout=10000)
     public void testStartStopReferencingComponents() {
         final ProcessScheduler scheduler = createScheduler();
-        final StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(scheduler, null);
+        final StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(scheduler, null, stateManagerProvider);
 
         // build a graph of reporting tasks and controller services with 
dependencies as such:
         //
@@ -213,20 +237,20 @@ public class TestStandardControllerServiceProvider {
         final ProcessorNode procNodeA = new StandardProcessorNode(new 
DummyProcessor(), id1,
                 new StandardValidationContextFactory(provider), scheduler, 
provider);
         procNodeA.getProcessor().initialize(new 
StandardProcessorInitializationContext(id1, null, provider));
-        procNodeA.setProperty(DummyProcessor.SERVICE.getName(), "1");
+        procNodeA.setProperty(DummyProcessor.SERVICE.getName(), "1", true);
         procNodeA.setProcessGroup(mockProcessGroup);
 
         final String id2 = UUID.randomUUID().toString();
         final ProcessorNode procNodeB = new StandardProcessorNode(new 
DummyProcessor(), id2,
                 new StandardValidationContextFactory(provider), scheduler, 
provider);
         procNodeB.getProcessor().initialize(new 
StandardProcessorInitializationContext(id2, null, provider));
-        procNodeB.setProperty(DummyProcessor.SERVICE.getName(), "3");
+        procNodeB.setProperty(DummyProcessor.SERVICE.getName(), "3", true);
         procNodeB.setProcessGroup(mockProcessGroup);
 
-        serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
-        serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4");
-        serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
-        serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4");
+        serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2", true);
+        serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4", true);
+        serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2", true);
+        serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4", 
true);
 
         provider.enableControllerService(serviceNode4);
         provider.enableReferencingServices(serviceNode4);
@@ -280,11 +304,11 @@ public class TestStandardControllerServiceProvider {
 
     @Test
     public void testOrderingOfServices() {
-        final StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(null, null);
+        final StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(null, null, stateManagerProvider);
         final ControllerServiceNode serviceNode1 = 
provider.createControllerService(ServiceA.class.getName(), "1", false);
         final ControllerServiceNode serviceNode2 = 
provider.createControllerService(ServiceB.class.getName(), "2", false);
 
-        serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
+        serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2", true);
 
         final Map<String, ControllerServiceNode> nodeMap = new 
LinkedHashMap<>();
         nodeMap.put("1", serviceNode1);
@@ -314,7 +338,7 @@ public class TestStandardControllerServiceProvider {
 
         // add circular dependency on self.
         nodeMap.clear();
-        serviceNode1.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "1");
+        serviceNode1.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "1", 
true);
         nodeMap.put("1", serviceNode1);
         nodeMap.put("2", serviceNode2);
 
@@ -341,8 +365,8 @@ public class TestStandardControllerServiceProvider {
         // like that.
         nodeMap.clear();
         final ControllerServiceNode serviceNode3 = 
provider.createControllerService(ServiceA.class.getName(), "3", false);
-        serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "3");
-        serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "1");
+        serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "3", true);
+        serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "1", true);
         nodeMap.put("1", serviceNode1);
         nodeMap.put("3", serviceNode3);
         branches = 
StandardControllerServiceProvider.determineEnablingOrder(nodeMap);
@@ -364,10 +388,10 @@ public class TestStandardControllerServiceProvider {
 
         // Add multiple completely disparate branches.
         nodeMap.clear();
-        serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
+        serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2", true);
         final ControllerServiceNode serviceNode4 = 
provider.createControllerService(ServiceB.class.getName(), "4", false);
         final ControllerServiceNode serviceNode5 = 
provider.createControllerService(ServiceB.class.getName(), "5", false);
-        serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "4");
+        serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "4", true);
         nodeMap.put("1", serviceNode1);
         nodeMap.put("2", serviceNode2);
         nodeMap.put("3", serviceNode3);
@@ -398,8 +422,8 @@ public class TestStandardControllerServiceProvider {
 
         // create 2 branches both dependent on the same service
         nodeMap.clear();
-        serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
-        serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
+        serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2", true);
+        serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2", true);
         nodeMap.put("1", serviceNode1);
         nodeMap.put("2", serviceNode2);
         nodeMap.put("3", serviceNode3);
@@ -426,7 +450,7 @@ public class TestStandardControllerServiceProvider {
         final ProcessorNode procNode = new StandardProcessorNode(new 
DummyProcessor(), UUID.randomUUID().toString(),
                 new StandardValidationContextFactory(serviceProvider), 
scheduler, serviceProvider);
 
-        final ProcessGroup group = new 
StandardProcessGroup(UUID.randomUUID().toString(), serviceProvider, scheduler, 
null, null);
+        final ProcessGroup group = new 
StandardProcessGroup(UUID.randomUUID().toString(), serviceProvider, scheduler, 
null, null, null);
         group.addProcessor(procNode);
         procNode.setProcessGroup(group);
 
@@ -436,7 +460,7 @@ public class TestStandardControllerServiceProvider {
     @Test
     public void testEnableReferencingComponents() {
         final ProcessScheduler scheduler = createScheduler();
-        final StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(null, null);
+        final StandardControllerServiceProvider provider = new 
StandardControllerServiceProvider(null, null, stateManagerProvider);
         final ControllerServiceNode serviceNode = 
provider.createControllerService(ServiceA.class.getName(), "1", false);
 
         final ProcessorNode procNode = createProcessor(scheduler, provider);

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/AbstractTestStateProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/AbstractTestStateProvider.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/AbstractTestStateProvider.java
new file mode 100644
index 0000000..d724be0
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/AbstractTestStateProvider.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.state.providers;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.components.state.StateProvider;
+import org.junit.Test;
+
+
+/**
+ * <p>
+ * Abstract class that provides a suite of test for State Providers. Each 
State Provider implementation can simply extend this class,
+ * implement the getProvider() method, and have the entire suite of tests run 
against the provider.
+ * </p>
+ *
+ * <p>
+ * It is recommended that implementations create a new provider in a method 
annotated with @Before and cleanup in a method annotated with @After.
+ * </p>
+ */
+public abstract class AbstractTestStateProvider {
+    protected final String componentId = 
"111111111-1111-1111-1111-111111111111";
+
+
+    @Test
+    public void testSetAndGet() throws IOException {
+        getProvider().setState(Collections.singletonMap("testSetAndGet", 
"value"), componentId);
+        assertEquals("value", 
getProvider().getState(componentId).get("testSetAndGet"));
+    }
+
+    @Test
+    public void testReplaceSuccessful() throws IOException {
+        final String key = "testReplaceSuccessful";
+        final StateProvider provider = getProvider();
+
+        StateMap map = provider.getState(componentId);
+        assertNotNull(map);
+        assertEquals(-1, map.getVersion());
+
+        assertNotNull(map.toMap());
+        assertTrue(map.toMap().isEmpty());
+        provider.setState(Collections.singletonMap(key, "value1"), 
componentId);
+
+        map = provider.getState(componentId);
+        assertNotNull(map);
+        assertEquals(0, map.getVersion());
+        assertEquals("value1", map.get(key));
+        assertEquals("value1", map.toMap().get(key));
+
+        final Map<String, String> newMap = new HashMap<>(map.toMap());
+        newMap.put(key, "value2");
+        assertTrue(provider.replace(map, newMap, componentId));
+
+        map = provider.getState(componentId);
+        assertEquals("value2", map.get(key));
+        assertEquals(1L, map.getVersion());
+    }
+
+    @Test
+    public void testReplaceWithWrongVersion() throws IOException {
+        final String key = "testReplaceWithWrongVersion";
+        final StateProvider provider = getProvider();
+        provider.setState(Collections.singletonMap(key, "value1"), 
componentId);
+
+        StateMap stateMap = provider.getState(componentId);
+        assertNotNull(stateMap);
+        assertEquals("value1", stateMap.get(key));
+        assertEquals(0, stateMap.getVersion());
+
+        provider.setState(Collections.singletonMap(key, "intermediate value"), 
componentId);
+
+        assertFalse(provider.replace(stateMap, Collections.singletonMap(key, 
"value2"), componentId));
+        stateMap = provider.getState(componentId);
+        assertEquals(key, stateMap.toMap().keySet().iterator().next());
+        assertEquals(1, stateMap.toMap().size());
+        assertEquals("intermediate value", stateMap.get(key));
+        assertEquals(1, stateMap.getVersion());
+    }
+
+
+    @Test
+    public void testToMap() throws IOException {
+        final String key = "testKeySet";
+        final StateProvider provider = getProvider();
+        Map<String, String> map = provider.getState(componentId).toMap();
+        assertNotNull(map);
+        assertTrue(map.isEmpty());
+
+        provider.setState(Collections.singletonMap(key, "value"), componentId);
+        map = provider.getState(componentId).toMap();
+        assertNotNull(map);
+        assertEquals(1, map.size());
+        assertEquals("value", map.get(key));
+
+        provider.setState(Collections.<String, String> emptyMap(), 
componentId);
+
+        final StateMap stateMap = provider.getState(componentId);
+        map = stateMap.toMap();
+        assertNotNull(map);
+        assertTrue(map.isEmpty());
+        assertEquals(1, stateMap.getVersion());
+    }
+
+    @Test
+    public void testClear() throws IOException {
+        final StateProvider provider = getProvider();
+        StateMap stateMap = provider.getState(componentId);
+        assertNotNull(stateMap);
+        assertEquals(-1L, stateMap.getVersion());
+        assertTrue(stateMap.toMap().isEmpty());
+
+        provider.setState(Collections.singletonMap("testClear", "value"), 
componentId);
+
+        stateMap = provider.getState(componentId);
+        assertNotNull(stateMap);
+        assertEquals(0, stateMap.getVersion());
+        assertEquals("value", stateMap.get("testClear"));
+
+        provider.clear(componentId);
+
+        stateMap = provider.getState(componentId);
+        assertNotNull(stateMap);
+        assertEquals(1L, stateMap.getVersion());
+        assertTrue(stateMap.toMap().isEmpty());
+    }
+
+
+    protected abstract StateProvider getProvider();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java
new file mode 100644
index 0000000..9f7c4c9
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.state.providers.local;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.state.StateProvider;
+import org.apache.nifi.components.state.StateProviderInitializationContext;
+import org.apache.nifi.controller.state.StateMapUpdate;
+import org.apache.nifi.controller.state.providers.AbstractTestStateProvider;
+import org.junit.After;
+import org.junit.Before;
+import org.wali.WriteAheadRepository;
+
+public class TestWriteAheadLocalStateProvider extends 
AbstractTestStateProvider {
+    private StateProvider provider;
+    private WriteAheadRepository<StateMapUpdate> wal;
+
+    @Before
+    public void setup() throws IOException {
+        provider = new WriteAheadLocalStateProvider();
+
+        final Map<PropertyDescriptor, PropertyValue> properties = new 
HashMap<>();
+        properties.put(WriteAheadLocalStateProvider.PATH, new 
StandardPropertyValue("target/local-state-provider/" + 
UUID.randomUUID().toString(), null));
+
+        provider.initialize(new StateProviderInitializationContext() {
+            @Override
+            public String getIdentifier() {
+                return "Unit Test Provider Initialization Context";
+            }
+
+            @Override
+            public Map<PropertyDescriptor, PropertyValue> getProperties() {
+                return Collections.unmodifiableMap(properties);
+            }
+
+            @Override
+            public PropertyValue getProperty(final PropertyDescriptor 
property) {
+                final PropertyValue prop = properties.get(property);
+                if (prop == null) {
+                    return new StandardPropertyValue(null, null);
+                }
+                return prop;
+            }
+
+            @Override
+            public SSLContext getSSLContext() {
+                return null;
+            }
+        });
+    }
+
+    @After
+    public void cleanup() throws IOException {
+        provider.onComponentRemoved(componentId);
+
+        if (wal != null) {
+            wal.shutdown();
+        }
+    }
+
+    @Override
+    protected StateProvider getProvider() {
+        return provider;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java
new file mode 100644
index 0000000..b841285
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.state.providers.zookeeper;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.state.StateProvider;
+import org.apache.nifi.components.state.StateProviderInitializationContext;
+import org.apache.nifi.controller.state.providers.AbstractTestStateProvider;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+public class TestZooKeeperStateProvider extends AbstractTestStateProvider {
+
+    private StateProvider provider;
+    private static TestingServer zkServer;
+
+    @BeforeClass
+    public static void createZooKeeper() throws Exception {
+        zkServer = new TestingServer(2181, true);
+        zkServer.start();
+    }
+
+    @AfterClass
+    public static void tearDown() throws IOException {
+        if (zkServer != null) {
+            zkServer.stop();
+            zkServer.close();
+        }
+    }
+
+    @Before
+    public void setup() throws Exception {
+        final Map<PropertyDescriptor, PropertyValue> properties = new 
HashMap<>();
+        properties.put(ZooKeeperStateProvider.CONNECTION_STRING, new 
StandardPropertyValue(zkServer.getConnectString(), null));
+        properties.put(ZooKeeperStateProvider.SESSION_TIMEOUT, new 
StandardPropertyValue("3 secs", null));
+        properties.put(ZooKeeperStateProvider.ROOT_NODE, new 
StandardPropertyValue("/nifi/team1/testing", null));
+
+        provider = new ZooKeeperStateProvider();
+        provider.initialize(new StateProviderInitializationContext() {
+            @Override
+            public String getIdentifier() {
+                return "Unit Test Provider Initialization Context";
+            }
+
+            @Override
+            public Map<PropertyDescriptor, PropertyValue> getProperties() {
+                return Collections.unmodifiableMap(properties);
+            }
+
+            @Override
+            public PropertyValue getProperty(final PropertyDescriptor 
property) {
+                final PropertyValue prop = properties.get(property);
+                if (prop == null) {
+                    return new StandardPropertyValue(null, null);
+                }
+                return prop;
+            }
+
+            @Override
+            public SSLContext getSSLContext() {
+                return null;
+            }
+        });
+
+        provider.enable();
+    }
+
+    @After
+    public void clear() throws IOException {
+        getProvider().onComponentRemoved(componentId);
+        getProvider().disable();
+        getProvider().shutdown();
+    }
+
+    @Override
+    protected StateProvider getProvider() {
+        return provider;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
index 00e1274..4a665d9 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
@@ -33,6 +33,20 @@ nifi.nar.library.directory=${nifi.nar.library.directory}
 nifi.nar.working.directory=${nifi.nar.working.directory}
 nifi.documentation.working.directory=${nifi.documentation.working.directory}
 
+####################
+# State Management #
+####################
+nifi.state.management.configuration.file=${nifi.state.management.configuration.file}
+# The ID of the local state provider
+nifi.state.management.provider.local=${nifi.state.management.provider.local}
+# The ID of the cluster-wide state provider. This will be ignored if NiFi is 
not clustered but must be populated if running in a cluster.
+nifi.state.management.provider.cluster=${nifi.state.management.provider.cluster}
+# Specifies whether or not this instance of NiFi should run an embedded 
ZooKeeper server
+nifi.state.management.embedded.zookeeper.start=${nifi.state.management.embedded.zookeeper.start}
+# Properties file that provides the ZooKeeper properties to use if 
<nifi.state.management.embedded.zookeeper.start> is set to true
+nifi.state.management.embedded.zookeeper.properties=${nifi.state.management.embedded.zookeeper.properties}
+
+
 # H2 Settings
 nifi.database.directory=${nifi.database.directory}
 nifi.h2.url.append=${nifi.h2.url.append}

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
new file mode 100644
index 0000000..ba5259c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<!--
+  This file provides a mechanism for defining and configuring the State 
Providers
+  that should be used for storing state locally and across a NiFi cluster. In 
order
+  to use a specific provider, it must be configured here and its identifier
+  must be specified in the nifi.properties file.
+-->
+<stateManagement>
+    <!--
+        State Provider that stores state locally in a configurable directory. 
This Provider requires the following properties:
+        
+        Directory - the directory to store components' state in. If the 
directory being used is a sub-directory of the NiFi installation, it
+                    is important that the directory be copied over to the new 
version when upgrading NiFi.
+     -->
+    <local-provider>
+        <id>local-provider</id>
+        
<class>org.apache.nifi.controller.state.providers.local.WriteAheadLocalStateProvider</class>
+        <property name="Directory">./state</property>
+    </local-provider>
+
+    <!--
+        State Provider that is used to store state in ZooKeeper. This Provider 
requires the following properties:
+        
+        Root Node - the root node in ZooKeeper where state should be stored. 
The default is '/nifi', but it is advisable to change this to a different value 
if not using
+                   the embedded ZooKeeper server and if multiple NiFi 
instances may all be using the same ZooKeeper Server.
+                   
+        Connect String - A comma-separated list of host:port pairs to connect 
to ZooKeeper. For example, localhost:2181,localhost:5555,127.0.0.1:6666
+        
+        Session Timeout - Specifies how long this instance of NiFi is allowed 
to be disconnected from ZooKeeper before creating a new ZooKeeper Session. 
Default value is "3 seconds"
+    -->
+    <cluster-provider>
+        <id>zk-provider</id>
+        
<class>org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider</class>
+        <property name="Connect String"></property>
+        <property name="Root Node">/nifi</property>
+        <property name="Session Timeout">30 seconds</property>
+    </cluster-provider>
+</stateManagement>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/zookeeper.properties
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/zookeeper.properties
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/zookeeper.properties
new file mode 100644
index 0000000..55afc94
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/zookeeper.properties
@@ -0,0 +1,30 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#
+#
+
+clientPort=2181
+initLimit=10
+autopurge.purgeInterval=24
+syncLimit=5
+tickTime=2000
+dataDir=./state/zookeeper
+autopurge.snapRetainCount=30
+server.1=
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java
index 7d0ffab..ef7a61c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java
@@ -133,7 +133,7 @@ public class ClusterManagerServerProtocol implements 
ServerProtocol {
                 continue;
             }
 
-            dos.writeUTF(nodeInfo.getHostname());
+            dos.writeUTF(nodeInfo.getSiteToSiteHostname());
             dos.writeInt(nodeInfo.getSiteToSitePort());
             dos.writeBoolean(nodeInfo.isSiteToSiteSecure());
             dos.writeInt(nodeInfo.getTotalFlowFiles());

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
index 486e6d7..0676668 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
@@ -241,9 +241,9 @@ public class StandardControllerServiceDAO extends 
ComponentDAO implements Contro
                 final String propName = entry.getKey();
                 final String propVal = entry.getValue();
                 if (isNotNull(propName) && propVal == null) {
-                    controllerService.removeProperty(propName);
+                    controllerService.removeProperty(propName, true);
                 } else if (isNotNull(propName)) {
-                    controllerService.setProperty(propName, propVal);
+                    controllerService.setProperty(propName, propVal, true);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
index e52a476..81d882c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
@@ -171,9 +171,9 @@ public class StandardProcessorDAO extends ComponentDAO 
implements ProcessorDAO {
                     final String propName = entry.getKey();
                     final String propVal = entry.getValue();
                     if (isNotNull(propName) && propVal == null) {
-                        processor.removeProperty(propName);
+                        processor.removeProperty(propName, true);
                     } else if (isNotNull(propName)) {
-                        processor.setProperty(propName, propVal);
+                        processor.setProperty(propName, propVal, true);
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardReportingTaskDAO.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardReportingTaskDAO.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardReportingTaskDAO.java
index ffe606a..38d793d 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardReportingTaskDAO.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardReportingTaskDAO.java
@@ -285,9 +285,9 @@ public class StandardReportingTaskDAO extends ComponentDAO 
implements ReportingT
                 final String propName = entry.getKey();
                 final String propVal = entry.getValue();
                 if (isNotNull(propName) && propVal == null) {
-                    reportingTask.removeProperty(propName);
+                    reportingTask.removeProperty(propName, true);
                 } else if (isNotNull(propName)) {
-                    reportingTask.setProperty(propName, propVal);
+                    reportingTask.setProperty(propName, propVal, true);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/resources/access-control/nifi.properties
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/resources/access-control/nifi.properties
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/resources/access-control/nifi.properties
index 10db651..8c54f24 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/resources/access-control/nifi.properties
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/resources/access-control/nifi.properties
@@ -29,6 +29,13 @@ nifi.ui.autorefresh.interval=30 sec
 nifi.nar.library.directory=target/test-classes/access-control/lib
 nifi.nar.working.directory=target/test-classes/access-control/nar
 
+nifi.state.management.configuration.file=target/test-classes/access-control/state-management.xml
+nifi.state.management.embedded.zookeeper.start=false
+nifi.state.management.embedded.zookeeper.properties=
+nifi.state.management.embedded.zookeeper.max.instances=3
+nifi.state.management.provider.local=local-provider
+nifi.state.management.provider.cluster=
+
 # H2 Settings
 nifi.database.directory=target/test-classes/database_repository
 nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/resources/access-control/state-management.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/resources/access-control/state-management.xml
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/resources/access-control/state-management.xml
new file mode 100644
index 0000000..1714c7d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/resources/access-control/state-management.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<!--
+    This file lists the authority providers to use when running securely. In 
order
+    to use a specific provider it must be configured here and it's identifier
+    must be specified in the nifi.properties file.
+-->
+<stateManagement>
+    <!--
+      This file provides a mechanism for defining and configuring the State 
Providers
+      that should be used for storing state locally and across a NiFi cluster.
+    -->
+    
+    <!--
+        State Provider that stores state locally in a configurable directory. 
This Provider requires the following properties:
+        
+        Directory - the directory to store components' state in. If the 
directory being used is a sub-directory of the NiFi installation, it
+                    is important that the directory be copied over to the new 
version when upgrading NiFi.
+     -->
+    <local-provider>
+        <id>local-provider</id>
+        
<class>org.apache.nifi.controller.state.providers.local.WriteAheadLocalStateProvider</class>
+        <property 
name="Directory">target/test-classes/access-control/state-management</property>
+    </local-provider>
+</stateManagement>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java
 
b/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java
index 3a6ac79..e49a8be 100644
--- 
a/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java
+++ 
b/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java
@@ -55,6 +55,7 @@ import ca.uhn.hl7v2.model.Structure;
 import ca.uhn.hl7v2.model.Type;
 import ca.uhn.hl7v2.model.Varies;
 import ca.uhn.hl7v2.parser.PipeParser;
+import ca.uhn.hl7v2.validation.ValidationContext;
 import ca.uhn.hl7v2.validation.impl.ValidationContextFactory;
 
 @SideEffectFree
@@ -120,7 +121,7 @@ public class ExtractHL7Attributes extends AbstractProcessor 
{
 
         @SuppressWarnings("resource")
         final HapiContext hapiContext = new DefaultHapiContext();
-        
hapiContext.setValidationContext(ValidationContextFactory.noValidation());
+        hapiContext.setValidationContext((ValidationContext) 
ValidationContextFactory.noValidation());
 
         final PipeParser parser = hapiContext.getPipeParser();
         final String hl7Text = new String(buffer, charset);

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java
 
b/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java
index 26e8bb6..ba90370 100644
--- 
a/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java
+++ 
b/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java
@@ -161,7 +161,7 @@ public class RouteHL7 extends AbstractProcessor {
 
         @SuppressWarnings("resource")
         final HapiContext hapiContext = new DefaultHapiContext();
-        
hapiContext.setValidationContext(ValidationContextFactory.noValidation());
+        
hapiContext.setValidationContext((ca.uhn.hl7v2.validation.ValidationContext) 
ValidationContextFactory.noValidation());
 
         final PipeParser parser = hapiContext.getPipeParser();
         final String hl7Text = new String(buffer, charset);

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java
index 08f5aa7..802f889 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java
@@ -166,8 +166,7 @@ public class EmbeddedKafka {
             ServerConfig configuration = new ServerConfig();
             configuration.readFrom(quorumConfiguration);
 
-            FileTxnSnapLog txnLog = new FileTxnSnapLog(new 
File(configuration.getDataLogDir()),
-                    new File(configuration.getDataDir()));
+            FileTxnSnapLog txnLog = new FileTxnSnapLog(new 
File(configuration.getDataLogDir()), new File(configuration.getDataDir()));
 
             zkServer.setTxnLogFactory(txnLog);
             zkServer.setTickTime(configuration.getTickTime());

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
index e592483..20969c9 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
@@ -19,14 +19,12 @@ package org.apache.nifi.processors.standard;
 
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -34,9 +32,13 @@ import java.util.Properties;
 import java.util.Set;
 
 import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
 import org.apache.nifi.annotation.notification.PrimaryNodeState;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.distributed.cache.client.Deserializer;
 import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
 import org.apache.nifi.distributed.cache.client.Serializer;
@@ -50,7 +52,6 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processors.standard.util.EntityListing;
 import org.apache.nifi.processors.standard.util.ListableEntity;
-import org.codehaus.jackson.JsonGenerationException;
 import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.JsonParseException;
 import org.codehaus.jackson.map.JsonMappingException;
@@ -157,9 +158,13 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
         .build();
 
 
-    private volatile Long lastListingTime = null;
     private volatile Set<String> latestIdentifiersListed = new HashSet<>();
+    private volatile Long lastListingTime = null;
     private volatile boolean electedPrimaryNode = false;
+    private volatile boolean resetListing = false;
+
+    static final String TIMESTAMP = "timestamp";
+    static final String IDENTIFIER_PREFIX = "id";
 
     protected File getPersistenceFile() {
         return new File("conf/state/" + getIdentifier());
@@ -177,6 +182,7 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
         if (isListingResetNecessary(descriptor)) {
             lastListingTime = null; // clear lastListingTime so that we have 
to fetch new time
             latestIdentifiersListed = new HashSet<>();
+            resetListing = true;
         }
     }
 
@@ -187,10 +193,6 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
         return relationships;
     }
 
-    protected String getKey(final String directory) {
-        return getIdentifier() + ".lastListingTime." + directory;
-    }
-
     @OnPrimaryNodeStateChange
     public void onPrimaryNodeChange(final PrimaryNodeState newState) {
         if (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE) {
@@ -198,159 +200,146 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
         }
     }
 
-    private EntityListing deserialize(final String serializedState) throws 
JsonParseException, JsonMappingException, IOException {
-        final ObjectMapper mapper = new ObjectMapper();
-        final JsonNode jsonNode = mapper.readTree(serializedState);
-        return mapper.readValue(jsonNode, EntityListing.class);
-    }
-
-
-    private Long getMinTimestamp(final String directory, final 
DistributedMapCacheClient client) throws IOException {
-        // Determine the timestamp for the last file that we've listed.
-        Long minTimestamp = lastListingTime;
-        if (minTimestamp == null || electedPrimaryNode) {
-            // We haven't yet restored any state from local or distributed 
state - or it's been at least a minute since
-            // we have performed a listing. In this case,
-            // First, attempt to get timestamp from distributed cache service.
-            if (client != null) {
-                try {
-                    final StringSerDe serde = new StringSerDe();
-                    final String serializedState = 
client.get(getKey(directory), serde, serde);
-                    if (serializedState == null || serializedState.isEmpty()) {
-                        minTimestamp = null;
-                        this.latestIdentifiersListed = Collections.emptySet();
-                    } else {
-                        final EntityListing listing = 
deserialize(serializedState);
-                        this.lastListingTime = 
listing.getLatestTimestamp().getTime();
-                        minTimestamp = listing.getLatestTimestamp().getTime();
-                        this.latestIdentifiersListed = new 
HashSet<>(listing.getMatchingIdentifiers());
-                    }
-
-                    this.lastListingTime = minTimestamp;
-                    electedPrimaryNode = false; // no requirement to pull an 
update from the distributed cache anymore.
-                } catch (final IOException ioe) {
-                    throw ioe;
-                }
-            }
+    @OnScheduled
+    public final void updateState(final ProcessContext context) throws 
IOException {
+        final String path = getPath(context);
+        final DistributedMapCacheClient client = 
context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
 
-            // Check the persistence file. We want to use the latest timestamp 
that we have so that
-            // we don't duplicate data.
+        // Check if state already exists for this path. If so, we have already 
migrated the state.
+        final StateMap stateMap = 
context.getStateManager().getState(Scope.CLUSTER);
+        if (stateMap.getVersion() == -1L) {
             try {
-                final File persistenceFile = getPersistenceFile();
-                if (persistenceFile.exists()) {
-                    try (final FileInputStream fis = new 
FileInputStream(persistenceFile)) {
-                        final Properties props = new Properties();
-                        props.load(fis);
-
-                        // get the local timestamp for this directory, if it 
exists.
-                        final String locallyPersistedValue = 
props.getProperty(directory);
-                        if (locallyPersistedValue != null) {
-                            final EntityListing listing = 
deserialize(locallyPersistedValue);
-                            final long localTimestamp = 
listing.getLatestTimestamp().getTime();
-
-                            // If distributed state doesn't have an entry or 
the local entry is later than the distributed state,
-                            // update the distributed state so that we are in 
sync.
-                            if (client != null && (minTimestamp == null || 
localTimestamp > minTimestamp)) {
-                                minTimestamp = localTimestamp;
-
-                                // Our local persistence file shows a later 
time than the Distributed service.
-                                // Update the distributed service to match our 
local state.
-                                try {
-                                    final StringSerDe serde = new 
StringSerDe();
-                                    client.put(getKey(directory), 
locallyPersistedValue, serde, serde);
-                                } catch (final IOException ioe) {
-                                    getLogger().warn("Local timestamp for {} 
is {}, which is later than Distributed state but failed to update Distributed "
-                                        + "state due to {}. If a new node 
performs Listing, data duplication may occur",
-                                        new Object[] {directory, 
locallyPersistedValue, ioe});
-                                }
-                            }
-                        }
-                    }
-                }
+                // Migrate state from the old way of managing state 
(distributed cache service and local file)
+                // to the new mechanism (State Manager).
+                migrateState(path, client, context.getStateManager());
             } catch (final IOException ioe) {
-                getLogger().warn("Failed to recover local state due to {}. 
Assuming that the state from the distributed cache is correct.", ioe);
+                throw new IOException("Failed to properly migrate state to 
State Manager", ioe);
             }
         }
 
-        return minTimestamp;
-    }
-
-
-    private String serializeState(final List<T> entities) throws 
JsonGenerationException, JsonMappingException, IOException {
-        // we need to keep track of all files that we pulled in that had a 
modification time equal to
-        // lastListingTime so that we can avoid pulling those files in again. 
We can't just ignore any files
-        // that have a mod time equal to that timestamp because more files may 
come in with the same timestamp
-        // later in the same millisecond.
-        if (entities.isEmpty()) {
-            return null;
-        } else {
-            final List<T> sortedEntities = new ArrayList<>(entities);
-            Collections.sort(sortedEntities, new Comparator<ListableEntity>() {
-                @Override
-                public int compare(final ListableEntity o1, final 
ListableEntity o2) {
-                    return Long.compare(o1.getTimestamp(), o2.getTimestamp());
-                }
-            });
-
-            final long latestListingModTime = 
sortedEntities.get(sortedEntities.size() - 1).getTimestamp();
-            final Set<String> idsWithTimestampEqualToListingTime = new 
HashSet<>();
-            for (int i = sortedEntities.size() - 1; i >= 0; i--) {
-                final ListableEntity entity = sortedEntities.get(i);
-                if (entity.getTimestamp() == latestListingModTime) {
-                    
idsWithTimestampEqualToListingTime.add(entity.getIdentifier());
-                }
-            }
-
-            this.latestIdentifiersListed = idsWithTimestampEqualToListingTime;
+        // delete the local file, since it is no longer needed
+        final File localFile = new File(path);
+        if (localFile.exists() && !!localFile.delete()) {
+            getLogger().warn("Migrated state but failed to delete local 
persistence file");
+        }
 
-            final EntityListing listing = new EntityListing();
-            listing.setLatestTimestamp(new Date(latestListingModTime));
-            final Set<String> ids = new HashSet<>();
-            for (final String id : idsWithTimestampEqualToListingTime) {
-                ids.add(id);
+        // remove entry from Distributed cache server
+        if (client != null) {
+            try {
+                client.remove(path, new StringSerDe());
+            } catch (final IOException ioe) {
+                getLogger().warn("Failed to remove entry from Distributed 
Cache Service. However, the state has already been migrated to use the new "
+                    + "State Management service, so the Distributed Cache 
Service is no longer needed.");
             }
-            listing.setMatchingIdentifiers(ids);
+        }
 
-            final ObjectMapper mapper = new ObjectMapper();
-            final String serializedState = 
mapper.writerWithType(EntityListing.class).writeValueAsString(listing);
-            return serializedState;
+        if (resetListing) {
+            context.getStateManager().clear(Scope.CLUSTER);
+            resetListing = false;
         }
     }
 
-    protected void persistLocalState(final String path, final String 
serializedState) throws IOException {
-        // we need to keep track of all files that we pulled in that had a 
modification time equal to
-        // lastListingTime so that we can avoid pulling those files in again. 
We can't just ignore any files
-        // that have a mod time equal to that timestamp because more files may 
come in with the same timestamp
-        // later in the same millisecond.
-        final File persistenceFile = getPersistenceFile();
-        final File dir = persistenceFile.getParentFile();
-        if (!dir.exists() && !dir.mkdirs()) {
-            throw new IOException("Could not create directory " + 
dir.getAbsolutePath() + " in order to save local state");
+    /**
+     * This processor used to use the DistributedMapCacheClient in order to 
store cluster-wide state, before the introduction of
+     * the StateManager. This method will migrate state from that 
DistributedMapCacheClient, or from a local file, to the StateManager,
+     * if any state already exists
+     *
+     * @param path the path to migrate state for
+     * @param client the DistributedMapCacheClient that is capable of 
obtaining the current state
+     * @param stateManager the StateManager to use in order to store the new 
state
+     * @throws IOException if unable to retrieve or store the state
+     */
+    private void migrateState(final String path, final 
DistributedMapCacheClient client, final StateManager stateManager) throws 
IOException {
+        Long minTimestamp = null;
+        final Set<String> latestIdentifiersListed = new HashSet<>();
+
+        // Retrieve state from Distributed Cache Client
+        if (client != null) {
+            final StringSerDe serde = new StringSerDe();
+            final String serializedState = client.get(getKey(path), serde, 
serde);
+            if (serializedState != null && !serializedState.isEmpty()) {
+                final EntityListing listing = deserialize(serializedState);
+                minTimestamp = listing.getLatestTimestamp().getTime();
+                
latestIdentifiersListed.addAll(listing.getMatchingIdentifiers());
+            }
         }
 
-        final Properties props = new Properties();
+        // Retrieve state from locally persisted file
+        final File persistenceFile = getPersistenceFile();
         if (persistenceFile.exists()) {
+            final Properties props = new Properties();
+
             try (final FileInputStream fis = new 
FileInputStream(persistenceFile)) {
                 props.load(fis);
             }
+
+            final String locallyPersistedValue = props.getProperty(path);
+            if (locallyPersistedValue != null) {
+                final EntityListing listing = 
deserialize(locallyPersistedValue);
+                final long localTimestamp = 
listing.getLatestTimestamp().getTime();
+                if (minTimestamp == null || localTimestamp > minTimestamp) {
+                    minTimestamp = localTimestamp;
+                    latestIdentifiersListed.clear();
+                    
latestIdentifiersListed.addAll(listing.getMatchingIdentifiers());
+                }
+            }
         }
 
-        props.setProperty(path, serializedState);
+        if (minTimestamp != null) {
+            persist(minTimestamp, latestIdentifiersListed, stateManager);
+        }
+    }
 
-        try (final FileOutputStream fos = new 
FileOutputStream(persistenceFile)) {
-            props.store(fos, null);
+    private void persist(final long timestamp, final Collection<String> 
identifiers, final StateManager stateManager) throws IOException {
+        final Map<String, String> updatedState = new 
HashMap<>(identifiers.size() + 1);
+        updatedState.put(TIMESTAMP, String.valueOf(timestamp));
+        int counter = 0;
+        for (final String identifier : identifiers) {
+            final String index = String.valueOf(++counter);
+            updatedState.put(IDENTIFIER_PREFIX + "." + index, identifier);
         }
+        stateManager.setState(updatedState, Scope.CLUSTER);
+    }
+
+    protected String getKey(final String directory) {
+        return getIdentifier() + ".lastListingTime." + directory;
+    }
+
+    private EntityListing deserialize(final String serializedState) throws 
JsonParseException, JsonMappingException, IOException {
+        final ObjectMapper mapper = new ObjectMapper();
+        final JsonNode jsonNode = mapper.readTree(serializedState);
+        return mapper.readValue(jsonNode, EntityListing.class);
     }
 
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
-        final String path = getPath(context);
-        final DistributedMapCacheClient client = 
context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
-
-        final Long minTimestamp;
+        Long minTimestamp = lastListingTime;
         try {
-            minTimestamp = getMinTimestamp(path, client);
+            // We need to fetch the state from the cluster if we don't yet 
know the last listing time,
+            // or if we were just elected the primary node
+            if (this.lastListingTime == null || electedPrimaryNode) {
+                final StateMap stateMap = 
context.getStateManager().getState(Scope.CLUSTER);
+                final Map<String, String> stateValues = stateMap.toMap();
+                final String timestamp = stateValues.get(TIMESTAMP);
+
+                if (timestamp == null) {
+                    minTimestamp = 0L;
+                    latestIdentifiersListed.clear();
+                } else {
+                    minTimestamp = this.lastListingTime = 
Long.parseLong(timestamp);
+                    latestIdentifiersListed.clear();
+                    for (final Map.Entry<String, String> entry : 
stateValues.entrySet()) {
+                        final String key = entry.getKey();
+                        final String value = entry.getValue();
+                        if (TIMESTAMP.equals(key)) {
+                            continue;
+                        }
+
+                        latestIdentifiersListed.add(value);
+                    }
+                }
+            }
         } catch (final IOException ioe) {
             getLogger().error("Failed to retrieve timestamp of last listing 
from Distributed Cache Service. Will not perform listing until this is 
accomplished.");
             context.yield();
@@ -403,32 +392,19 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
             // previously Primary Node left off.
             // We also store the state locally so that if the node is 
restarted, and the node cannot contact
             // the distributed state cache, the node can continue to run (if 
it is primary node).
-            String serializedState = null;
+            final Set<String> identifiers = new HashSet<>(entityList.size());
             try {
-                serializedState = serializeState(entityList);
-            } catch (final Exception e) {
-                getLogger().error("Failed to serialize state due to {}", new 
Object[] {e});
-            }
-
-            if (serializedState != null) {
-                // Save our state locally.
-                try {
-                    persistLocalState(path, serializedState);
-                } catch (final IOException ioe) {
-                    getLogger().warn("Unable to save state locally. If the 
node is restarted now, data may be duplicated. Failure is due to {}", ioe);
-                }
-
-                // Attempt to save state to remote server.
-                if (client != null) {
-                    try {
-                        client.put(getKey(path), serializedState, new 
StringSerDe(), new StringSerDe());
-                    } catch (final IOException ioe) {
-                        getLogger().warn("Unable to communicate with 
distributed cache server due to {}. Persisting state locally instead.", ioe);
-                    }
+                for (final T entity : entityList) {
+                    identifiers.add(entity.getIdentifier());
                 }
+                persist(latestListingTimestamp, identifiers, 
context.getStateManager());
+            } catch (final IOException ioe) {
+                getLogger().warn("Unable to save state due to {}. If NiFi 
restarted before state is saved, or "
+                    + "if another node begins executing this Processor, data 
duplication may occur.", ioe);
             }
 
             lastListingTime = latestListingTimestamp;
+            latestIdentifiersListed = identifiers;
         } else {
             getLogger().debug("There is no data to list. Yielding.");
             context.yield();

Reply via email to