http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/state/StateManagerProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/state/StateManagerProvider.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/state/StateManagerProvider.java
new file mode 100644
index 0000000..0b1ab8d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/state/StateManagerProvider.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components.state;
+
+/**
+ * <p>
+ * Interface that provides a mechanism for obtaining the {@link StateManager} 
for a particular component
+ * </p>
+ */
+public interface StateManagerProvider {
+    /**
+     * Returns the StateManager for the component with the given ID, or 
<code>null</code> if no State Manager
+     * exists for the component with the given ID
+     *
+     * @param componentId the id of the component for which the StateManager 
should be returned
+     *
+     * @return the StateManager for the component with the given ID, or 
<code>null</code> if no State Manager
+     *         exists for the component with the given ID
+     */
+    StateManager getStateManager(String componentId);
+
+    /**
+     * Notifies the State Manager Provider that the component with the given 
ID has been removed from the NiFi instance
+     * and will no longer be needed, so the appropriate resource cleanup can 
take place.
+     *
+     * @param componentId the ID of the component that has been removed
+     */
+    void onComponentRemoved(String componentId);
+
+    /**
+     * Shuts down the state managers, cleaning up any resources that they 
occupy
+     */
+    void shutdown();
+
+    /**
+     * Initializes the Cluster State Provider and enables it for use
+     */
+    void enableClusterProvider();
+
+    /**
+     * Disables the Cluster State Provider and begins using the Local State 
Provider to persist and retrieve
+     * state, even when components request a clustered provider
+     */
+    void disableClusterProvider();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
index 978c612..214467d 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
@@ -85,7 +85,7 @@ public abstract class AbstractConfiguredComponent implements 
ConfigurableCompone
     }
 
     @Override
-    public void setProperty(final String name, final String value) {
+    public void setProperty(final String name, final String value, final 
boolean triggerOnPropertyModified) {
         if (null == name || null == value) {
             throw new IllegalArgumentException();
         }
@@ -114,10 +114,12 @@ public abstract class AbstractConfiguredComponent 
implements ConfigurableCompone
                         }
                     }
 
-                    try {
-                        component.onPropertyModified(descriptor, oldValue, 
value);
-                    } catch (final Throwable t) {
-                        // nothing really to do here...
+                    if (triggerOnPropertyModified) {
+                        try {
+                            component.onPropertyModified(descriptor, oldValue, 
value);
+                        } catch (final Exception e) {
+                            // nothing really to do here...
+                        }
                     }
                 }
             }
@@ -133,11 +135,12 @@ public abstract class AbstractConfiguredComponent 
implements ConfigurableCompone
      * if was a dynamic property.
      *
      * @param name the property to remove
+     * @param triggerOnPropertyModified specifies whether or not the 
onPropertyModified method should be called
      * @return true if removed; false otherwise
      * @throws java.lang.IllegalArgumentException if the name is null
      */
     @Override
-    public boolean removeProperty(final String name) {
+    public boolean removeProperty(final String name, final boolean 
triggerOnPropertyModified) {
         if (null == name) {
             throw new IllegalArgumentException();
         }
@@ -160,7 +163,10 @@ public abstract class AbstractConfiguredComponent 
implements ConfigurableCompone
                         }
                     }
 
-                    component.onPropertyModified(descriptor, value, null);
+                    if (triggerOnPropertyModified) {
+                        component.onPropertyModified(descriptor, value, null);
+                    }
+
                     return true;
                 }
             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java
