http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/NodeDescription.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/NodeDescription.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/NodeDescription.java
new file mode 100644
index 0000000..0c284bf
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/NodeDescription.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.state;
+
+/**
+ * Provides information about a node in a NiFi cluster
+ */
+public interface NodeDescription {
+    /**
+     * @return the unique identifier for this node in the cluster
+     */
+    String getNodeIdentifier();
+
+    /**
+     * @return the hostname of the node
+     */
+    String getHostname();
+
+    /**
+     * @return the port on which the node's embedded ZooKeeper Server is 
running, or <code>null</code> if the node is
+     *         not running an embedded ZooKeeper server
+     */
+    Integer getEmbeddedZooKeeperPort();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateManager.java
new file mode 100644
index 0000000..e83bbac
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateManager.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.state;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.components.state.StateProvider;
+
+public class StandardStateManager implements StateManager {
+    private final StateProvider localProvider;
+    private final StateProvider clusterProvider;
+    private final String componentId;
+
+    public StandardStateManager(final StateProvider localProvider, final 
StateProvider clusterProvider, final String componentId) {
+        this.localProvider = localProvider;
+        this.clusterProvider = clusterProvider;
+        this.componentId = componentId;
+    }
+
+    private StateProvider getProvider(final Scope scope) {
+        if (scope == Scope.LOCAL || clusterProvider == null || 
!clusterProvider.isEnabled()) {
+            return localProvider;
+        }
+
+        return clusterProvider;
+    }
+
+
+    @Override
+    public StateMap getState(final Scope scope) throws IOException {
+        return getProvider(scope).getState(componentId);
+    }
+
+
+    @Override
+    public boolean replace(final StateMap oldValue, final Map<String, String> 
newValue, final Scope scope) throws IOException {
+        return getProvider(scope).replace(oldValue, newValue, componentId);
+    }
+
+    @Override
+    public void setState(final Map<String, String> state, final Scope scope) 
throws IOException {
+        getProvider(scope).setState(state, componentId);
+    }
+
+    @Override
+    public void clear(final Scope scope) throws IOException {
+        getProvider(scope).clear(componentId);
+    }
+
+    @Override
+    public String toString() {
+        return "StandardStateManager[componentId=" + componentId + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateMap.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateMap.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateMap.java
new file mode 100644
index 0000000..b006ac6
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateMap.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.state;
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.nifi.components.state.StateMap;
+
+public class StandardStateMap implements StateMap {
+    private final Map<String, String> stateValues;
+    private final long version;
+
+    public StandardStateMap(final Map<String, String> stateValues, final long 
version) {
+        this.stateValues = Collections.unmodifiableMap(stateValues == null ? 
Collections.<String, String> emptyMap() : stateValues);
+        this.version = version;
+    }
+
+    @Override
+    public long getVersion() {
+        return version;
+    }
+
+    @Override
+    public String get(final String key) {
+        return stateValues.get(key);
+    }
+
+    @Override
+    public Map<String, String> toMap() {
+        return stateValues;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateProviderInitializationContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateProviderInitializationContext.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateProviderInitializationContext.java
new file mode 100644
index 0000000..c9e7b8e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateProviderInitializationContext.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.state;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.state.StateProviderInitializationContext;
+
+public class StandardStateProviderInitializationContext implements 
StateProviderInitializationContext {
+    private final String id;
+    private final Map<PropertyDescriptor, PropertyValue> properties;
+    private final SSLContext sslContext;
+
+    public StandardStateProviderInitializationContext(final String identifier, 
final Map<PropertyDescriptor, PropertyValue> properties, final SSLContext 
sslContext) {
+        this.id = identifier;
+        this.properties = new HashMap<>(properties);
+        this.sslContext = sslContext;
+    }
+
+    @Override
+    public Map<PropertyDescriptor, PropertyValue> getProperties() {
+        return Collections.unmodifiableMap(properties);
+    }
+
+    @Override
+    public PropertyValue getProperty(final PropertyDescriptor property) {
+        return properties.get(property);
+    }
+
+    @Override
+    public String getIdentifier() {
+        return id;
+    }
+
+    @Override
+    public SSLContext getSSLContext() {
+        return sslContext;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StateMapSerDe.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StateMapSerDe.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StateMapSerDe.java
new file mode 100644
index 0000000..5671f5a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StateMapSerDe.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.state;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.components.state.StateMap;
+import org.wali.SerDe;
+import org.wali.UpdateType;
+
+public class StateMapSerDe implements SerDe<StateMapUpdate> {
+    private static final int VERSION = 0;
+
+    @Override
+    public void serializeEdit(final StateMapUpdate previousRecordState, final 
StateMapUpdate newRecordState, final DataOutputStream out) throws IOException {
+        serializeRecord(newRecordState, out);
+    }
+
+    @Override
+    public void serializeRecord(final StateMapUpdate record, final 
DataOutputStream out) throws IOException {
+        out.writeUTF(record.getComponentId());
+        out.writeUTF(record.getUpdateType().name());
+        if (record.getUpdateType() == UpdateType.DELETE) {
+            return;
+        }
+
+        final StateMap stateMap = record.getStateMap();
+        final long recordVersion = stateMap.getVersion();
+        out.writeLong(recordVersion);
+
+        final Map<String, String> map = stateMap.toMap();
+        out.writeInt(map.size());
+        for (final Map.Entry<String, String> entry : map.entrySet()) {
+            out.writeUTF(entry.getKey());
+            out.writeUTF(entry.getValue());
+        }
+    }
+
+    @Override
+    public StateMapUpdate deserializeEdit(final DataInputStream in, final 
Map<Object, StateMapUpdate> currentRecordStates, final int version) throws 
IOException {
+        return deserializeRecord(in, version);
+    }
+
+    @Override
+    public StateMapUpdate deserializeRecord(final DataInputStream in, final 
int version) throws IOException {
+        final String componentId = in.readUTF();
+        final String updateTypeName = in.readUTF();
+        final UpdateType updateType = UpdateType.valueOf(updateTypeName);
+        if (updateType == UpdateType.DELETE) {
+            return new StateMapUpdate(null, componentId, updateType);
+        }
+
+        final long recordVersion = in.readLong();
+        final int numEntries = in.readInt();
+        final Map<String, String> stateValues = new HashMap<>(numEntries);
+        for (int i = 0; i < numEntries; i++) {
+            final String key = in.readUTF();
+            final String value = in.readUTF();
+            stateValues.put(key, value);
+        }
+
+        return new StateMapUpdate(new StandardStateMap(stateValues, 
recordVersion), componentId, updateType);
+    }
+
+    @Override
+    public Object getRecordIdentifier(final StateMapUpdate record) {
+        return record.getComponentId();
+    }
+
+    @Override
+    public UpdateType getUpdateType(final StateMapUpdate record) {
+        return record.getUpdateType();
+    }
+
+    @Override
+    public String getLocation(final StateMapUpdate record) {
+        return null;
+    }
+
+    @Override
+    public int getVersion() {
+        return VERSION;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StateMapUpdate.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StateMapUpdate.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StateMapUpdate.java
new file mode 100644
index 0000000..9eb478b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StateMapUpdate.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.state;
+
+import org.apache.nifi.components.state.StateMap;
+import org.wali.UpdateType;
+
+public class StateMapUpdate {
+    private final StateMap stateMap;
+    private final String componentId;
+    private final UpdateType updateType;
+
+    public StateMapUpdate(final StateMap stateMap, final String componentId, 
final UpdateType updateType) {
+        this.stateMap = stateMap;
+        this.componentId = componentId;
+        this.updateType = updateType;
+    }
+
+    public StateMap getStateMap() {
+        return stateMap;
+    }
+
+    public String getComponentId() {
+        return componentId;
+    }
+
+    public UpdateType getUpdateType() {
+        return updateType;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StateProviderException.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StateProviderException.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StateProviderException.java
new file mode 100644
index 0000000..f51d229
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StateProviderException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.state;
+
+public class StateProviderException extends RuntimeException {
+    private static final long serialVersionUID = -4701298038474540654L;
+
+    public StateProviderException(final String message) {
+        super(message);
+    }
+
+    public StateProviderException(final String message, final Throwable cause) 
{
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/config/StateManagerConfiguration.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/config/StateManagerConfiguration.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/config/StateManagerConfiguration.java
new file mode 100644
index 0000000..c8becee
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/config/StateManagerConfiguration.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.state.config;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.nifi.controller.state.ConfigParseException;
+import org.apache.nifi.util.DomUtils;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.xml.sax.SAXException;
+
+public class StateManagerConfiguration {
+    private final Map<String, StateProviderConfiguration> providers;
+
+    private StateManagerConfiguration(final Map<String, 
StateProviderConfiguration> providerConfigs) {
+        this.providers = providerConfigs;
+    }
+
+    public Map<String, StateProviderConfiguration> 
getStateProviderConfigurations() {
+        return Collections.unmodifiableMap(providers);
+    }
+
+    public StateProviderConfiguration getStateProviderConfiguration(final 
String providerId) {
+        return providers.get(providerId);
+    }
+
+    public List<StateProviderConfiguration> 
getStateProviderConfigurations(final StateProviderScope scope) {
+        final List<StateProviderConfiguration> configs = new ArrayList<>();
+        for (final StateProviderConfiguration config : providers.values()) {
+            if (config.getScope() == scope) {
+                configs.add(config);
+            }
+        }
+
+        return configs;
+    }
+
+    public static StateManagerConfiguration parse(final File configFile) 
throws IOException, ConfigParseException {
+        final DocumentBuilderFactory factory = 
DocumentBuilderFactory.newInstance();
+        factory.setNamespaceAware(false);
+
+        final Document document;
+        DocumentBuilder builder;
+        try {
+            builder = factory.newDocumentBuilder();
+            document = builder.parse(configFile);
+        } catch (ParserConfigurationException | SAXException e) {
+            throw new ConfigParseException("Unable to parse file " + 
configFile + ", as it does not appear to be a valid XML File", e);
+        }
+
+        final Element rootElement = document.getDocumentElement();
+        final List<Element> localProviderElements = 
DomUtils.getChildElementsByTagName(rootElement, "local-provider");
+        if (localProviderElements.isEmpty()) {
+            throw new ConfigParseException("State Management config file " + 
configFile + " is not a valid configuration file, "
+                + "as it does not contain a 'local-provider' element, or the 
local-provider element is not the child of the root element");
+        }
+
+        final Map<String, StateProviderConfiguration> configs = new 
HashMap<>();
+        for (final Element localProviderElement : localProviderElements) {
+            final StateProviderConfiguration providerConfig = 
parseProviderConfiguration(localProviderElement, StateProviderScope.LOCAL, 
configFile);
+            if (configs.containsKey(providerConfig.getId())) {
+                throw new ConfigParseException("State Management config file " 
+ configFile + " is not a valid configuration file, "
+                    + "as it contains multiple providers with the \"id\" of 
\"" + providerConfig.getId() + "\"");
+            }
+
+            configs.put(providerConfig.getId(), providerConfig);
+        }
+
+        final List<Element> clusterProviderElements = 
DomUtils.getChildElementsByTagName(rootElement, "cluster-provider");
+        for (final Element clusterProviderElement : clusterProviderElements) {
+            final StateProviderConfiguration providerConfig = 
parseProviderConfiguration(clusterProviderElement, StateProviderScope.CLUSTER, 
configFile);
+            if (configs.containsKey(providerConfig.getId())) {
+                throw new ConfigParseException("State Management config file " 
+ configFile + " is not a valid configuration file, "
+                    + "as it contains multiple providers with the \"id\" of 
\"" + providerConfig.getId() + "\"");
+            }
+
+            configs.put(providerConfig.getId(), providerConfig);
+        }
+
+        return new StateManagerConfiguration(configs);
+    }
+
+    private static StateProviderConfiguration parseProviderConfiguration(final 
Element providerElement, final StateProviderScope scope, final File configFile) 
throws ConfigParseException {
+        final String elementName = providerElement.getNodeName();
+
+        final String id = DomUtils.getChildText(providerElement, "id");
+        if (id == null) {
+            throw new ConfigParseException("State Management config file " + 
configFile + " is not a valid configuration file, "
+                + "as a " + elementName + " element does not contain an \"id\" 
element");
+        }
+        if (id.trim().isEmpty()) {
+            throw new ConfigParseException("State Management config file " + 
configFile + " is not a valid configuration file, "
+                + "as a " + elementName + "'s \"id\" element is empty");
+        }
+
+        final String className = DomUtils.getChildText(providerElement, 
"class");
+        if (className == null) {
+            throw new ConfigParseException("State Management config file " + 
configFile + " is not a valid configuration file, "
+                + "as a " + elementName + " element does not contain an 
\"class\" element");
+        }
+        if (className.trim().isEmpty()) {
+            throw new ConfigParseException("State Management config file " + 
configFile + " is not a valid configuration file, "
+                + "as a " + elementName + "'s \"class\" element is empty");
+        }
+
+        final List<Element> propertyElements = 
DomUtils.getChildElementsByTagName(providerElement, "property");
+        final Map<String, String> propertyMap = new HashMap<>();
+        for (final Element propertyElement : propertyElements) {
+            final String propertyName = propertyElement.getAttribute("name");
+            final String propertyValue = propertyElement.getTextContent();
+            propertyMap.put(propertyName, propertyValue);
+        }
+
+        return new StateProviderConfiguration(id, className, scope, 
propertyMap);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/config/StateProviderConfiguration.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/config/StateProviderConfiguration.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/config/StateProviderConfiguration.java
new file mode 100644
index 0000000..290a750
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/config/StateProviderConfiguration.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.state.config;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class StateProviderConfiguration {
+    private final String id;
+    private final StateProviderScope scope;
+    private final String className;
+    private final Map<String, String> properties;
+
+    public StateProviderConfiguration(final String id, final String className, 
final StateProviderScope scope, final Map<String, String> properties) {
+        this.id = id;
+        this.className = className;
+        this.scope = scope;
+        this.properties = new HashMap<>(properties);
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public String getClassName() {
+        return className;
+    }
+
+    public Map<String, String> getProperties() {
+        return properties;
+    }
+
+    public StateProviderScope getScope() {
+        return scope;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/config/StateProviderScope.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/config/StateProviderScope.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/config/StateProviderScope.java
new file mode 100644
index 0000000..40e1865
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/config/StateProviderScope.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.state.config;
+
+public enum StateProviderScope {
+    /**
+     * Provider is a Local State Provider
+     */
+    LOCAL,
+
+    /**
+     * Provider is a Cluster State Provider
+     */
+    CLUSTER;
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
new file mode 100644
index 0000000..f887527
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.state.manager;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.state.StateProvider;
+import org.apache.nifi.components.state.StateProviderInitializationContext;
+import org.apache.nifi.controller.state.ConfigParseException;
+import org.apache.nifi.controller.state.StandardStateManager;
+import 
org.apache.nifi.controller.state.StandardStateProviderInitializationContext;
+import org.apache.nifi.controller.state.config.StateManagerConfiguration;
+import org.apache.nifi.controller.state.config.StateProviderConfiguration;
+import org.apache.nifi.controller.state.config.StateProviderScope;
+import org.apache.nifi.framework.security.util.SslContextFactory;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.processor.StandardValidationContext;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StandardStateManagerProvider implements StateManagerProvider {
+    private static final Logger logger = 
LoggerFactory.getLogger(StandardStateManagerProvider.class);
+
+    private final ConcurrentMap<String, StateManager> stateManagers = new 
ConcurrentHashMap<>();
+    private final StateProvider localStateProvider;
+    private final StateProvider clusterStateProvider;
+
+    private StandardStateManagerProvider(final StateProvider 
localStateProvider, final StateProvider clusterStateProvider) {
+        this.localStateProvider = localStateProvider;
+        this.clusterStateProvider = clusterStateProvider;
+    }
+
+    public static StateManagerProvider create(final NiFiProperties properties) 
throws ConfigParseException, IOException {
+        final StateProvider localProvider = 
createLocalStateProvider(properties);
+
+        final StateProvider clusterProvider;
+        if (properties.isNode()) {
+            clusterProvider = createClusteredStateProvider(properties);
+        } else {
+            clusterProvider = null;
+        }
+
+        return new StandardStateManagerProvider(localProvider, 
clusterProvider);
+    }
+
+    private static StateProvider createLocalStateProvider(final NiFiProperties 
properties) throws IOException, ConfigParseException {
+        final File configFile = properties.getStateManagementConfigFile();
+        return createStateProvider(configFile, StateProviderScope.LOCAL, 
properties);
+    }
+
+
+    private static StateProvider createClusteredStateProvider(final 
NiFiProperties properties) throws IOException, ConfigParseException {
+        final File configFile = properties.getStateManagementConfigFile();
+        return createStateProvider(configFile, StateProviderScope.CLUSTER, 
properties);
+    }
+
+
+    private static StateProvider createStateProvider(final File configFile, 
final StateProviderScope scope, final NiFiProperties properties) throws 
ConfigParseException, IOException {
+        final String providerId;
+        final String providerIdPropertyName;
+        final String providerDescription;
+        final String providerXmlElementName;
+        final String oppositeScopeXmlElementName;
+
+        switch (scope) {
+            case CLUSTER:
+                providerId = properties.getClusterStateProviderId();
+                providerIdPropertyName = 
NiFiProperties.STATE_MANAGEMENT_CLUSTER_PROVIDER_ID;
+                providerDescription = "Cluster State Provider";
+                providerXmlElementName = "cluster-provider";
+                oppositeScopeXmlElementName = "local-provider";
+                break;
+            case LOCAL:
+                providerId = properties.getLocalStateProviderId();
+                providerIdPropertyName = 
NiFiProperties.STATE_MANAGEMENT_LOCAL_PROVIDER_ID;
+                providerDescription = "Local State Provider";
+                providerXmlElementName = "local-provider";
+                oppositeScopeXmlElementName = "cluster-provider";
+                break;
+            default:
+                throw new AssertionError("Attempted to create State Provider 
for unknown Scope: " + scope);
+        }
+
+        if (!configFile.exists()) {
+            throw new IllegalStateException("Cannot create " + 
providerDescription + " because the State Management Configuration File " + 
configFile + " does not exist");
+        }
+        if (!configFile.canRead()) {
+            throw new IllegalStateException("Cannot create " + 
providerDescription + " because the State Management Configuration File " + 
configFile + " cannot be read");
+        }
+
+        if (providerId == null) {
+            if (scope == StateProviderScope.CLUSTER) {
+                throw new IllegalStateException("Cannot create Cluster State 
Provider because the '" + providerIdPropertyName
+                    + "' property is missing from the NiFi Properties file. In 
order to run NiFi in a cluster, the " + providerIdPropertyName
+                    + " property must be configured in nifi.properties");
+            }
+
+            throw new IllegalStateException("Cannot create " + 
providerDescription + " because the '" + providerIdPropertyName
+                + "' property is missing from the NiFi Properties file");
+        }
+
+        if (providerId.trim().isEmpty()) {
+            throw new IllegalStateException("Cannot create " + 
providerDescription + " because the '" + providerIdPropertyName
+                + "' property in the NiFi Properties file has no value set. 
This is a required property and must reference the identifier of one of the "
+                + providerXmlElementName + " elements in the State Management 
Configuraiton File (" + configFile + ")");
+        }
+
+        final StateManagerConfiguration config = 
StateManagerConfiguration.parse(configFile);
+        final StateProviderConfiguration providerConfig = 
config.getStateProviderConfiguration(providerId);
+        if (providerConfig == null) {
+            throw new IllegalStateException("Cannot create " + 
providerDescription + " because the '" + providerIdPropertyName
+                + "' property in the NiFi Properties file is set to '" + 
providerId + "', but there is no " + providerXmlElementName
+                + " entry in the State Management Configuration File (" + 
configFile + ") with this id");
+        }
+
+        if (providerConfig.getScope() != scope) {
+            throw new IllegalStateException("Cannot create " + 
providerDescription + " because the '" + providerIdPropertyName
+                + "' property in the NiFi Properties file is set to '" + 
providerId + "', but this id is assigned to a " + oppositeScopeXmlElementName
+                + " entry in the State Management Configuration File (" + 
configFile + "), rather than a " + providerXmlElementName + " entry");
+        }
+
+        final String providerClassName = providerConfig.getClassName();
+
+        final StateProvider provider;
+        try {
+            provider = instantiateStateProvider(providerClassName);
+        } catch (final Exception e) {
+            throw new RuntimeException("Cannot create " + providerDescription 
+ " of type " + providerClassName, e);
+        }
+
+        final Map<PropertyDescriptor, PropertyValue> propertyMap = new 
HashMap<>();
+        final Map<PropertyDescriptor, String> propertyStringMap = new 
HashMap<>();
+        for (final PropertyDescriptor descriptor : 
provider.getPropertyDescriptors()) {
+            propertyMap.put(descriptor, new 
StandardPropertyValue(descriptor.getDefaultValue(), null));
+            propertyStringMap.put(descriptor, descriptor.getDefaultValue());
+        }
+
+        for (final Map.Entry<String, String> entry : 
providerConfig.getProperties().entrySet()) {
+            final PropertyDescriptor descriptor = 
provider.getPropertyDescriptor(entry.getKey());
+            propertyStringMap.put(descriptor, entry.getValue());
+            propertyMap.put(descriptor, new 
StandardPropertyValue(entry.getValue(), null));
+        }
+
+        final SSLContext sslContext = 
SslContextFactory.createSslContext(properties, false);
+        final StateProviderInitializationContext initContext = new 
StandardStateProviderInitializationContext(providerId, propertyMap, sslContext);
+
+        synchronized (provider) {
+            provider.initialize(initContext);
+        }
+
+        final ValidationContext validationContext = new 
StandardValidationContext(null, propertyStringMap, null);
+        final Collection<ValidationResult> results = 
provider.validate(validationContext);
+        final StringBuilder validationFailures = new StringBuilder();
+
+        int invalidCount = 0;
+        for (final ValidationResult result : results) {
+            if (!result.isValid()) {
+                validationFailures.append(result.toString()).append("\n");
+                invalidCount++;
+            }
+        }
+
+        if (invalidCount > 0) {
+            throw new IllegalStateException("Could not initialize State 
Providers because the " + providerDescription + " is not valid. The following "
+                + invalidCount + " Validation Errors occurred:\n" + 
validationFailures.toString() + "\nPlease check the configuration of the " + 
providerDescription + " with ID ["
+                + providerId.trim() + "] in the file " + 
configFile.getAbsolutePath());
+        }
+
+        return provider;
+    }
+
+    private static StateProvider instantiateStateProvider(final String type) 
throws ClassNotFoundException, InstantiationException, IllegalAccessException {
+        final ClassLoader ctxClassLoader = 
Thread.currentThread().getContextClassLoader();
+        try {
+            final ClassLoader detectedClassLoaderForType = 
ExtensionManager.getClassLoader(type);
+            final Class<?> rawClass;
+            if (detectedClassLoaderForType == null) {
+                // try to find from the current class loader
+                rawClass = Class.forName(type);
+            } else {
+                // try to find from the registered classloader for that type
+                rawClass = Class.forName(type, true, 
ExtensionManager.getClassLoader(type));
+            }
+
+            
Thread.currentThread().setContextClassLoader(detectedClassLoaderForType);
+            final Class<? extends StateProvider> mgrClass = 
rawClass.asSubclass(StateProvider.class);
+            return mgrClass.newInstance();
+        } finally {
+            if (ctxClassLoader != null) {
+                Thread.currentThread().setContextClassLoader(ctxClassLoader);
+            }
+        }
+    }
+
+    /**
+     * Returns the State Manager that has been created for the given component 
ID, or <code>null</code> if none exists
+     *
+     * @return the StateManager that can be used by the component with the 
given ID, or <code>null</code> if none exists
+     */
+    @Override
+    public synchronized StateManager getStateManager(final String componentId) 
{
+        StateManager stateManager = stateManagers.get(componentId);
+        if (stateManager != null) {
+            return stateManager;
+        }
+
+        stateManager = new StandardStateManager(localStateProvider, 
clusterStateProvider, componentId);
+        stateManagers.put(componentId, stateManager);
+        return stateManager;
+    }
+
+    @Override
+    public synchronized void shutdown() {
+        localStateProvider.shutdown();
+        if (clusterStateProvider != null) {
+            clusterStateProvider.shutdown();
+        }
+    }
+
+    @Override
+    public void enableClusterProvider() {
+        clusterStateProvider.enable();
+    }
+
+    @Override
+    public void disableClusterProvider() {
+        clusterStateProvider.disable();
+    }
+
+    @Override
+    public void onComponentRemoved(final String componentId) {
+        final StateManager mgr = stateManagers.remove(componentId);
+        if (mgr == null) {
+            return;
+        }
+
+        try {
+            mgr.clear(Scope.CLUSTER);
+        } catch (final Exception e) {
+            logger.warn("Component with ID {} was removed from NiFi instance 
but failed to clear clustered state for the component", e);
+        }
+
+        try {
+            mgr.clear(Scope.LOCAL);
+        } catch (final Exception e) {
+            logger.warn("Component with ID {} was removed from NiFi instance 
but failed to clear local state for the component", e);
+        }
+
+        try {
+            localStateProvider.onComponentRemoved(componentId);
+        } catch (final Exception e) {
+            logger.warn("Component with ID {} was removed from NiFi instance 
but failed to cleanup resources used to maintain its local state", e);
+        }
+
+        if (clusterStateProvider != null) {
+            try {
+                clusterStateProvider.onComponentRemoved(componentId);
+            } catch (final Exception e) {
+                logger.warn("Component with ID {} was removed from NiFi 
instance but failed to cleanup resources used to maintain its clustered state", 
e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/AbstractStateProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/AbstractStateProvider.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/AbstractStateProvider.java
new file mode 100644
index 0000000..078dce3
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/AbstractStateProvider.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.state.providers;
+
+import java.io.IOException;
+
+import org.apache.nifi.components.AbstractConfigurableComponent;
+import org.apache.nifi.components.state.StateProvider;
+import org.apache.nifi.components.state.StateProviderInitializationContext;
+
+public abstract class AbstractStateProvider extends 
AbstractConfigurableComponent implements StateProvider {
+    private String identifier;
+
+    private volatile boolean enabled;
+
+    @Override
+    public final void initialize(final StateProviderInitializationContext 
context) throws IOException {
+        this.identifier = context.getIdentifier();
+        init(context);
+    }
+
+    @Override
+    public String getIdentifier() {
+        return identifier;
+    }
+
+    @Override
+    public void enable() {
+        enabled = true;
+    }
+
+    @Override
+    public void disable() {
+        enabled = false;
+    }
+
+    @Override
+    public boolean isEnabled() {
+        return enabled;
+    }
+
+    public abstract void init(final StateProviderInitializationContext 
context) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java
new file mode 100644
index 0000000..c23e517
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.state.providers.local;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.components.state.StateProviderInitializationContext;
+import org.apache.nifi.controller.state.StandardStateMap;
+import org.apache.nifi.controller.state.StateMapSerDe;
+import org.apache.nifi.controller.state.StateMapUpdate;
+import org.apache.nifi.controller.state.providers.AbstractStateProvider;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wali.MinimalLockingWriteAheadLog;
+import org.wali.UpdateType;
+import org.wali.WriteAheadRepository;
+
+/**
+ * Provides state management for local (node-only) state, backed by a 
write-ahead log
+ */
+public class WriteAheadLocalStateProvider extends AbstractStateProvider {
+    private static final Logger logger = 
LoggerFactory.getLogger(WriteAheadLocalStateProvider.class);
+
+    // TODO: CREATE BACKGROUND THREAD OR USE EXECUTOR (in 
StateProviderInitializationContext?) to schedule checkpointing.
+    private static final long CHECKPOINT_NANOS = TimeUnit.MINUTES.toNanos(2);
+
+    private final StateMapSerDe serde;
+    private final ConcurrentMap<String, ComponentProvider> componentProviders 
= new ConcurrentHashMap<>();
+
+    static final PropertyDescriptor PATH = new PropertyDescriptor.Builder()
+        .name("Directory")
+        .description("The directory where the Provider should store its data")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .defaultValue("./state")
+        .required(true)
+        .build();
+
+    private WriteAheadRepository<StateMapUpdate> writeAheadLog;
+    private AtomicLong versionGenerator;
+
+    public WriteAheadLocalStateProvider() {
+        serde = new StateMapSerDe();
+    }
+
+    @Override
+    public synchronized void init(final StateProviderInitializationContext 
context) throws IOException {
+        final File basePath = new File(context.getProperty(PATH).getValue());
+
+        if (!basePath.exists() && !basePath.mkdirs()) {
+            throw new RuntimeException("Cannot Initialize Local State Provider 
because the 'Directory' property is set to \"" + basePath + "\", but that 
directory could not be created");
+        }
+
+        if (!basePath.isDirectory()) {
+            throw new RuntimeException("Cannot Initialize Local State Provider 
because the 'Directory' property is set to \"" + basePath + "\", but that is a 
file, rather than a directory");
+        }
+
+        if (!basePath.canWrite()) {
+            throw new RuntimeException("Cannot Initialize Local State Provider 
because the 'Directory' property is set to \"" + basePath + "\", but that 
directory cannot be written to");
+        }
+
+        if (!basePath.canRead()) {
+            throw new RuntimeException("Cannot Initialize Local State Provider 
because the 'Directory' property is set to \"" + basePath + "\", but that 
directory cannot be read");
+        }
+
+        versionGenerator = new AtomicLong(-1L);
+        writeAheadLog = new MinimalLockingWriteAheadLog<>(basePath.toPath(), 
16, serde, null);
+
+        final Collection<StateMapUpdate> updates = 
writeAheadLog.recoverRecords();
+        long maxRecordVersion = -1L;
+
+        for (final StateMapUpdate update : updates) {
+            if (update.getUpdateType() == UpdateType.DELETE) {
+                continue;
+            }
+
+            final long recordVersion = update.getStateMap().getVersion();
+            if (recordVersion > maxRecordVersion) {
+                maxRecordVersion = recordVersion;
+            }
+
+            final String componentId = update.getComponentId();
+            componentProviders.put(componentId, new 
ComponentProvider(writeAheadLog, versionGenerator, componentId, 
update.getStateMap()));
+        }
+
+        // keep a separate maxRecordVersion and set it at the end so that we 
don't have to continually update an AtomicLong, which is more
+        // expensive than just keeping track of a local 'long' variable. Since 
we won't actually increment this at any point until this after
+        // the init() method completes, this is okay to do.
+        versionGenerator.set(maxRecordVersion);
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(PATH);
+        return properties;
+    }
+
+    @Override
+    public synchronized void shutdown() {
+        try {
+            writeAheadLog.shutdown();
+        } catch (final IOException ioe) {
+            logger.warn("Failed to shut down {} successfully due to {}", this, 
ioe.toString());
+            logger.warn("", ioe);
+        }
+    }
+
+    private ComponentProvider getProvider(final String componentId) {
+        ComponentProvider componentProvider = 
componentProviders.get(componentId);
+        if (componentProvider == null) {
+            final StateMap stateMap = new 
StandardStateMap(Collections.<String, String> emptyMap(), -1L);
+            componentProvider = new ComponentProvider(writeAheadLog, 
versionGenerator, componentId, stateMap);
+
+            final ComponentProvider existingComponentProvider = 
componentProviders.putIfAbsent(componentId, componentProvider);
+            if (existingComponentProvider != null) {
+                componentProvider = existingComponentProvider;
+            }
+        }
+
+        return componentProvider;
+    }
+
+    @Override
+    public StateMap getState(final String componentId) throws IOException {
+        return getProvider(componentId).getState();
+    }
+
+    @Override
+    public void setState(final Map<String, String> state, final String 
componentId) throws IOException {
+        getProvider(componentId).setState(state);
+    }
+
+    @Override
+    public boolean replace(final StateMap oldValue, final Map<String, String> 
newValue, final String componentId) throws IOException {
+        return getProvider(componentId).replace(oldValue, newValue);
+    }
+
+    @Override
+    public void clear(final String componentId) throws IOException {
+        getProvider(componentId).clear();
+    }
+
+    @Override
+    public void onComponentRemoved(final String componentId) throws 
IOException {
+        clear(componentId);
+        componentProviders.remove(componentId);
+    }
+
+    private static class ComponentProvider {
+        private final AtomicLong versionGenerator;
+        private final WriteAheadRepository<StateMapUpdate> wal;
+        private final String componentId;
+
+        private StateMap stateMap;
+
+        public ComponentProvider(final WriteAheadRepository<StateMapUpdate> 
wal, final AtomicLong versionGenerator, final String componentId, final 
StateMap stateMap) {
+            this.wal = wal;
+            this.versionGenerator = versionGenerator;
+            this.componentId = componentId;
+            this.stateMap = stateMap;
+        }
+
+        public synchronized StateMap getState() throws IOException {
+            return stateMap;
+        }
+
+        // synchronized because we need to ensure that update of state in WAL 
and updating of local stateMap variable is atomic.
+        // Additionally, the implementation of WriteAheadRepository that we 
are using requires that only a single thread update the
+        // repository at a time for a record with the same key. I.e., many 
threads can update the repository at once, as long as they
+        // are not updating the repository with records that have the same 
identifier.
+        public synchronized void setState(final Map<String, String> state) 
throws IOException {
+            stateMap = new StandardStateMap(state, 
versionGenerator.incrementAndGet());
+            final StateMapUpdate updateRecord = new StateMapUpdate(stateMap, 
componentId, UpdateType.UPDATE);
+            wal.update(Collections.singleton(updateRecord), false);
+        }
+
+        // see above explanation as to why this method is synchronized.
+        public synchronized boolean replace(final StateMap oldValue, final 
Map<String, String> newValue) throws IOException {
+            if (stateMap != oldValue) {
+                return false;
+            }
+
+            stateMap = new StandardStateMap(new HashMap<>(newValue), 
versionGenerator.incrementAndGet());
+            final StateMapUpdate updateRecord = new StateMapUpdate(stateMap, 
componentId, UpdateType.UPDATE);
+            wal.update(Collections.singleton(updateRecord), false);
+            return true;
+        }
+
+        public synchronized void clear() throws IOException {
+            stateMap = new StandardStateMap(null, 
versionGenerator.incrementAndGet());
+            final StateMapUpdate update = new StateMapUpdate(stateMap, 
componentId, UpdateType.UPDATE);
+            wal.update(Collections.singleton(update), false);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
new file mode 100644
index 0000000..f865c8b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.state.providers.zookeeper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.components.state.StateProviderInitializationContext;
+import org.apache.nifi.controller.state.StandardStateMap;
+import org.apache.nifi.controller.state.providers.AbstractStateProvider;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZKUtil;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+public class ZooKeeperStateProvider extends AbstractStateProvider {
+    static final PropertyDescriptor CONNECTION_STRING = new 
PropertyDescriptor.Builder()
+        .name("Connect String")
+        .description("The ZooKeeper Connect String to use. This is a 
comma-separated list of hostnames/IP addresses, such as \"host1, host2, 
127.0.0.1, host4, host5\"")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .required(false)
+        .build();
+    static final PropertyDescriptor SESSION_TIMEOUT = new 
PropertyDescriptor.Builder()
+        .name("Session Timeout")
+        .description("Specifies how long this instance of NiFi is allowed to 
be disconnected from ZooKeeper before creating a new ZooKeeper Session")
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .defaultValue("30 sec")
+        .required(true)
+        .build();
+    static final PropertyDescriptor ROOT_NODE = new 
PropertyDescriptor.Builder()
+        .name("Root Node")
+        .description("The Root Node to use in ZooKeeper to store state in")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .defaultValue("/nifi")
+        .required(true)
+        .build();
+
+
+    private final List<ACL> acl;
+
+    private ZooKeeper zooKeeper;
+    private int timeoutMillis;
+    private String rootNode;
+    private String connectionString;
+
+    private static final int ENCODING_VERSION = 1;
+
+    public ZooKeeperStateProvider() throws Exception {
+        // TODO: Provide SSL Context
+        // TODO: Use more appropriate acl
+        acl = Ids.OPEN_ACL_UNSAFE;
+    }
+
+
+    @Override
+    public synchronized void init(final StateProviderInitializationContext 
context) {
+        connectionString = context.getProperty(CONNECTION_STRING).getValue();
+
+        rootNode = context.getProperty(ROOT_NODE).getValue();
+        timeoutMillis = 
context.getProperty(SESSION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+    }
+
+    @Override
+    public synchronized void shutdown() {
+        if (zooKeeper != null) {
+            try {
+                zooKeeper.close();
+            } catch (final InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        zooKeeper = null;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(CONNECTION_STRING);
+        properties.add(SESSION_TIMEOUT);
+        properties.add(ROOT_NODE);
+        return properties;
+    }
+
+    private synchronized ZooKeeper getZooKeeper() throws IOException {
+        if (zooKeeper != null && !zooKeeper.getState().isAlive()) {
+            invalidateClient();
+        }
+
+        if (zooKeeper == null) {
+            zooKeeper = new ZooKeeper(connectionString, timeoutMillis, new 
Watcher() {
+                @Override
+                public void process(WatchedEvent event) {
+                }
+            });
+        }
+
+        return zooKeeper;
+    }
+
+    private synchronized void invalidateClient() {
+        shutdown();
+    }
+
+    private String getComponentPath(final String componentId) {
+        return rootNode + "/components/" + componentId;
+    }
+
+    private void verifyEnabled() throws IOException {
+        if (!isEnabled()) {
+            throw new IOException("Cannot update or retrieve cluster state 
becuase node is no longer connected to a cluster");
+        }
+    }
+
+    @Override
+    public void onComponentRemoved(final String componentId) throws 
IOException {
+        try {
+            ZKUtil.deleteRecursive(getZooKeeper(), 
getComponentPath(componentId));
+        } catch (final KeeperException ke) {
+            // Node doesn't exist so just ignore
+            if (Code.NONODE == ke.code()) {
+                return;
+            }
+            if (Code.SESSIONEXPIRED == ke.code()) {
+                invalidateClient();
+                onComponentRemoved(componentId);
+            }
+
+            throw new IOException("Unable to remove state for component with 
ID '" + componentId + "' from ZooKeeper", ke);
+        } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException("Failed to remove state for component with 
ID '" + componentId + "' from ZooKeeper due to being interrupted", e);
+        }
+    }
+
+    @Override
+    public void setState(final Map<String, String> state, final String 
componentId) throws IOException {
+        setState(state, -1, componentId);
+    }
+
+
+    private byte[] serialize(final Map<String, String> stateValues) throws 
IOException {
+        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            final DataOutputStream dos = new DataOutputStream(baos)) {
+            dos.writeInt(ENCODING_VERSION);
+            dos.writeInt(stateValues.size());
+            for (final Map.Entry<String, String> entry : 
stateValues.entrySet()) {
+                dos.writeUTF(entry.getKey());
+                dos.writeUTF(entry.getValue());
+            }
+            return baos.toByteArray();
+        }
+    }
+
+    private StateMap deserialize(final byte[] data, final int recordVersion, 
final String componentId) throws IOException {
+        try (final ByteArrayInputStream bais = new ByteArrayInputStream(data);
+            final DataInputStream dis = new DataInputStream(bais)) {
+
+            final int encodingVersion = dis.readInt();
+            if (encodingVersion > ENCODING_VERSION) {
+                throw new IOException("Retrieved a response from ZooKeeper 
when retrieving state for component with ID " + componentId
+                    + ", but the response was encoded using the 
ZooKeeperStateProvider Encoding Version of " + encodingVersion
+                    + " but this instance can only decode versions up to " + 
ENCODING_VERSION
+                    + "; it appears that the state was encoded using a newer 
version of NiFi than is currently running. This information cannot be 
decoded.");
+            }
+
+            final int numEntries = dis.readInt();
+            final Map<String, String> stateValues = new HashMap<>(numEntries);
+            for (int i = 0; i < numEntries; i++) {
+                final String key = dis.readUTF();
+                final String value = dis.readUTF();
+                stateValues.put(key, value);
+            }
+
+            return new StandardStateMap(stateValues, recordVersion);
+        }
+    }
+
+    private void setState(final Map<String, String> stateValues, final int 
version, final String componentId) throws IOException {
+        verifyEnabled();
+
+        try {
+            final String path = getComponentPath(componentId);
+            final byte[] data = serialize(stateValues);
+
+            final ZooKeeper keeper = getZooKeeper();
+            try {
+                keeper.setData(path, data, version);
+            } catch (final KeeperException ke) {
+                if (ke.code() == Code.NONODE) {
+                    createNode(path, data);
+                } else {
+                    throw ke;
+                }
+            }
+        } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException("Failed to set cluster-wide state in 
ZooKeeper for component with ID " + componentId + " due to interruption", e);
+        } catch (final KeeperException ke) {
+            if (Code.SESSIONEXPIRED == ke.code()) {
+                invalidateClient();
+                setState(stateValues, version, componentId);
+            }
+
+            throw new IOException("Failed to set cluster-wide state in 
ZooKeeper for component with ID " + componentId, ke);
+        } catch (final IOException ioe) {
+            throw new IOException("Failed to set cluster-wide state in 
ZooKeeper for component with ID " + componentId, ioe);
+        }
+    }
+
+
+    private void createNode(final String path, final byte[] data) throws 
IOException, KeeperException {
+        try {
+            getZooKeeper().create(path, data, acl, CreateMode.PERSISTENT);
+        } catch (final InterruptedException ie) {
+            throw new IOException("Failed to update cluster-wide state due to 
interruption", ie);
+        } catch (final KeeperException ke) {
+            if (ke.code() == Code.NONODE) {
+                final String parentPath = 
StringUtils.substringBeforeLast(path, "/");
+                createNode(parentPath, null);
+                createNode(path, data);
+                return;
+            }
+            if (Code.SESSIONEXPIRED == ke.code()) {
+                invalidateClient();
+                createNode(path, data);
+            }
+
+            // Node already exists. Node must have been created by "someone 
else". Just set the data.
+            if (ke.code() == Code.NODEEXISTS) {
+                try {
+                    getZooKeeper().setData(path, data, -1);
+                } catch (final KeeperException ke1) {
+                    // Node no longer exists -- it was removed by someone 
else. Go recreate the node.
+                    if (ke1.code() == Code.NONODE) {
+                        createNode(path, data);
+                    }
+                } catch (final InterruptedException ie) {
+                    throw new IOException("Failed to update cluster-wide state 
due to interruption", ie);
+                }
+            }
+
+
+            throw ke;
+        }
+    }
+
+    @Override
+    public StateMap getState(final String componentId) throws IOException {
+        verifyEnabled();
+
+        try {
+            final Stat stat = new Stat();
+            final String path = getComponentPath(componentId);
+            final byte[] data = getZooKeeper().getData(path, false, stat);
+
+            final StateMap stateMap = deserialize(data, stat.getVersion(), 
componentId);
+            return stateMap;
+        } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException("Failed to obtain value from ZooKeeper for 
component with ID " + componentId + ", due to interruption", e);
+        } catch (final KeeperException ke) {
+            if (ke.code() == Code.NONODE) {
+                return new StandardStateMap(null, -1L);
+            }
+            if (Code.SESSIONEXPIRED == ke.code()) {
+                invalidateClient();
+                return getState(componentId);
+            }
+
+            throw new IOException("Failed to obtain value from ZooKeeper for 
component with ID " + componentId, ke);
+        } catch (final IOException ioe) {
+            // provide more context in the error message
+            throw new IOException("Failed to obtain value from ZooKeeper for 
component with ID " + componentId, ioe);
+        }
+    }
+
+    @Override
+    public boolean replace(final StateMap oldValue, final Map<String, String> 
newValue, final String componentId) throws IOException {
+        verifyEnabled();
+
+        try {
+            setState(newValue, (int) oldValue.getVersion(), componentId);
+            return true;
+        } catch (final IOException ioe) {
+            return false;
+        }
+    }
+
+
+    @Override
+    public void clear(final String componentId) throws IOException {
+        verifyEnabled();
+        setState(Collections.<String, String> emptyMap(), componentId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java
new file mode 100644
index 0000000..8af9e5a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.state.server;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZooKeeperStateServer extends ZooKeeperServerMain {
+    private static final Logger logger = 
LoggerFactory.getLogger(ZooKeeperStateServer.class);
+
+    private QuorumPeerConfig quorumPeerConfig;
+
+    private ZooKeeperStateServer(final Properties zkProperties) throws 
IOException, ConfigException {
+        quorumPeerConfig = new QuorumPeerConfig();
+        quorumPeerConfig.parseProperties(zkProperties);
+    }
+
+    public synchronized void start() throws IOException {
+        logger.info("Starting Embedded ZooKeeper Server");
+
+        final ServerConfig serverConfig = new ServerConfig();
+        serverConfig.readFrom(quorumPeerConfig);
+        runFromConfig(serverConfig);
+    }
+
+    @Override
+    public synchronized void shutdown() {
+        super.shutdown();
+    }
+
+    public static ZooKeeperStateServer create(final NiFiProperties properties) 
throws IOException, ConfigException {
+        final File propsFile = properties.getEmbeddedZooKeeperPropertiesFile();
+        if (propsFile == null) {
+            return null;
+        }
+
+        if (!propsFile.exists() || !propsFile.canRead()) {
+            throw new IOException("Cannot create Embedded ZooKeeper Server 
because the Properties File " + propsFile.getAbsolutePath()
+                + " referenced in nifi.properties does not exist or cannot be 
read");
+        }
+
+        final Properties zkProperties = new Properties();
+        try (final InputStream fis = new FileInputStream(propsFile);
+            final InputStream bis = new BufferedInputStream(fis)) {
+            zkProperties.load(bis);
+        }
+
+        return new ZooKeeperStateServer(zkProperties);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 88fa43f..17f9149 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -37,6 +37,7 @@ import org.apache.commons.lang3.builder.ToStringStyle;
 import org.apache.nifi.annotation.lifecycle.OnRemoved;
 import org.apache.nifi.annotation.lifecycle.OnShutdown;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.connectable.Connection;
@@ -44,6 +45,7 @@ import org.apache.nifi.connectable.Funnel;
 import org.apache.nifi.connectable.LocalPort;
 import org.apache.nifi.connectable.Port;
 import org.apache.nifi.connectable.Position;
+import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ScheduledState;
@@ -73,6 +75,7 @@ public final class StandardProcessGroup implements 
ProcessGroup {
 
     private final ProcessScheduler scheduler;
     private final ControllerServiceProvider controllerServiceProvider;
+    private final FlowController flowController;
 
     private final Map<String, Port> inputPorts = new HashMap<>();
     private final Map<String, Port> outputPorts = new HashMap<>();
@@ -90,13 +93,16 @@ public final class StandardProcessGroup implements 
ProcessGroup {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(StandardProcessGroup.class);
 
-    public StandardProcessGroup(final String id, final 
ControllerServiceProvider serviceProvider, final ProcessScheduler scheduler, 
final NiFiProperties nifiProps, final StringEncryptor encryptor) {
+    public StandardProcessGroup(final String id, final 
ControllerServiceProvider serviceProvider, final ProcessScheduler scheduler, 
final NiFiProperties nifiProps, final StringEncryptor encryptor,
+        final FlowController flowController) {
         this.id = id;
         this.controllerServiceProvider = serviceProvider;
         this.parent = new AtomicReference<>();
         this.scheduler = scheduler;
         this.comments = new AtomicReference<>("");
         this.encryptor = encryptor;
+        this.flowController = flowController;
+
         name = new AtomicReference<>();
         position = new AtomicReference<>(new Position(0D, 0D));
     }
@@ -327,11 +333,15 @@ public final class StandardProcessGroup implements 
ProcessGroup {
         }
     }
 
+    private StateManager getStateManager(final String componentId) {
+        return 
flowController.getStateManagerProvider().getStateManager(componentId);
+    }
+
     @SuppressWarnings("deprecation")
     private void shutdown(final ProcessGroup procGroup) {
         for (final ProcessorNode node : procGroup.getProcessors()) {
             try (final NarCloseable x = NarCloseable.withNarLoader()) {
-                final StandardProcessContext processContext = new 
StandardProcessContext(node, controllerServiceProvider, encryptor);
+                final StandardProcessContext processContext = new 
StandardProcessContext(node, controllerServiceProvider, encryptor, 
getStateManager(node.getIdentifier()));
                 
ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnShutdown.class, 
org.apache.nifi.processor.annotation.OnShutdown.class, node.getProcessor(), 
processContext);
             }
         }
@@ -681,8 +691,8 @@ public final class StandardProcessGroup implements 
ProcessGroup {
         }
     }
 
-    @SuppressWarnings("deprecation")
     @Override
+    @SuppressWarnings("deprecation")
     public void removeProcessor(final ProcessorNode processor) {
         final String id = requireNonNull(processor).getIdentifier();
         writeLock.lock();
@@ -697,7 +707,7 @@ public final class StandardProcessGroup implements 
ProcessGroup {
             }
 
             try (final NarCloseable x = NarCloseable.withNarLoader()) {
-                final StandardProcessContext processContext = new 
StandardProcessContext(processor, controllerServiceProvider, encryptor);
+                final StandardProcessContext processContext = new 
StandardProcessContext(processor, controllerServiceProvider, encryptor, 
getStateManager(processor.getIdentifier()));
                 
ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnRemoved.class, 
org.apache.nifi.processor.annotation.OnRemoved.class, processor.getProcessor(), 
processContext);
             } catch (final Exception e) {
                 throw new ComponentLifeCycleException("Failed to invoke 
'OnRemoved' methods of " + processor, e);
@@ -719,6 +729,8 @@ public final class StandardProcessGroup implements 
ProcessGroup {
             processors.remove(id);
             
LogRepositoryFactory.getRepository(processor.getIdentifier()).removeAllObservers();
 
+            
flowController.getStateManagerProvider().onComponentRemoved(processor.getIdentifier());
+
             // must copy to avoid a concurrent modification
             final Set<Connection> copy = new 
HashSet<>(processor.getConnections());
             for (final Connection conn : copy) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
index 8e52a46..5bb1a86 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
@@ -29,6 +29,7 @@ import 
org.apache.nifi.attribute.expression.language.StandardPropertyValue;
 import org.apache.nifi.attribute.expression.language.Query.Range;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.ControllerServiceLookup;
@@ -43,11 +44,13 @@ public class StandardProcessContext implements 
ProcessContext, ControllerService
     private final ControllerServiceProvider controllerServiceProvider;
     private final Map<PropertyDescriptor, PreparedQuery> preparedQueries;
     private final StringEncryptor encryptor;
+    private final StateManager stateManager;
 
-    public StandardProcessContext(final ProcessorNode processorNode, final 
ControllerServiceProvider controllerServiceProvider, final StringEncryptor 
encryptor) {
+    public StandardProcessContext(final ProcessorNode processorNode, final 
ControllerServiceProvider controllerServiceProvider, final StringEncryptor 
encryptor, final StateManager stateManager) {
         this.procNode = processorNode;
         this.controllerServiceProvider = controllerServiceProvider;
         this.encryptor = encryptor;
+        this.stateManager = stateManager;
 
         preparedQueries = new HashMap<>();
         for (final Map.Entry<PropertyDescriptor, String> entry : 
procNode.getProperties().entrySet()) {
@@ -208,4 +211,9 @@ public class StandardProcessContext implements 
ProcessContext, ControllerService
         final List<Range> elRanges = 
Query.extractExpressionRanges(getProperty(property).getValue());
         return (elRanges != null && !elRanges.isEmpty());
     }
+
+    @Override
+    public StateManager getStateManager() {
+        return stateManager;
+    }
 }

Reply via email to