index 8b2794d..7e49700 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java
@@ -34,7 +34,15 @@ public interface ConfiguredComponent {
 
     public void setAnnotationData(String data);
 
-    public void setProperty(String name, String value);
+    /**
+     * Sets the property with the given name to the given value
+     * 
+     * @param name the name of the property to update
+     * @param value the value to update the property to
+     * @param triggerOnPropertyModified if <code>true</code>, will trigger the 
#onPropertyModified method of the component
+     *            to be called, otherwise will not
+     */
+    public void setProperty(String name, String value, boolean 
triggerOnPropertyModified);
 
     /**
      * Removes the property and value for the given property name if a
@@ -43,10 +51,12 @@ public interface ConfiguredComponent {
      * if was a dynamic property.
      *
      * @param name the property to remove
+     * @param triggerOnPropertyModified if <code>true</code>, will trigger the 
#onPropertyModified method of the component
+     *            to be called, otherwise will not
      * @return true if removed; false otherwise
      * @throws java.lang.IllegalArgumentException if the name is null
      */
-    public boolean removeProperty(String name);
+    public boolean removeProperty(String name, boolean 
triggerOnPropertyModified);
 
     public Map<PropertyDescriptor, String> getProperties();
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
index 783235a..882695e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
@@ -1,19 +1,16 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-      http://www.apache.org/licenses/LICENSE-2.0
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd">
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more 
contributor 
+    license agreements. See the NOTICE file distributed with this work for 
additional 
+    information regarding copyright ownership. The ASF licenses this file to 
+    You under the Apache License, Version 2.0 (the "License"); you may not use 
+    this file except in compliance with the License. You may obtain a copy of 
+    the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+    by applicable law or agreed to in writing, software distributed under the 
+    License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
CONDITIONS 
+    OF ANY KIND, either express or implied. See the License for the specific 
+    language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.nifi</groupId>
@@ -120,6 +117,27 @@
             <artifactId>nifi-write-ahead-log</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.zookeeper</groupId>
+            <artifactId>zookeeper</artifactId>
+        </dependency>
+        
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-processor-utils</artifactId>
+        </dependency>
+        
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-mock</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java
index 5414259..ff3ad4e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java
@@ -56,8 +56,6 @@ public class HeartbeatPayload {
     private long totalFlowFileCount;
     private long totalFlowFileBytes;
     private SystemDiagnostics systemDiagnostics;
-    private Integer siteToSitePort;
-    private boolean siteToSiteSecure;
     private long systemStartTime;
 
     @XmlJavaTypeAdapter(CounterAdapter.class)
@@ -109,22 +107,6 @@ public class HeartbeatPayload {
         this.systemDiagnostics = systemDiagnostics;
     }
 
-    public boolean isSiteToSiteSecure() {
-        return siteToSiteSecure;
-    }
-
-    public void setSiteToSiteSecure(final boolean secure) {
-        this.siteToSiteSecure = secure;
-    }
-
-    public Integer getSiteToSitePort() {
-        return siteToSitePort;
-    }
-
-    public void setSiteToSitePort(final Integer port) {
-        this.siteToSitePort = port;
-    }
-
     public long getSystemStartTime() {
         return systemStartTime;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index dd3b687..95c93ba 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -16,7 +16,39 @@
  */
 package org.apache.nifi.controller;
 
-import com.sun.jersey.api.client.ClientHandlerException;
+import static java.util.Objects.requireNonNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.net.ssl.SSLContext;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.action.Action;
 import org.apache.nifi.admin.service.AuditService;
@@ -37,6 +69,7 @@ import 
org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
 import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.StateManagerProvider;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.connectable.Connection;
@@ -88,6 +121,8 @@ import 
org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.controller.service.StandardConfigurationContext;
 import org.apache.nifi.controller.service.StandardControllerServiceProvider;
+import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
+import org.apache.nifi.controller.state.server.ZooKeeperStateServer;
 import org.apache.nifi.controller.status.ConnectionStatus;
 import org.apache.nifi.controller.status.PortStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
@@ -173,41 +208,11 @@ import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
 import org.apache.nifi.web.api.dto.TemplateDTO;
 import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static java.util.Objects.requireNonNull;
+import com.sun.jersey.api.client.ClientHandlerException;
 
 public class FlowController implements EventAccess, ControllerServiceProvider, 
ReportingTaskProvider, Heartbeater, QueueProvider {
 
@@ -251,9 +256,12 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     private final AuditService auditService;
     private final EventDrivenWorkerQueue eventDrivenWorkerQueue;
     private final ComponentStatusRepository componentStatusRepository;
+    private final StateManagerProvider stateManagerProvider;
     private final long systemStartTime = System.currentTimeMillis(); // time 
at which the node was started
     private final ConcurrentMap<String, ReportingTaskNode> reportingTasks = 
new ConcurrentHashMap<>();
 
+    private volatile ZooKeeperStateServer zooKeeperStateServer;
+
     // The Heartbeat Bean is used to provide an Atomic Reference to data that 
is used in heartbeats that may
     // change while the instance is running. We do this because we want to 
generate heartbeats even if we
     // are unable to obtain a read lock on the entire FlowController.
@@ -419,13 +427,19 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             throw new RuntimeException("Unable to create Provenance 
Repository", e);
         }
 
-        processScheduler = new StandardProcessScheduler(this, this, encryptor);
+        try {
+            this.stateManagerProvider = 
StandardStateManagerProvider.create(properties);
+        } catch (final IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        processScheduler = new StandardProcessScheduler(this, this, encryptor, 
stateManagerProvider);
         eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, 
processScheduler);
-        controllerServiceProvider = new 
StandardControllerServiceProvider(processScheduler, bulletinRepository);
+        controllerServiceProvider = new 
StandardControllerServiceProvider(processScheduler, bulletinRepository, 
stateManagerProvider);
 
         final ProcessContextFactory contextFactory = new 
ProcessContextFactory(contentRepository, flowFileRepository, 
flowFileEventRepository, counterRepositoryRef.get(), provenanceEventRepository);
         processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, 
new EventDrivenSchedulingAgent(
-            eventDrivenEngineRef.get(), this, eventDrivenWorkerQueue, 
contextFactory, maxEventDrivenThreads.get(), encryptor));
+            eventDrivenEngineRef.get(), this, stateManagerProvider, 
eventDrivenWorkerQueue, contextFactory, maxEventDrivenThreads.get(), 
encryptor));
 
         final QuartzSchedulingAgent quartzSchedulingAgent = new 
QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, 
encryptor);
         final TimerDrivenSchedulingAgent timerDrivenAgent = new 
TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, 
encryptor);
@@ -469,7 +483,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
 
         this.snippetManager = new SnippetManager();
 
-        rootGroup = new StandardProcessGroup(UUID.randomUUID().toString(), 
this, processScheduler, properties, encryptor);
+        rootGroup = new StandardProcessGroup(UUID.randomUUID().toString(), 
this, processScheduler, properties, encryptor, this);
         rootGroup.setName(DEFAULT_ROOT_GROUP_NAME);
         instanceId = UUID.randomUUID().toString();
 
@@ -496,6 +510,17 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             snapshotMillis = 
FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY,
 TimeUnit.MILLISECONDS);
         }
 
+        // Initialize the Embedded ZooKeeper server, if applicable
+        if (properties.isStartEmbeddedZooKeeper()) {
+            try {
+                zooKeeperStateServer = ZooKeeperStateServer.create(properties);
+            } catch (final IOException | ConfigException e) {
+                throw new IllegalStateException("Unable to initailize Flow 
because NiFi was configured to start an Embedded Zookeeper server but failed to 
do so", e);
+            }
+        } else {
+            zooKeeperStateServer = null;
+        }
+
         componentStatusRepository = createComponentStatusRepository();
         timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
             @Override
@@ -668,6 +693,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         }
     }
 
+
     private ContentRepository createContentRepository(final NiFiProperties 
properties) throws InstantiationException, IllegalAccessException, 
ClassNotFoundException {
         final String implementationClassName = 
properties.getProperty(NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION, 
DEFAULT_CONTENT_REPO_IMPLEMENTATION);
         if (implementationClassName == null) {
@@ -714,6 +740,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         }
     }
 
+
     /**
      * Creates a connection between two Connectable objects.
      *
@@ -835,7 +862,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
      * @throws NullPointerException if the argument is null
      */
     public ProcessGroup createProcessGroup(final String id) {
-        return new StandardProcessGroup(requireNonNull(id).intern(), this, 
processScheduler, properties, encryptor);
+        return new StandardProcessGroup(requireNonNull(id).intern(), this, 
processScheduler, properties, encryptor, this);
     }
 
     /**
@@ -945,6 +972,10 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         return snippetManager;
     }
 
+    public StateManagerProvider getStateManagerProvider() {
+        return stateManagerProvider;
+    }
+
     /**
      * Creates a Port to use as an Input Port for the root Process Group, 
which is used for Site-to-Site communications
      *
@@ -1021,6 +1052,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         }
     }
 
+
     /**
      * Sets the name for the Root Group, which also changes the name for the 
controller.
      *
@@ -1106,6 +1138,8 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             // Trigger any processors' methods marked with @OnShutdown to be 
called
             rootGroup.shutdown();
 
+            stateManagerProvider.shutdown();
+
             // invoke any methods annotated with @OnShutdown on Controller 
Services
             for (final ControllerServiceNode serviceNode : 
getAllControllerServices()) {
                 try (final NarCloseable narCloseable = 
NarCloseable.withNarLoader()) {
@@ -1443,7 +1477,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
 
                 for (final Map.Entry<String, String> entry : 
controllerServiceDTO.getProperties().entrySet()) {
                     if (entry.getValue() != null) {
-                        serviceNode.setProperty(entry.getKey(), 
entry.getValue());
+                        serviceNode.setProperty(entry.getKey(), 
entry.getValue(), true);
                     }
                 }
             }
@@ -1561,7 +1595,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                 if (config.getProperties() != null) {
                     for (final Map.Entry<String, String> entry : 
config.getProperties().entrySet()) {
                         if (entry.getValue() != null) {
-                            procNode.setProperty(entry.getKey(), 
entry.getValue());
+                            procNode.setProperty(entry.getKey(), 
entry.getValue(), true);
                         }
                     }
                 }
@@ -3019,7 +3053,6 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
      *
      * @param clustered true if clustered
      * @param clusterInstanceId if clustered is true, indicates the InstanceID 
of the Cluster Manager
-     * @param clusterManagerDn the DN of the NCM
      */
     public void setClustered(final boolean clustered, final String 
clusterInstanceId, final String clusterManagerDn) {
         writeLock.lock();
@@ -3046,8 +3079,26 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                 if (clustered) {
                     nodeBulletinSubscriber.set(new 
NodeBulletinProcessingStrategy());
                     
bulletinRepository.overrideDefaultBulletinProcessing(nodeBulletinSubscriber.get());
+                    stateManagerProvider.enableClusterProvider();
+
+                    if (zooKeeperStateServer != null) {
+                        processScheduler.submitFrameworkTask(new Runnable() {
+                            @Override
+                            public void run() {
+                                try {
+                                    zooKeeperStateServer.start();
+                                } catch (final Exception e) {
+                                    LOG.error("NiFi was connected to the 
cluster but failed to start embedded ZooKeeper Server", e);
+                                }
+                            }
+                        });
+                    }
                 } else {
                     bulletinRepository.restoreDefaultBulletinProcessing();
+                    if (zooKeeperStateServer != null) {
+                        zooKeeperStateServer.shutdown();
+                    }
+                    stateManagerProvider.disableClusterProvider();
                 }
 
                 final List<RemoteProcessGroup> remoteGroups = 
getGroup(getRootGroupId()).findAllRemoteProcessGroups();
@@ -3063,6 +3114,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         }
     }
 
+
     /**
      * @return true if this instance is the primary node in the cluster; false 
otherwise
      */
@@ -3687,8 +3739,6 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                 hbPayload.setCounters(getCounters());
                 hbPayload.setSystemDiagnostics(getSystemDiagnostics());
                 hbPayload.setProcessGroupStatus(procGroupStatus);
-                hbPayload.setSiteToSitePort(remoteInputSocketPort);
-                hbPayload.setSiteToSiteSecure(isSiteToSiteSecure);
 
                 // create heartbeat message
                 final Heartbeat heartbeat = new Heartbeat(getNodeId(), 
bean.isPrimary(), bean.isConnected(), hbPayload.marshal());

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index 1511293..6250c5a 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -26,7 +26,9 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.StandardOpenOption;
 import java.util.Calendar;
+import java.util.Collections;
 import java.util.Date;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -37,6 +39,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.cluster.ConnectionException;
 import org.apache.nifi.cluster.protocol.ConnectionRequest;
 import org.apache.nifi.cluster.protocol.ConnectionResponse;
@@ -57,9 +60,11 @@ import 
org.apache.nifi.cluster.protocol.message.ProtocolMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.events.BulletinFactory;
-import org.apache.nifi.util.file.FileUtils;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.lifecycle.LifeCycleStartException;
 import org.apache.nifi.logging.LogLevel;
@@ -69,15 +74,17 @@ import org.apache.nifi.reporting.Bulletin;
 import org.apache.nifi.services.FlowService;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.encrypt.StringEncryptor;
+import org.apache.nifi.util.file.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class StandardFlowService implements FlowService, ProtocolHandler {
 
     private static final String EVENT_CATEGORY = "Controller";
+    private static final String CLUSTER_NODE_CONFIG = "Cluster Node 
Configuration";
+
+    // state keys
+    private static final String NODE_UUID = "Node UUID";
 
     private final FlowController controller;
     private final Path flowXml;
@@ -169,8 +176,21 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
             final InetSocketAddress nodeApiAddress = 
properties.getNodeApiAddress();
             final InetSocketAddress nodeSocketAddress = 
properties.getClusterNodeProtocolAddress();
 
+            String nodeUuid = null;
+            final StateManager stateManager = 
controller.getStateManagerProvider().getStateManager(CLUSTER_NODE_CONFIG);
+            if (stateManager != null) {
+                nodeUuid = stateManager.getState(Scope.LOCAL).get(NODE_UUID);
+            }
+
+            if (nodeUuid == null) {
+                nodeUuid = UUID.randomUUID().toString();
+            }
+
             // use a random UUID as the proposed node identifier
-            this.nodeId = new NodeIdentifier(UUID.randomUUID().toString(), 
nodeApiAddress.getHostName(), nodeApiAddress.getPort(), 
nodeSocketAddress.getHostName(), nodeSocketAddress.getPort());
+            this.nodeId = new NodeIdentifier(nodeUuid,
+                nodeApiAddress.getHostName(), nodeApiAddress.getPort(),
+                nodeSocketAddress.getHostName(), nodeSocketAddress.getPort(),
+                properties.getRemoteInputHost(), 
properties.getRemoteInputPort(), properties.isSiteToSiteSecure());
 
         } else {
             this.configuredForClustering = false;
@@ -179,6 +199,7 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
 
     }
 
+
     @Override
     public void saveFlowChanges() throws IOException {
         writeLock.lock();
@@ -507,7 +528,7 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
 
             // reconnect
             final ConnectionResponse connectionResponse = new 
ConnectionResponse(nodeId, request.getDataFlow(), request.isPrimary(),
-                    request.getManagerRemoteSiteListeningPort(), 
request.isManagerRemoteSiteCommsSecure(), request.getInstanceId());
+                request.getManagerRemoteSiteListeningPort(), 
request.isManagerRemoteSiteCommsSecure(), request.getInstanceId());
             connectionResponse.setClusterManagerDN(request.getRequestorDN());
             loadFromConnectionResponse(connectionResponse);
 
@@ -616,8 +637,6 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
         if (firstControllerInitialization) {
             logger.debug("First controller initialization. Loading reporting 
tasks and initializing controller.");
 
-            // load the controller tasks
-//            dao.loadReportingTasks(controller);
             // initialize the flow
             controller.initializeFlow();
 
@@ -650,8 +669,8 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
             for (int i = 0; i < maxAttempts || retryIndefinitely; i++) {
                 try {
                     response = 
senderListener.requestConnection(requestMsg).getConnectionResponse();
-                    if (response.isBlockedByFirewall()) {
-                        logger.warn("Connection request was blocked by cluster 
manager's firewall.");
+                    if (response.getRejectionReason() != null) {
+                        logger.warn("Connection request was blocked by cluster 
manager with the explanation: " + response.getRejectionReason());
                         // set response to null and treat a firewall blockage 
the same as getting no response from manager
                         response = null;
                         break;
@@ -667,7 +686,6 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
                         // we received a successful connection response from 
manager
                         break;
                     }
-
                 } catch (final Exception pe) {
                     // could not create a socket and communicate with manager
                     logger.warn("Failed to connect to cluster due to: " + pe, 
pe);
@@ -691,6 +709,16 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
                 return null;
             } else {
                 // cluster manager provided a successful response with a 
current dataflow
+                // persist the node UUID returned by the NCM, then return the 
response to the caller
+                try {
+                    // Store this node's UUID in LOCAL-scope state under the 
'Cluster Node Configuration' component id
+                    final Map<String, String> map = 
Collections.singletonMap(NODE_UUID, response.getNodeIdentifier().getId());
+                    
controller.getStateManagerProvider().getStateManager(CLUSTER_NODE_CONFIG).setState(map,
 Scope.LOCAL);
+                } catch (final IOException ioe) {
+                    logger.warn("Received successful response from Cluster 
Manager but failed to persist state about the Node's Unique Identifier and the 
Node's Index. "
+                        + "This node may be assigned a different UUID when the 
node is restarted.", ioe);
+                }
+
                 return response;
             }
         } finally {

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index 0dd3b64..07dd58b 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -171,7 +171,7 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
                         controller.setMaxEventDrivenThreadCount(maxThreadCount 
/ 3);
                     }
 
-                    final Element reportingTasksElement = (Element) 
DomUtils.getChild(rootElement, "reportingTasks");
+                    final Element reportingTasksElement = 
DomUtils.getChild(rootElement, "reportingTasks");
                     final List<Element> taskElements;
                     if (reportingTasksElement == null) {
                         taskElements = Collections.emptyList();
@@ -179,7 +179,7 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
                         taskElements = 
DomUtils.getChildElementsByTagName(reportingTasksElement, "reportingTask");
                     }
 
-                    final Element controllerServicesElement = (Element) 
DomUtils.getChild(rootElement, "controllerServices");
+                    final Element controllerServicesElement = 
DomUtils.getChild(rootElement, "controllerServices");
                     final List<Element> controllerServiceElements;
                     if (controllerServicesElement == null) {
                         controllerServiceElements = Collections.emptyList();
@@ -252,7 +252,7 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
                     // get the root group XML element
                     final Element rootGroupElement = (Element) 
rootElement.getElementsByTagName("rootGroup").item(0);
 
-                    final Element controllerServicesElement = (Element) 
DomUtils.getChild(rootElement, "controllerServices");
+                    final Element controllerServicesElement = 
DomUtils.getChild(rootElement, "controllerServices");
                     if (controllerServicesElement != null) {
                         final List<Element> serviceElements = 
DomUtils.getChildElementsByTagName(controllerServicesElement, 
"controllerService");
 
@@ -274,7 +274,7 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
                         updateProcessGroup(controller, /* parent group */ 
null, rootGroupElement, encryptor);
                     }
 
-                    final Element reportingTasksElement = (Element) 
DomUtils.getChild(rootElement, "reportingTasks");
+                    final Element reportingTasksElement = 
DomUtils.getChild(rootElement, "reportingTasks");
                     if (reportingTasksElement != null) {
                         final List<Element> taskElements = 
DomUtils.getChildElementsByTagName(reportingTasksElement, "reportingTask");
                         for (final Element taskElement : taskElements) {
@@ -403,9 +403,9 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
 
         for (final Map.Entry<String, String> entry : 
dto.getProperties().entrySet()) {
             if (entry.getValue() == null) {
-                reportingTask.removeProperty(entry.getKey());
+                reportingTask.removeProperty(entry.getKey(), false);
             } else {
-                reportingTask.setProperty(entry.getKey(), entry.getValue());
+                reportingTask.setProperty(entry.getKey(), entry.getValue(), 
false);
             }
         }
 
@@ -735,9 +735,9 @@ public class StandardFlowSynchronizer implements 
FlowSynchronizer {
 
         for (final Map.Entry<String, String> entry : 
config.getProperties().entrySet()) {
             if (entry.getValue() == null) {
-                procNode.removeProperty(entry.getKey());
+                procNode.removeProperty(entry.getKey(), false);
             } else {
-                procNode.setProperty(entry.getKey(), entry.getValue());
+                procNode.setProperty(entry.getKey(), entry.getValue(), false);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
index b537c30..09d1b40 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
@@ -125,15 +125,15 @@ public abstract class AbstractReportingTaskNode extends 
AbstractConfiguredCompon
     }
 
     @Override
-    public void setProperty(final String name, final String value) {
-        super.setProperty(name, value);
+    public void setProperty(final String name, final String value, final 
boolean triggerOnPropertyModified) {
+        super.setProperty(name, value, triggerOnPropertyModified);
 
         onConfigured();
     }
 
     @Override
-    public boolean removeProperty(String name) {
-        final boolean removed = super.removeProperty(name);
+    public boolean removeProperty(String name, final boolean 
triggerOnPropertyModified) {
+        final boolean removed = super.removeProperty(name, 
triggerOnPropertyModified);
         if (removed) {
             onConfigured();
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
index a4d337f..11d1b51 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
@@ -26,6 +26,7 @@ import org.apache.nifi.attribute.expression.language.Query;
 import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.ControllerServiceLookup;
@@ -37,24 +38,27 @@ import org.apache.nifi.reporting.Bulletin;
 import org.apache.nifi.reporting.BulletinRepository;
 import org.apache.nifi.reporting.EventAccess;
 import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.ReportingTask;
 import org.apache.nifi.reporting.Severity;
 
 public class StandardReportingContext implements ReportingContext, 
ControllerServiceLookup {
 
     private final FlowController flowController;
     private final EventAccess eventAccess;
+    private final ReportingTask reportingTask;
     private final BulletinRepository bulletinRepository;
     private final ControllerServiceProvider serviceProvider;
     private final Map<PropertyDescriptor, String> properties;
     private final Map<PropertyDescriptor, PreparedQuery> preparedQueries;
 
     public StandardReportingContext(final FlowController flowController, final 
BulletinRepository bulletinRepository,
-            final Map<PropertyDescriptor, String> properties, final 
ControllerServiceProvider serviceProvider) {
+            final Map<PropertyDescriptor, String> properties, final 
ControllerServiceProvider serviceProvider, final ReportingTask reportingTask) {
         this.flowController = flowController;
         this.eventAccess = flowController;
         this.bulletinRepository = bulletinRepository;
         this.properties = Collections.unmodifiableMap(properties);
         this.serviceProvider = serviceProvider;
+        this.reportingTask = reportingTask;
 
         preparedQueries = new HashMap<>();
         for (final Map.Entry<PropertyDescriptor, String> entry : 
properties.entrySet()) {
@@ -140,4 +144,8 @@ public class StandardReportingContext implements 
ReportingContext, ControllerSer
         return serviceProvider.getControllerServiceName(serviceIdentifier);
     }
 
+    @Override
+    public StateManager getStateManager() {
+        return 
flowController.getStateManagerProvider().getStateManager(reportingTask.getIdentifier());
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
index fe3af92..1a40e8a 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java
@@ -35,6 +35,6 @@ public class StandardReportingTaskNode extends 
AbstractReportingTaskNode impleme
 
     @Override
     public ReportingContext getReportingContext() {
-        return new StandardReportingContext(flowController, 
flowController.getBulletinRepository(), getProperties(), flowController);
+        return new StandardReportingContext(flowController, 
flowController.getBulletinRepository(), getProperties(), flowController, 
getReportingTask());
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java
index fd3f1cc..bac45af 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java
@@ -49,8 +49,8 @@ public class BatchingSessionFactory implements 
ProcessSessionFactory {
         return highThroughputSession;
     }
 
-    private class HighThroughputSession implements ProcessSession {
 
+    private class HighThroughputSession implements ProcessSession {
         private final StandardProcessSession session;
 
         public HighThroughputSession(final StandardProcessSession session) {
@@ -241,7 +241,5 @@ public class BatchingSessionFactory implements 
ProcessSessionFactory {
         public ProvenanceReporter getProvenanceReporter() {
             return session.getProvenanceReporter();
         }
-
     }
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
index c68c78d..5e26c09 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.ControllerService;
@@ -46,10 +47,12 @@ public class ConnectableProcessContext implements 
ProcessContext {
 
     private final Connectable connectable;
     private final StringEncryptor encryptor;
+    private final StateManager stateManager;
 
-    public ConnectableProcessContext(final Connectable connectable, final 
StringEncryptor encryptor) {
+    public ConnectableProcessContext(final Connectable connectable, final 
StringEncryptor encryptor, final StateManager stateManager) {
         this.connectable = connectable;
         this.encryptor = encryptor;
+        this.stateManager = stateManager;
     }
 
     @Override
@@ -235,4 +238,9 @@ public class ConnectableProcessContext implements 
ProcessContext {
     public boolean isExpressionLanguagePresent(PropertyDescriptor property) {
         return false;
     }
+
+    @Override
+    public StateManager getStateManager() {
+        return stateManager;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
index e5582ec..76d7c08 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
@@ -24,6 +24,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateManagerProvider;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.controller.EventBasedWorker;
 import org.apache.nifi.controller.EventDrivenWorkerQueue;
@@ -54,7 +56,8 @@ public class EventDrivenSchedulingAgent implements 
SchedulingAgent {
     private static final Logger logger = 
LoggerFactory.getLogger(EventDrivenSchedulingAgent.class);
 
     private final FlowEngine flowEngine;
-    private final ControllerServiceProvider controllerServiceProvider;
+    private final ControllerServiceProvider serviceProvider;
+    private final StateManagerProvider stateManagerProvider;
     private final EventDrivenWorkerQueue workerQueue;
     private final ProcessContextFactory contextFactory;
     private final AtomicInteger maxThreadCount;
@@ -65,10 +68,11 @@ public class EventDrivenSchedulingAgent implements 
SchedulingAgent {
     private final ConcurrentMap<Connectable, AtomicLong> connectionIndexMap = 
new ConcurrentHashMap<>();
     private final ConcurrentMap<Connectable, ScheduleState> scheduleStates = 
new ConcurrentHashMap<>();
 
-    public EventDrivenSchedulingAgent(final FlowEngine flowEngine, final 
ControllerServiceProvider flowController,
-            final EventDrivenWorkerQueue workerQueue, final 
ProcessContextFactory contextFactory, final int maxThreadCount, final 
StringEncryptor encryptor) {
+    public EventDrivenSchedulingAgent(final FlowEngine flowEngine, final 
ControllerServiceProvider serviceProvider, final StateManagerProvider 
stateManagerProvider,
+        final EventDrivenWorkerQueue workerQueue, final ProcessContextFactory 
contextFactory, final int maxThreadCount, final StringEncryptor encryptor) {
         this.flowEngine = flowEngine;
-        this.controllerServiceProvider = flowController;
+        this.serviceProvider = serviceProvider;
+        this.stateManagerProvider = stateManagerProvider;
         this.workerQueue = workerQueue;
         this.contextFactory = contextFactory;
         this.maxThreadCount = new AtomicInteger(maxThreadCount);
@@ -80,6 +84,10 @@ public class EventDrivenSchedulingAgent implements 
SchedulingAgent {
         }
     }
 
+    private StateManager getStateManager(final String componentId) {
+        return stateManagerProvider.getStateManager(componentId);
+    }
+
     @Override
     public void shutdown() {
         flowEngine.shutdown();
@@ -177,7 +185,8 @@ public class EventDrivenSchedulingAgent implements 
SchedulingAgent {
 
                 if (connectable instanceof ProcessorNode) {
                     final ProcessorNode procNode = (ProcessorNode) connectable;
-                    final StandardProcessContext standardProcessContext = new 
StandardProcessContext(procNode, controllerServiceProvider, encryptor);
+                    final StandardProcessContext standardProcessContext = new 
StandardProcessContext(procNode, serviceProvider,
+                        encryptor, 
getStateManager(connectable.getIdentifier()));
 
                     final long runNanos = 
procNode.getRunDuration(TimeUnit.NANOSECONDS);
                     final ProcessSessionFactory sessionFactory;
@@ -251,7 +260,7 @@ public class EventDrivenSchedulingAgent implements 
SchedulingAgent {
                     }
                 } else {
                     final ProcessSessionFactory sessionFactory = new 
StandardProcessSessionFactory(context);
-                    final ConnectableProcessContext connectableProcessContext 
= new ConnectableProcessContext(connectable, encryptor);
+                    final ConnectableProcessContext connectableProcessContext 
= new ConnectableProcessContext(connectable, encryptor, 
getStateManager(connectable.getIdentifier()));
                     trigger(connectable, scheduleState, 
connectableProcessContext, sessionFactory);
 
                     // See explanation above for the ProcessorNode as to why 
we do this.

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
index 4278cee..3fa6b52 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java
@@ -25,6 +25,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.controller.FlowController;
@@ -61,6 +62,10 @@ public class QuartzSchedulingAgent implements 
SchedulingAgent {
         this.encryptor = enryptor;
     }
 
+    private StateManager getStateManager(final String componentId) {
+        return 
flowController.getStateManagerProvider().getStateManager(componentId);
+    }
+
     @Override
     public void shutdown() {
     }
@@ -133,14 +138,15 @@ public class QuartzSchedulingAgent implements 
SchedulingAgent {
         final List<AtomicBoolean> triggers = new ArrayList<>();
         for (int i = 0; i < connectable.getMaxConcurrentTasks(); i++) {
             final Callable<Boolean> continuallyRunTask;
+
             if (connectable.getConnectableType() == ConnectableType.PROCESSOR) 
{
                 final ProcessorNode procNode = (ProcessorNode) connectable;
 
-                final StandardProcessContext standardProcContext = new 
StandardProcessContext(procNode, flowController, encryptor);
+                final StandardProcessContext standardProcContext = new 
StandardProcessContext(procNode, flowController, encryptor, 
getStateManager(connectable.getIdentifier()));
                 ContinuallyRunProcessorTask runnableTask = new 
ContinuallyRunProcessorTask(this, procNode, flowController, contextFactory, 
scheduleState, standardProcContext);
                 continuallyRunTask = runnableTask;
             } else {
-                final ConnectableProcessContext connProcContext = new 
ConnectableProcessContext(connectable, encryptor);
+                final ConnectableProcessContext connProcContext = new 
ConnectableProcessContext(connectable, encryptor, 
getStateManager(connectable.getIdentifier()));
                 continuallyRunTask = new 
ContinuallyRunConnectableTask(contextFactory, connectable, scheduleState, 
connProcContext);
             }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java
index ea0b456..e03cc05 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java
@@ -24,6 +24,8 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+
 public class ScheduleState {
 
     private final AtomicInteger activeThreadCount = new AtomicInteger(0);
@@ -62,12 +64,12 @@ public class ScheduleState {
     }
 
     /**
-     * Maintains an AtomicBoolean so that the first thread to call this method 
after a Processor is no longer scheduled to run will receive a 
<code>true</code> and MUST call the methods annotated with
-     *
-     * @OnStopped
+     * Maintains an AtomicBoolean so that the first thread to call this method 
after a Processor is no longer
+     * scheduled to run will receive a <code>true</code> and MUST call the 
methods annotated with
+     * {@link OnStopped @OnStopped}
      *
      * @return <code>true</code> if the caller is required to call Processor 
methods annotated with
-     * @OnStopped, <code>false</code> otherwise
+     *         {@link OnStopped @OnStopped}, <code>false</code> otherwise
      */
     public boolean mustCallOnStoppedMethods() {
         return mustCallOnStoppedMethods.getAndSet(false);

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index 82fc812..ee764e6 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -32,6 +32,8 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateManagerProvider;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.Funnel;
 import org.apache.nifi.connectable.Port;
@@ -74,6 +76,7 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
     private final Heartbeater heartbeater;
     private final long administrativeYieldMillis;
     private final String administrativeYieldDuration;
+    private final StateManagerProvider stateManagerProvider;
 
     private final ConcurrentMap<Object, ScheduleState> scheduleStates = new 
ConcurrentHashMap<>();
     private final ScheduledExecutorService frameworkTaskExecutor;
@@ -84,10 +87,12 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
 
     private final StringEncryptor encryptor;
 
-    public StandardProcessScheduler(final Heartbeater heartbeater, final 
ControllerServiceProvider controllerServiceProvider, final StringEncryptor 
encryptor) {
+    public StandardProcessScheduler(final Heartbeater heartbeater, final 
ControllerServiceProvider controllerServiceProvider, final StringEncryptor 
encryptor,
+        final StateManagerProvider stateManagerProvider) {
         this.heartbeater = heartbeater;
         this.controllerServiceProvider = controllerServiceProvider;
         this.encryptor = encryptor;
+        this.stateManagerProvider = stateManagerProvider;
 
         administrativeYieldDuration = 
NiFiProperties.getInstance().getAdministrativeYieldDuration();
         administrativeYieldMillis = 
FormatUtils.getTimeDuration(administrativeYieldDuration, TimeUnit.MILLISECONDS);
@@ -95,6 +100,10 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
         frameworkTaskExecutor = new FlowEngine(4, "Framework Task Thread");
     }
 
+    private StateManager getStateManager(final String componentId) {
+        return stateManagerProvider.getStateManager(componentId);
+    }
+
     public void scheduleFrameworkTask(final Runnable command, final String 
taskName, final long initialDelay, final long delay, final TimeUnit timeUnit) {
         frameworkTaskExecutor.scheduleWithFixedDelay(new Runnable() {
             @Override
@@ -102,7 +111,7 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
                 try {
                     command.run();
                 } catch (final Throwable t) {
-                    LOG.error("Failed to run Framework Task {} due to {}", 
command, t.toString());
+                    LOG.error("Failed to run Framework Task {} due to {}", 
taskName, t.toString());
                     if (LOG.isDebugEnabled()) {
                         LOG.error("", t);
                     }
@@ -111,6 +120,15 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
         }, initialDelay, delay, timeUnit);
     }
 
+    /**
+     * Submits the given task to be executed exactly once in a background thread
+     * 
+     * @param task the task to perform
+     */
+    public void submitFrameworkTask(final Runnable task) {
+        frameworkTaskExecutor.submit(task);
+    }
+
     @Override
     public void setMaxThreadCount(final SchedulingStrategy schedulingStrategy, 
final int maxThreadCount) {
         final SchedulingAgent agent = getSchedulingAgent(schedulingStrategy);
@@ -299,7 +317,7 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
             public void run() {
                 try (final NarCloseable x = NarCloseable.withNarLoader()) {
                     final long lastStopTime = scheduleState.getLastStopTime();
-                    final StandardProcessContext processContext = new 
StandardProcessContext(procNode, controllerServiceProvider, encryptor);
+                    final StandardProcessContext processContext = new 
StandardProcessContext(procNode, controllerServiceProvider, encryptor, 
getStateManager(procNode.getIdentifier()));
 
                     final Set<String> serviceIds = new HashSet<>();
                     for (final PropertyDescriptor descriptor : 
processContext.getProperties().keySet()) {
@@ -343,7 +361,8 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
                                     return;
                                 }
 
-                                final SchedulingContext schedulingContext = 
new StandardSchedulingContext(processContext, controllerServiceProvider, 
procNode);
+                                final SchedulingContext schedulingContext = 
new StandardSchedulingContext(processContext, controllerServiceProvider,
+                                    procNode, 
getStateManager(procNode.getIdentifier()));
                                 
ReflectionUtils.invokeMethodsWithAnnotations(OnScheduled.class, 
org.apache.nifi.processor.annotation.OnScheduled.class, 
procNode.getProcessor(), schedulingContext);
 
                                 
getSchedulingAgent(procNode).schedule(procNode, scheduleState);
@@ -420,7 +439,7 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
             @Override
             public void run() {
                 try (final NarCloseable x = NarCloseable.withNarLoader()) {
-                    final StandardProcessContext processContext = new 
StandardProcessContext(procNode, controllerServiceProvider, encryptor);
+                    final StandardProcessContext processContext = new 
StandardProcessContext(procNode, controllerServiceProvider, encryptor, 
getStateManager(procNode.getIdentifier()));
 
                     
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, 
procNode.getProcessor(), processContext);
 
@@ -503,7 +522,7 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
         getSchedulingAgent(connectable).unschedule(connectable, state);
 
         if (!state.isScheduled() && state.getActiveThreadCount() == 0 && 
state.mustCallOnStoppedMethods()) {
-            final ConnectableProcessContext processContext = new 
ConnectableProcessContext(connectable, encryptor);
+            final ConnectableProcessContext processContext = new 
ConnectableProcessContext(connectable, encryptor, 
getStateManager(connectable.getIdentifier()));
             try (final NarCloseable x = NarCloseable.withNarLoader()) {
                 
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, 
connectable, processContext);
                 heartbeater.heartbeat();

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
index 96cee20..04db549 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.controller.FlowController;
@@ -67,6 +68,10 @@ public class TimerDrivenSchedulingAgent implements 
SchedulingAgent {
         }
     }
 
+    private StateManager getStateManager(final String componentId) {
+        return 
flowController.getStateManagerProvider().getStateManager(componentId);
+    }
+
     @Override
     public void shutdown() {
         flowEngine.shutdown();
@@ -96,14 +101,14 @@ public class TimerDrivenSchedulingAgent implements 
SchedulingAgent {
             // Determine the task to run and create it.
             if (connectable.getConnectableType() == ConnectableType.PROCESSOR) 
{
                 final ProcessorNode procNode = (ProcessorNode) connectable;
-                final StandardProcessContext standardProcContext = new 
StandardProcessContext(procNode, flowController, encryptor);
+                final StandardProcessContext standardProcContext = new 
StandardProcessContext(procNode, flowController, encryptor, 
getStateManager(connectable.getIdentifier()));
                 final ContinuallyRunProcessorTask runnableTask = new 
ContinuallyRunProcessorTask(this, procNode, flowController,
                         contextFactory, scheduleState, standardProcContext);
 
                 continuallyRunTask = runnableTask;
                 processContext = standardProcContext;
             } else {
-                processContext = new ConnectableProcessContext(connectable, 
encryptor);
+                processContext = new ConnectableProcessContext(connectable, 
encryptor, getStateManager(connectable.getIdentifier()));
                 continuallyRunTask = new 
ContinuallyRunConnectableTask(contextFactory, connectable, scheduleState, 
processContext);
             }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
index b5c3855..f08d45b 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
@@ -155,9 +155,9 @@ public class ControllerServiceLoader {
 
         for (final Map.Entry<String, String> entry : 
dto.getProperties().entrySet()) {
             if (entry.getValue() == null) {
-                node.removeProperty(entry.getKey());
+                node.removeProperty(entry.getKey(), false);
             } else {
-                node.setProperty(entry.getKey(), entry.getValue());
+                node.setProperty(entry.getKey(), entry.getValue(), false);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java
index 02d6263..482dabf 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java
@@ -18,6 +18,7 @@ package org.apache.nifi.controller.service;
 
 import java.util.Set;
 
+import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.ControllerServiceInitializationContext;
 import org.apache.nifi.controller.ControllerServiceLookup;
@@ -28,11 +29,13 @@ public class StandardControllerServiceInitializationContext 
implements Controlle
     private final String id;
     private final ControllerServiceProvider serviceProvider;
     private final ComponentLog logger;
+    private final StateManager stateManager;
 
-    public StandardControllerServiceInitializationContext(final String 
identifier, final ComponentLog logger, final ControllerServiceProvider 
serviceProvider) {
+    public StandardControllerServiceInitializationContext(final String 
identifier, final ComponentLog logger, final ControllerServiceProvider 
serviceProvider, final StateManager stateManager) {
         this.id = identifier;
         this.logger = logger;
         this.serviceProvider = serviceProvider;
+        this.stateManager = stateManager;
     }
 
     @Override
@@ -79,4 +82,9 @@ public class StandardControllerServiceInitializationContext 
implements Controlle
     public ComponentLog getLogger() {
         return logger;
     }
+
+    @Override
+    public StateManager getStateManager() {
+        return stateManager;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index ba03ee3..9a4c1e6 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -123,14 +123,14 @@ public class StandardControllerServiceNode extends 
AbstractConfiguredComponent i
     }
 
     @Override
-    public void setProperty(final String name, final String value) {
-        super.setProperty(name, value);
+    public void setProperty(final String name, final String value, final 
boolean triggerOnPropertyModified) {
+        super.setProperty(name, value, triggerOnPropertyModified);
         onConfigured();
     }
 
     @Override
-    public boolean removeProperty(String name) {
-        final boolean removed = super.removeProperty(name);
+    public boolean removeProperty(String name, final boolean 
triggerOnPropertyModified) {
+        final boolean removed = super.removeProperty(name, 
triggerOnPropertyModified);
         if (removed) {
             onConfigured();
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index 6561eb8..660f596 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -38,6 +38,8 @@ import java.util.concurrent.Executors;
 import org.apache.nifi.annotation.lifecycle.OnAdded;
 import org.apache.nifi.annotation.lifecycle.OnRemoved;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateManagerProvider;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ConfiguredComponent;
 import org.apache.nifi.controller.ControllerService;
@@ -46,8 +48,8 @@ import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.ValidationContextFactory;
-import 
org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
 import org.apache.nifi.controller.exception.ComponentLifeCycleException;
+import 
org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
 import org.apache.nifi.events.BulletinFactory;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.nar.ExtensionManager;
@@ -69,6 +71,7 @@ public class StandardControllerServiceProvider implements 
ControllerServiceProvi
     private final ConcurrentMap<String, ControllerServiceNode> 
controllerServices;
     private static final Set<Method> validDisabledMethods;
     private final BulletinRepository bulletinRepo;
+    private final StateManagerProvider stateManagerProvider;
 
     static {
         // methods that are okay to be called when the service is disabled.
@@ -82,12 +85,13 @@ public class StandardControllerServiceProvider implements 
ControllerServiceProvi
         validDisabledMethods = Collections.unmodifiableSet(validMethods);
     }
 
-    public StandardControllerServiceProvider(final ProcessScheduler scheduler, 
final BulletinRepository bulletinRepo) {
+    public StandardControllerServiceProvider(final ProcessScheduler scheduler, 
final BulletinRepository bulletinRepo, final StateManagerProvider 
stateManagerProvider) {
         // the following 2 maps must be updated atomically, but we do not lock 
around them because they are modified
         // only in the createControllerService method, and both are modified 
before the method returns
         this.controllerServices = new ConcurrentHashMap<>();
         this.processScheduler = scheduler;
         this.bulletinRepo = bulletinRepo;
+        this.stateManagerProvider = stateManagerProvider;
     }
 
     private Class<?>[] getInterfaces(final Class<?> cls) {
@@ -110,6 +114,10 @@ public class StandardControllerServiceProvider implements 
ControllerServiceProvi
         }
     }
 
+    private StateManager getStateManager(final String componentId) {
+        return stateManagerProvider.getStateManager(componentId);
+    }
+
     @Override
     public ControllerServiceNode createControllerService(final String type, 
final String id, final boolean firstTimeAdded) {
         if (type == null || id == null) {
@@ -171,7 +179,7 @@ public class StandardControllerServiceProvider implements 
ControllerServiceProvi
             logger.info("Created Controller Service of type {} with identifier 
{}", type, id);
 
             final ComponentLog serviceLogger = new SimpleProcessLogger(id, 
originalService);
-            originalService.initialize(new 
StandardControllerServiceInitializationContext(id, serviceLogger, this));
+            originalService.initialize(new 
StandardControllerServiceInitializationContext(id, serviceLogger, this, 
getStateManager(id)));
 
             final ValidationContextFactory validationContextFactory = new 
StandardValidationContextFactory(this);
 
@@ -489,6 +497,8 @@ public class StandardControllerServiceProvider implements 
ControllerServiceProvi
         }
 
         controllerServices.remove(serviceNode.getIdentifier());
+
+        stateManagerProvider.onComponentRemoved(serviceNode.getIdentifier());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/ClusterState.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/ClusterState.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/ClusterState.java
new file mode 100644
index 0000000..8227bbb
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/ClusterState.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.state;
+
+import java.util.Set;
+
+public interface ClusterState {
+    /**
+     * @return <code>true</code> if this instance of NiFi is connected to a cluster, <code>false</code> if the node is disconnected
+     */
+    boolean isConnected();
+
+    /**
+     * @return the identifier that is used to identify this node in the cluster
+     */
+    String getNodeIdentifier();
+
+    /**
+     * @return a Set of {@link NodeDescription} objects that can be used to determine which other nodes are in the same cluster. This
+     *         Set will not be <code>null</code> but will be empty if the node is not connected to a cluster
+     */
+    Set<NodeDescription> getNodes();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/ConfigParseException.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/ConfigParseException.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/ConfigParseException.java
new file mode 100644
index 0000000..ae91fcf
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/ConfigParseException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.state;
+
+public class ConfigParseException extends RuntimeException {
+    private static final long serialVersionUID = 4956533590170361572L;
+
+    public ConfigParseException(final String message) {
+        super(message);
+    }
+
+    public ConfigParseException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+}

Reply via email to