NIFI-259: Initial implementation of State Management feature

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/57dadb72
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/57dadb72
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/57dadb72

Branch: refs/heads/NIFI-259
Commit: 57dadb7286c49d035df83ac565b0d72817469ba4
Parents: 0c68e2c
Author: Mark Payne <[email protected]>
Authored: Mon Jan 11 08:28:12 2016 -0500
Committer: Mark Payne <[email protected]>
Committed: Mon Jan 11 08:28:32 2016 -0500

----------------------------------------------------------------------
 .../org/apache/nifi/components/state/Scope.java |  36 ++
 .../nifi/components/state/StateManager.java     |  84 +++++
 .../apache/nifi/components/state/StateMap.java  |  51 +++
 .../nifi/components/state/StateProvider.java    | 127 +++++++
 .../StateProviderInitializationContext.java     |  56 ++++
 .../ControllerServiceInitializationContext.java |   6 +
 .../apache/nifi/processor/ProcessContext.java   |   6 +
 .../apache/nifi/processor/ProcessSession.java   |   1 +
 .../apache/nifi/reporting/ReportingContext.java |   6 +
 nifi-assembly/pom.xml                           |   6 +
 .../org/apache/nifi/util/NiFiProperties.java    |  34 ++
 .../client/socket/EndpointConnectionPool.java   |   2 +-
 .../nifi/remote/cluster/NodeInformation.java    |  16 +-
 .../remote/cluster/NodeInformationAdapter.java  |   2 +-
 .../src/main/asciidoc/administration-guide.adoc |  90 +++++
 .../src/main/asciidoc/developer-guide.adoc      |  65 ++++
 .../org/apache/nifi/state/MockStateManager.java | 172 ++++++++++
 .../org/apache/nifi/state/MockStateMap.java     |  49 +++
 ...kControllerServiceInitializationContext.java |  17 +-
 .../apache/nifi/util/MockProcessContext.java    |  23 +-
 .../apache/nifi/util/MockReportingContext.java  |  10 +-
 .../apache/nifi/util/MockValidationContext.java |  10 +-
 .../nifi/util/StandardProcessorTestRunner.java  |  17 +-
 .../java/org/apache/nifi/util/TestRunner.java   |   6 +
 .../nifi/util/TestMockProcessContext.java       |   2 +-
 ...kControllerServiceInitializationContext.java |   8 +-
 .../documentation/mock/MockProcessContext.java  |   8 +-
 .../nifi/cluster/protocol/ClusterNodes.java     |  39 +++
 .../cluster/protocol/ConnectionResponse.java    |  27 +-
 .../nifi/cluster/protocol/NodeIdentifier.java   |  43 ++-
 .../jaxb/message/AdaptedConnectionResponse.java |  10 +-
 .../jaxb/message/AdaptedNodeIdentifier.java     |  31 +-
 .../jaxb/message/ConnectionResponseAdapter.java |   8 +-
 .../jaxb/message/NodeIdentifierAdapter.java     |   6 +-
 .../ClusterManagerProtocolSenderImplTest.java   |   6 +-
 .../impl/NodeProtocolSenderImplTest.java        |   9 +-
 .../org/apache/nifi/cluster/event/Event.java    |   1 +
 .../cluster/event/impl/EventManagerImpl.java    |   1 +
 .../impl/FileBasedClusterNodeFirewall.java      |   3 +-
 .../cluster/manager/HttpClusterManager.java     |  14 +-
 .../cluster/manager/HttpRequestReplicator.java  |   3 +-
 .../cluster/manager/HttpResponseMapper.java     |   1 +
 .../nifi/cluster/manager/NodeResponse.java      |   7 +-
 .../exception/ConflictingNodeIdException.java   |  46 +++
 .../manager/impl/ClusteredEventAccess.java      |   8 +-
 .../manager/impl/ClusteredReportingContext.java |  12 +-
 .../manager/impl/HttpRequestReplicatorImpl.java |  19 +-
 .../manager/impl/HttpResponseMapperImpl.java    |   1 +
 .../cluster/manager/impl/WebClusterManager.java |  74 +++--
 .../java/org/apache/nifi/cluster/node/Node.java |   1 -
 ...anagerProtocolServiceLocatorFactoryBean.java |   1 -
 ...FileBasedClusterNodeFirewallFactoryBean.java |   1 +
 .../reporting/ClusteredReportingTaskNode.java   |   7 +-
 .../event/impl/EventManagerImplTest.java        |   8 +-
 .../impl/FileBasedClusterNodeFirewallTest.java  |   8 +-
 .../impl/DataFlowManagementServiceImplTest.java |  27 +-
 .../impl/HttpRequestReplicatorImplTest.java     |  38 ++-
 .../impl/HttpResponseMapperImplTest.java        |  21 +-
 .../manager/impl/TestWebClusterManager.java     |   2 -
 .../cluster/manager/testutils/HttpRequest.java  |   2 +
 .../cluster/manager/testutils/HttpResponse.java |   1 +
 .../cluster/manager/testutils/HttpServer.java   |   1 +
 .../components/state/StateManagerProvider.java  |  60 ++++
 .../controller/AbstractConfiguredComponent.java |  20 +-
 .../nifi/controller/ConfiguredComponent.java    |  14 +-
 .../nifi-framework/nifi-framework-core/pom.xml  |  48 ++-
 .../apache/nifi/cluster/HeartbeatPayload.java   |  18 -
 .../apache/nifi/controller/FlowController.java  | 136 +++++---
 .../nifi/controller/StandardFlowService.java    |  50 ++-
 .../controller/StandardFlowSynchronizer.java    |  16 +-
 .../reporting/AbstractReportingTaskNode.java    |   8 +-
 .../reporting/StandardReportingContext.java     |  10 +-
 .../reporting/StandardReportingTaskNode.java    |   2 +-
 .../repository/BatchingSessionFactory.java      |   4 +-
 .../scheduling/ConnectableProcessContext.java   |  10 +-
 .../scheduling/EventDrivenSchedulingAgent.java  |  21 +-
 .../scheduling/QuartzSchedulingAgent.java       |  10 +-
 .../controller/scheduling/ScheduleState.java    |  10 +-
 .../scheduling/StandardProcessScheduler.java    |  31 +-
 .../scheduling/TimerDrivenSchedulingAgent.java  |   9 +-
 .../service/ControllerServiceLoader.java        |   4 +-
 ...dControllerServiceInitializationContext.java |  10 +-
 .../service/StandardControllerServiceNode.java  |   8 +-
 .../StandardControllerServiceProvider.java      |  16 +-
 .../nifi/controller/state/ClusterState.java     |  38 +++
 .../controller/state/ConfigParseException.java  |  30 ++
 .../nifi/controller/state/NodeDescription.java  |  39 +++
 .../controller/state/StandardStateManager.java  |  73 ++++
 .../nifi/controller/state/StandardStateMap.java |  49 +++
 ...ndardStateProviderInitializationContext.java |  60 ++++
 .../nifi/controller/state/StateMapSerDe.java    | 103 ++++++
 .../nifi/controller/state/StateMapUpdate.java   |  45 +++
 .../state/StateProviderException.java           |  30 ++
 .../state/config/StateManagerConfiguration.java | 142 ++++++++
 .../config/StateProviderConfiguration.java      |  51 +++
 .../state/config/StateProviderScope.java        |  30 ++
 .../manager/StandardStateManagerProvider.java   | 295 +++++++++++++++++
 .../state/providers/AbstractStateProvider.java  |  58 ++++
 .../local/WriteAheadLocalStateProvider.java     | 224 +++++++++++++
 .../zookeeper/ZooKeeperStateProvider.java       | 330 +++++++++++++++++++
 .../state/server/ZooKeeperStateServer.java      |  77 +++++
 .../nifi/groups/StandardProcessGroup.java       |  20 +-
 .../nifi/processor/StandardProcessContext.java  |  10 +-
 .../processor/StandardSchedulingContext.java    |  10 +-
 ...g.apache.nifi.components.state.StateProvider |  16 +
 .../TestStandardProcessScheduler.java           |  24 +-
 .../StandardControllerServiceProviderTest.java  |  28 +-
 .../TestStandardControllerServiceProvider.java  |  78 +++--
 .../providers/AbstractTestStateProvider.java    | 153 +++++++++
 .../local/TestWriteAheadLocalStateProvider.java |  90 +++++
 .../zookeeper/TestZooKeeperStateProvider.java   | 106 ++++++
 .../src/main/resources/conf/nifi.properties     |  14 +
 .../main/resources/conf/state-management.xml    |  52 +++
 .../main/resources/conf/zookeeper.properties    |  30 ++
 .../socket/ClusterManagerServerProtocol.java    |   2 +-
 .../dao/impl/StandardControllerServiceDAO.java  |   4 +-
 .../nifi/web/dao/impl/StandardProcessorDAO.java |   4 +-
 .../web/dao/impl/StandardReportingTaskDAO.java  |   4 +-
 .../resources/access-control/nifi.properties    |   7 +
 .../access-control/state-management.xml         |  38 +++
 .../processors/hl7/ExtractHL7Attributes.java    |   3 +-
 .../apache/nifi/processors/hl7/RouteHL7.java    |   2 +-
 .../processors/kafka/test/EmbeddedKafka.java    |   3 +-
 .../standard/AbstractListProcessor.java         | 288 ++++++++--------
 .../nifi/processors/standard/GetHTTP.java       | 122 ++-----
 .../standard/TestAbstractListProcessor.java     |  34 +-
 .../standard/TestDetectDuplicate.java           |  42 +--
 .../nifi/processors/standard/TestGetHTTP.java   | 108 +-----
 .../standard/TestRouteOnAttribute.java          |   5 +-
 pom.xml                                         | 121 +++++--
 130 files changed, 4209 insertions(+), 795 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-api/src/main/java/org/apache/nifi/components/state/Scope.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/components/state/Scope.java 
b/nifi-api/src/main/java/org/apache/nifi/components/state/Scope.java
new file mode 100644
index 0000000..8daf12a
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/components/state/Scope.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components.state;
+
+/**
+ * A Scope represents how a NiFi component's state is to be stored and 
retrieved when running in a cluster.
+ */
+public enum Scope {
+
+    /**
+     * State is to be treated as "global" across the cluster. I.e., the same 
component on all nodes will
+     * have access to the same state.
+     */
+    CLUSTER,
+
+    /**
+     * State is to be treated local to the node. I.e., the same component will 
have different state on each
+     * node in the cluster.
+     */
+    LOCAL;
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-api/src/main/java/org/apache/nifi/components/state/StateManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/components/state/StateManager.java 
b/nifi-api/src/main/java/org/apache/nifi/components/state/StateManager.java
new file mode 100644
index 0000000..f52b2e3
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/components/state/StateManager.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components.state;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * <p>
+ * The StateManager is responsible for providing NiFi components a mechanism 
for storing
+ * and retrieving state.
+ * </p>
+ *
+ * <p>
+ * When calling methods in this class, the {@link Scope} is used in order to 
specify whether
+ * state should be stored/retrieved from the local state or the clustered 
state. However, if
+ * any instance of NiFi is not clustered (or is disconnected from its 
cluster), the Scope is
+ * not really relevant and the local state will be used in all cases. This 
allows component
+ * developers to not concern themselves with whether or not a particular 
instance of NiFi is
+ * clustered. Instead, developers should assume that the instance is indeed 
clustered and write
+ * the component accordingly. If not clustered, the component will still 
behave in the same
+ * manner, as a standalone node could be thought of as a "cluster of 1."
+ * </p>
+ */
+public interface StateManager {
+
+    /**
+     * Updates the value of the component's state, setting it to the given value
+     *
+     * @param state the value to change the state to
+     * @param scope the scope to use when storing the state
+     *
+     * @throws IOException if unable to communicate with the underlying 
storage mechanism
+     */
+    void setState(Map<String, String> state, Scope scope) throws IOException;
+
+    /**
+     * Returns the current state for the component. This return value will 
never be <code>null</code>.
+     * If the state has not yet been set, the StateMap's version will be -1, 
and the map of values will be empty.
+     *
+     * @param scope the scope to use when fetching the state
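+     * @return the current state for the component; never <code>null</code>
+     * @throws IOException if unable to communicate with the underlying storage mechanism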
+     */
+    StateMap getState(Scope scope) throws IOException;
+
+    /**
+     * Updates the value of the component's state to the new value if and only 
if the value currently
+     * is the same as the given oldValue.
+     *
+     * @param oldValue the old value to compare against
+     * @param newValue the new value to use if and only if the state's value 
is the same as the given oldValue
+     * @param scope the scope to use for storing the new state
+     * @return <code>true</code> if the state was updated to the new value, 
<code>false</code> if the state's value was not
+     *         equal to oldValue
+     *
+     * @throws IOException if unable to communicate with the underlying 
storage mechanism
+     */
+    boolean replace(StateMap oldValue, Map<String, String> newValue, Scope 
scope) throws IOException;
+
+    /**
+     * Clears all keys and values from the component's state
+     *
+     * @param scope the scope whose values should be cleared
+     *
+     * @throws IOException if unable to communicate with the underlying 
storage mechanism
+     */
+    void clear(Scope scope) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-api/src/main/java/org/apache/nifi/components/state/StateMap.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/components/state/StateMap.java 
b/nifi-api/src/main/java/org/apache/nifi/components/state/StateMap.java
new file mode 100644
index 0000000..7984e42
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/components/state/StateMap.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components.state;
+
+import java.util.Map;
+
+/**
+ * Provides a representation of a component's state at some point in time.
+ */
+public interface StateMap {
+    /**
+     * Each time that a component's state is updated, the state is assigned a 
new version.
+     * This version can then be used to atomically update state by the backing 
storage mechanism.
+     * Though this number is monotonically increasing, it should not be 
expected to increment always
+     * from X to X+1. I.e., version numbers may be skipped.
+     *
+     * @return the version associated with the state
+     */
+    long getVersion();
+
+    /**
+     * Returns the value associated with the given key
+     * 
+     * @param key the key whose value should be retrieved
+     * @return the value associated with the given key, or <code>null</code> 
if no value is associated
+     *         with this key.
+     */
+    String get(String key);
+
+    /**
+     * Returns an immutable Map representation of all keys and values for the 
state of a component.
+     *
+     * @return an immutable Map representation of all keys and values for the 
state of a component.
+     */
+    Map<String, String> toMap();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-api/src/main/java/org/apache/nifi/components/state/StateProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/components/state/StateProvider.java 
b/nifi-api/src/main/java/org/apache/nifi/components/state/StateProvider.java
new file mode 100644
index 0000000..e0243f3
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/components/state/StateProvider.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components.state;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.nifi.components.ConfigurableComponent;
+
+/**
+ * <p>
+ * Provides a mechanism by which components can store and retrieve state. 
Depending on the Provider, the state
+ * may be stored locally, or it may be stored on a remote resource.
+ * </p>
+ *
+ * <p>
+ * Which implementation should be used for local and clustered state is 
configured in the NiFi properties file.
+ * It is therefore possible to provide custom implementations of this 
interface. Note, however, that this interface
+ * is new as of version 0.5.0 of Apache NiFi and may not be considered 
"stable" as of yet. Therefore, it is subject
+ * to change without notice, so providing custom implementations is cautioned 
against until the API becomes more stable.
+ * </p>
+ *
+ * @since 0.5.0
+ */
+public interface StateProvider extends ConfigurableComponent {
+
+    /**
+     * Initializes the StateProvider so that it is capable of being used. This 
method will be called
+     * once before any of the other methods are called and will not be called 
again until the {@link #shutdown()}
+     * method has been called
+     *
+     * @param context the initialization context that can be used to prepare 
the state provider for use
+     */
+    void initialize(StateProviderInitializationContext context) throws 
IOException;
+
+    /**
+     * Shuts down the StateProvider and cleans up any resources held by it. 
Once this method has returned, the
+     * StateProvider may be initialized once again via the {@link 
#initialize(StateProviderInitializationContext)} method.
+     */
+    void shutdown();
+
+    /**
+     * Updates the value of the component's state, setting the new value to the
+     * given state
+     *
+     * @param state the value to change the state to
+     * @param componentId the id of the component for which state is being set
+     *
+     * @throws IOException if unable to communicate with the underlying 
storage mechanism
+     */
+    void setState(Map<String, String> state, String componentId) throws 
IOException;
+
+
+    /**
+     * Returns the currently configured state for the component. The returned 
StateMap will never be null.
+     * The version of the StateMap will be -1 and the state will contain no 
key/value pairs if the state has never been set.
+     *
+     * @param componentId the id of the component for which state is to be 
retrieved
+     * @return the currently configured value for the component's state
+     *
+     * @throws IOException if unable to communicate with the underlying 
storage mechanism
+     */
+    StateMap getState(String componentId) throws IOException;
+
+
+    /**
+     * Updates the value of the component's state to the new value if and only 
if the value currently
+     * is the same as the given oldValue.
+     *
+     * @param oldValue the old value to compare against
+     * @param newValue the new value to use if and only if the state's value 
is the same as the given oldValue
+     * @param componentId the id of the component for which state is being 
replaced
+     * @return <code>true</code> if the state was updated to the new value, 
<code>false</code> if the state's value was not
+     *         equal to oldValue
+     *
+     * @throws IOException if unable to communicate with the underlying 
storage mechanism
+     */
+    boolean replace(StateMap oldValue, Map<String, String> newValue, String 
componentId) throws IOException;
+
+    /**
+     * Removes all values from the state that is stored for the component with the 
given id
+     *
+     * @param componentId the id of the component for which state is being 
cleared
+     *
+     * @throws IOException if unable to communicate with the underlying 
storage mechanism
+     */
+    void clear(String componentId) throws IOException;
+
+    /**
+     * This method is called whenever a component is removed from the NiFi 
instance. This allows the State Provider to
+     * perform tasks when a component is removed in order to clean up 
resources that may be associated with that component
+     *
+     * @param componentId the ID of the component that was removed from the NiFi 
instance
+     * @throws IOException if unable to perform the necessary cleanup
+     */
+    void onComponentRemoved(String componentId) throws IOException;
+
+    /**
+     * Notifies the state provider that it should begin servicing requests to 
store and retrieve state
+     */
+    void enable();
+
+    /**
+     * Notifies the state provider that it should stop servicing requests to 
store and retrieve state and instead throw a ProviderDisabledException if any 
request is made to do so
+     */
+    void disable();
+
+    /**
+     * @return <code>true</code> if the provider is enabled, 
<code>false</code> otherwise.
+     */
+    boolean isEnabled();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-api/src/main/java/org/apache/nifi/components/state/StateProviderInitializationContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/components/state/StateProviderInitializationContext.java
 
b/nifi-api/src/main/java/org/apache/nifi/components/state/StateProviderInitializationContext.java
new file mode 100644
index 0000000..aaf5490
--- /dev/null
+++ 
b/nifi-api/src/main/java/org/apache/nifi/components/state/StateProviderInitializationContext.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components.state;
+
+import java.util.Map;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+
+/**
+ * This interface defines an initialization context that is passed to a {@link 
StateProvider} when it
+ * is initialized.
+ */
+public interface StateProviderInitializationContext {
+    /**
+     * @return the identifier of the StateProvider
+     */
+    String getIdentifier();
+
+    /**
+     * @return a Map of Property Descriptors to their configured values
+     */
+    Map<PropertyDescriptor, PropertyValue> getProperties();
+
+    /**
+     * Returns the configured value for the given property
+     *
+     * @param property the property to retrieve the value for
+     *
+     * @return the configured value for the property.
+     */
+    PropertyValue getProperty(PropertyDescriptor property);
+
+    /**
+     * @return the SSL Context that should be used to communicate with remote 
resources,
+     *         or <code>null</code> if no SSLContext has been configured
+     */
+    SSLContext getSSLContext();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceInitializationContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceInitializationContext.java
 
b/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceInitializationContext.java
index 6fcee0c..3486621 100644
--- 
a/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceInitializationContext.java
+++ 
b/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceInitializationContext.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.controller;
 
+import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.logging.ComponentLog;
 
 public interface ControllerServiceInitializationContext {
@@ -37,4 +38,9 @@ public interface ControllerServiceInitializationContext {
      * way and generate bulletins when appropriate
      */
     ComponentLog getLogger();
+
+    /**
+     * @return the StateManager that can be used to store and retrieve state 
for this component
+     */
+    StateManager getStateManager();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java 
b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
index cf1bb6c..91ea1a2 100644
--- a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
+++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
@@ -21,6 +21,7 @@ import java.util.Set;
 
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.controller.ControllerServiceLookup;
 
 /**
@@ -154,4 +155,9 @@ public interface ProcessContext {
      * does not allow the Expression Language, even if a seemingly valid 
Expression is present in the value.
      */
     boolean isExpressionLanguagePresent(PropertyDescriptor property);
+
+    /**
+     * @return the StateManager that can be used to store and retrieve state 
for this component
+     */
+    StateManager getStateManager();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java 
b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
index d0c5b46..5d867b7 100644
--- a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
+++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.regex.Pattern;
 
+import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.exception.FlowFileAccessException;

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingContext.java 
b/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingContext.java
index 281194c..cb131e2 100644
--- a/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingContext.java
+++ b/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingContext.java
@@ -20,6 +20,7 @@ import java.util.Map;
 
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.controller.ControllerServiceLookup;
 
 /**
@@ -86,4 +87,9 @@ public interface ReportingContext {
      * Controller Services
      */
     ControllerServiceLookup getControllerServiceLookup();
+
+    /**
+     * @return the StateManager that can be used to store and retrieve state 
for this component
+     */
+    StateManager getStateManager();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index b8e83bd..1b668a5 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -283,6 +283,12 @@ language governing permissions and limitations under the 
License. -->
         <nifi.templates.directory>./conf/templates</nifi.templates.directory>
         
<nifi.database.directory>./database_repository</nifi.database.directory>
 
+        
<nifi.state.management.configuration.file>./conf/state-management.xml</nifi.state.management.configuration.file>
+        
<nifi.state.management.embedded.zookeeper.start>false</nifi.state.management.embedded.zookeeper.start>
+        
<nifi.state.management.embedded.zookeeper.properties>./conf/zookeeper.properties</nifi.state.management.embedded.zookeeper.properties>
+        
<nifi.state.management.provider.local>local-provider</nifi.state.management.provider.local>
+        
<nifi.state.management.provider.cluster>zk-provider</nifi.state.management.provider.cluster>
+        
         
<nifi.flowfile.repository.implementation>org.apache.nifi.controller.repository.WriteAheadFlowFileRepository</nifi.flowfile.repository.implementation>
         
<nifi.flowfile.repository.directory>./flowfile_repository</nifi.flowfile.repository.directory>
         
<nifi.flowfile.repository.partitions>256</nifi.flowfile.repository.partitions>

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
 
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index c82a220..5b35328 100644
--- 
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++ 
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -189,6 +189,13 @@ public class NiFiProperties extends Properties {
     // kerberos properties
     public static final String KERBEROS_KRB5_FILE = "nifi.kerberos.krb5.file";
 
+    // state management
+    // Key for the XML file that configures the local and cluster-wide State Providers
+    public static final String STATE_MANAGEMENT_CONFIG_FILE = 
"nifi.state.management.configuration.file";
+    // Key for the identifier of the Local State Provider to use
+    public static final String STATE_MANAGEMENT_LOCAL_PROVIDER_ID = 
"nifi.state.management.provider.local";
+    // Key for the identifier of the Cluster State Provider to use
+    public static final String STATE_MANAGEMENT_CLUSTER_PROVIDER_ID = 
"nifi.state.management.provider.cluster";
+    // Key for the flag that controls whether this instance starts an embedded ZooKeeper server
+    public static final String STATE_MANAGEMENT_START_EMBEDDED_ZOOKEEPER = 
"nifi.state.management.embedded.zookeeper.start";
+    // Key for the properties file that configures the embedded ZooKeeper server
+    public static final String STATE_MANAGEMENT_ZOOKEEPER_PROPERTIES = 
"nifi.state.management.embedded.zookeeper.properties";
+
     // defaults
     public static final String DEFAULT_TITLE = "NiFi";
     public static final Boolean DEFAULT_AUTO_RESUME_STATE = true;
@@ -236,6 +243,9 @@ public class NiFiProperties extends Properties {
     public static final int DEFAULT_CLUSTER_MANAGER_PROTOCOL_THREADS = 10;
     public static final String DEFAULT_CLUSTER_MANAGER_SAFEMODE_DURATION = "0 
sec";
 
+    // state management defaults
+    // Fallback path used by getStateManagementConfigFile() when nifi.state.management.configuration.file is not set
+    public static final String DEFAULT_STATE_MANAGEMENT_CONFIG_FILE = 
"conf/state-management.xml";
+
     private NiFiProperties() {
         super();
     }
@@ -985,4 +995,28 @@ public class NiFiProperties extends Properties {
     public String getBoredYieldDuration() {
         return getProperty(BORED_YIELD_DURATION, DEFAULT_BORED_YIELD_DURATION);
     }
+
+    /**
+     * Resolves the State Management configuration file from the
+     * {@code nifi.state.management.configuration.file} property.
+     *
+     * @return the configured file, or {@code conf/state-management.xml} when the property is not set
+     */
+    public File getStateManagementConfigFile() {
+        final String configFilePath = getProperty(STATE_MANAGEMENT_CONFIG_FILE, DEFAULT_STATE_MANAGEMENT_CONFIG_FILE);
+        return new File(configFilePath);
+    }
+
+    /**
+     * @return the value of the {@code nifi.state.management.provider.local} property (the identifier of the
+     *         Local State Provider to use), or {@code null} if the property is not set
+     */
+    public String getLocalStateProviderId() {
+        final String localProviderId = getProperty(STATE_MANAGEMENT_LOCAL_PROVIDER_ID);
+        return localProviderId;
+    }
+
+    /**
+     * @return the value of the {@code nifi.state.management.provider.cluster} property (the identifier of the
+     *         Cluster State Provider to use), or {@code null} if the property is not set
+     */
+    public String getClusterStateProviderId() {
+        final String clusterProviderId = getProperty(STATE_MANAGEMENT_CLUSTER_PROVIDER_ID);
+        return clusterProviderId;
+    }
+
+    /**
+     * Resolves the properties file for the embedded ZooKeeper server from the
+     * {@code nifi.state.management.embedded.zookeeper.properties} property.
+     *
+     * @return the configured file, or {@code null} if the property is not set
+     */
+    public File getEmbeddedZooKeeperPropertiesFile() {
+        final String propertiesPath = getProperty(STATE_MANAGEMENT_ZOOKEEPER_PROPERTIES);
+        if (propertiesPath == null) {
+            return null;
+        }
+
+        return new File(propertiesPath);
+    }
+
+    /**
+     * Indicates whether the {@code nifi.state.management.embedded.zookeeper.start} property asks for an
+     * embedded ZooKeeper server to be started.
+     *
+     * @return {@code true} only if the property value equals "true" (ignoring case); {@code false} otherwise,
+     *         including when the property is not set
+     */
+    public boolean isStartEmbeddedZooKeeper() {
+        final String startEmbedded = getProperty(STATE_MANAGEMENT_START_EMBEDDED_ZOOKEEPER);
+        return Boolean.parseBoolean(startEmbedded);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
index 18075db..81ca70d 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
@@ -761,7 +761,7 @@ public class EndpointConnectionPool {
                     final int index = n % destinations.size();
                     PeerStatus status = destinations.get(index);
                     if (status == null) {
-                        final PeerDescription description = new 
PeerDescription(nodeInfo.getHostname(), nodeInfo.getSiteToSitePort(), 
nodeInfo.isSiteToSiteSecure());
+                        final PeerDescription description = new 
PeerDescription(nodeInfo.getSiteToSiteHostname(), nodeInfo.getSiteToSitePort(), 
nodeInfo.isSiteToSiteSecure());
                         status = new PeerStatus(description, 
nodeInfo.getTotalFlowFiles());
                         destinations.set(index, status);
                         break;

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformation.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformation.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformation.java
index 2041268..abfcc85 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformation.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformation.java
@@ -18,23 +18,23 @@ package org.apache.nifi.remote.cluster;
 
 public class NodeInformation {
 
-    private final String hostname;
+    private final String siteToSiteHostname;
     private final Integer siteToSitePort;
     private final int apiPort;
     private final boolean isSiteToSiteSecure;
     private final int totalFlowFiles;
 
-    public NodeInformation(final String hostname, final Integer 
siteToSitePort, final int apiPort,
+    public NodeInformation(final String siteToSiteHostname, final Integer 
siteToSitePort, final int apiPort,
             final boolean isSiteToSiteSecure, final int totalFlowFiles) {
-        this.hostname = hostname;
+        this.siteToSiteHostname = siteToSiteHostname;
         this.siteToSitePort = siteToSitePort;
         this.apiPort = apiPort;
         this.isSiteToSiteSecure = isSiteToSiteSecure;
         this.totalFlowFiles = totalFlowFiles;
     }
 
-    public String getHostname() {
-        return hostname;
+    public String getSiteToSiteHostname() {
+        return siteToSiteHostname;
     }
 
     public int getAPIPort() {
@@ -66,7 +66,7 @@ public class NodeInformation {
         }
 
         final NodeInformation other = (NodeInformation) obj;
-        if (!hostname.equals(other.hostname)) {
+        if (!siteToSiteHostname.equals(other.siteToSiteHostname)) {
             return false;
         }
         if (siteToSitePort == null && other.siteToSitePort != null) {
@@ -88,11 +88,11 @@ public class NodeInformation {
 
     @Override
     public int hashCode() {
-        return 83832 + hostname.hashCode() + (siteToSitePort == null ? 8 : 
siteToSitePort.hashCode()) + apiPort + (isSiteToSiteSecure ? 3829 : 0);
+        return 83832 + siteToSiteHostname.hashCode() + (siteToSitePort == null 
? 8 : siteToSitePort.hashCode()) + apiPort + (isSiteToSiteSecure ? 3829 : 0);
     }
 
     @Override
     public String toString() {
-        return "Node[" + hostname + ":" + apiPort + "]";
+        return "Node[" + siteToSiteHostname + ":" + apiPort + "]";
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformationAdapter.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformationAdapter.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformationAdapter.java
index 440463c..b2dead0 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformationAdapter.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformationAdapter.java
@@ -30,7 +30,7 @@ public class NodeInformationAdapter extends 
XmlAdapter<AdaptedNodeInformation, N
     @Override
     public AdaptedNodeInformation marshal(final NodeInformation 
nodeInformation) throws Exception {
         final AdaptedNodeInformation adapted = new AdaptedNodeInformation();
-        adapted.setHostname(nodeInformation.getHostname());
+        adapted.setHostname(nodeInformation.getSiteToSiteHostname());
         adapted.setSiteToSitePort(nodeInformation.getSiteToSitePort());
         adapted.setApiPort(nodeInformation.getAPIPort());
         adapted.setSiteToSiteSecure(nodeInformation.isSiteToSiteSecure());

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-docs/src/main/asciidoc/administration-guide.adoc
----------------------------------------------------------------------
diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc 
b/nifi-docs/src/main/asciidoc/administration-guide.adoc
index 830d542..9d1a442 100644
--- a/nifi-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc
@@ -367,6 +367,78 @@ in the remote cluster can be included in the same group. 
When the ADMIN wants to
 cluster, s/he can grant it to the group and avoid having to grant it 
individually to each node in the cluster.
 
 
+
+[[state_management]]
+State Management
+----------------
+
+NiFi provides a mechanism for Processors, Reporting Tasks, Controller 
Services, and the framework itself to persist state. This
+allows a Processor, for example, to resume from the place where it left off 
after NiFi is restarted. Additionally, it allows for
+a Processor to store some piece of information so that the Processor can 
access that information from all of the different nodes
+in the cluster. This allows one node to pick up where another node left off, 
or to coordinate across all of the nodes in a cluster.
+
+[[state_providers]]
+*Configuring State Providers*
+When a component decides to store or retrieve state, it does so by providing a 
"Scope" - either Node-local or Cluster-wide. The
+mechanism that is used to store and retrieve this state is then determined 
based on this Scope, as well as the configured State
+Providers. The _nifi.properties_ file contains three different properties that 
are relevant to configuring these State Providers.
+The first is the `nifi.state.management.configuration.file` property, which specifies 
an external XML file that is used for configuring
+the local and cluster-wide State Providers. This XML file may contain 
configurations for multiple providers, so the
+`nifi.state.management.provider.local` property provides the identifier of the 
local State Provider configured in this XML file.
+Similarly, the `nifi.state.management.provider.cluster` property provides the 
identifier of the cluster-wide State Provider
+configured in this XML file.
+
+This XML file consists of a top-level `state-management` element, which has 
one or more `local-provider` and zero or more
+`cluster-provider` elements. Each of these elements then contains an `id` 
element that is used to specify the identifier that can
+be referenced in the _nifi.properties_ file, as well as a `class` element that 
specifies the fully-qualified class name to use
+in order to instantiate the State Provider. Finally, each of these elements 
may have zero or more `property` elements. Each
+`property` element has an attribute, `name` that is the name of the property 
that the State Provider supports. The textual content
+of the `property` element is the value of the property.
+
+Once these State Providers have been configured in the _state-management.xml_ 
file (or whatever file is configured), those Providers
+may be referenced by their identifiers. By default, the Local State Provider 
is configured to be a `WriteAheadLocalStateProvider` that
+persists the data to the _$NIFI_HOME/state_ directory. The default Cluster 
State Provider is configured to be a `ZooKeeperStateProvider`.
+The default ZooKeeper-based provider must have its `Connect String` property 
populated before it can be used. It is also advisable,
+if multiple NiFi instances will use the same ZooKeeper instance, that the 
value of the `Root Node` property be changed. For instance,
+one might set the value to `/nifi/<team name>/production`.
+
+If NiFi is configured to run in a standalone mode, the 
`cluster-provider` element need not be populated in the 
_state-management.xml_
+file and will actually be ignored if it is populated. However, the 
`local-provider` element must always be present and populated.
+Additionally, if NiFi is run in a cluster, each node must also have the 
`cluster-provider` element present and properly configured. Otherwise,
+NiFi will fail to start up.
+
+While there are not many properties that need to be configured for these 
providers, they were externalized into a separate _state-management.xml_
+file, rather than being configured via the _nifi.properties_ file, simply 
because different implementations may require different properties,
+and it is easier to maintain and understand the configuration in an XML-based 
file such as this, than to mix the properties of the Provider
+in with all of the other NiFi framework-specific properties.
+
+
+[[embedded_zookeeper]]
+*Embedded ZooKeeper Server*
+As mentioned above, the default State Provider for cluster-wide state is the 
`ZooKeeperStateProvider`. At the time of this writing, this is the
+only State Provider that exists for handling cluster-wide state. What this 
means is that NiFi has a dependency on ZooKeeper in order to
+behave as a cluster. However, there are many environments in which NiFi is 
deployed where there is no existing ZooKeeper ensemble being maintained.
+In order to avoid the burden of forcing administrators to also maintain a 
separate ZooKeeper instance, NiFi provides the option of starting an
+embedded ZooKeeper server.
+
+This can be accomplished by setting the 
`nifi.state.management.embedded.zookeeper.start` property in _nifi.properties_ 
to `true` on those nodes
+that should run the embedded ZooKeeper server. Generally, it is advisable to 
run ZooKeeper on either 3 or 5 nodes. Running on fewer than 3 nodes
+provides less durability in the face of failure. Running on more than 5 nodes 
generally produces more network traffic than is necessary. Additionally,
+running ZooKeeper on 4 nodes provides no more benefit than running on 3 nodes, 
because ZooKeeper requires a majority of nodes to be active in order to function.
+However, it is up to the administrator to determine the number of nodes most 
appropriate to the particular deployment of NiFi.
+
+If the `nifi.state.management.embedded.zookeeper.start` property is set to 
`true`, the `nifi.state.management.embedded.zookeeper.properties` property
+in _nifi.properties_ also becomes relevant. This specifies the ZooKeeper 
properties file to use. At a minimum, this properties file needs to be populated
+with the list of ZooKeeper servers. Each of these servers is configured as 
<hostname>:<quorum port>[:<leader election port>]. For example, 
`myhost:2888:3888`.
+This list of nodes should be the same nodes in the NiFi cluster that have the 
`nifi.state.management.embedded.zookeeper.start`
+property set to `true`. Also note that because ZooKeeper will be listening on 
these ports, the firewall may need to be configured to open these ports
+for incoming traffic, at least between nodes in the cluster.
+
+For more information on the properties used to administer ZooKeeper, see the
+link:https://zookeeper.apache.org/doc/current/zookeeperAdmin.html[ZooKeeper 
Admin Guide].
+
+
+
 [[clustering]]
 Clustering Configuration
 ------------------------
@@ -433,6 +505,7 @@ For the NCM, the minimum properties to configure are as 
follows:
 For Node 1, the minimum properties to configure are as follows:
 
 * Under the Web Properties, set either the http or https port that you want 
Node 1 to run on. If the NCM is running on the same server, choose a different 
web port for Node 1. Also, consider whether you need to set the http or https 
host property.
+* Under the State Management section, set the 
`nifi.state.management.provider.cluster` property to the identifier of the 
Cluster State Provider. Ensure that the Cluster State Provider has been 
configured in the _state-management.xml_ file. See <<state_providers>> for more 
information.
 * Under Cluster Node Properties, set the following:
 ** nifi.cluster.is.node - Set this to _true_.
 ** nifi.cluster.node.address - Set this to the fully qualified hostname of the 
node. If left blank, it defaults to "localhost".
@@ -443,6 +516,7 @@ For Node 1, the minimum properties to configure are as 
follows:
 For Node 2, the minimum properties to configure are as follows:
 
 * Under the Web Properties, set either the http or https port that you want 
Node 2 to run on. Also, consider whether you need to set the http or https host 
property.
+* Under the State Management section, set the 
`nifi.state.management.provider.cluster` property to the identifier of the 
Cluster State Provider. Ensure that the Cluster State Provider has been 
configured in the _state-management.xml_ file. See <<state_providers>> for more 
information.
 * Under the Cluster Node Properties, set the following:
 ** nifi.cluster.is.node - Set this to _true_.
 ** nifi.cluster.node.address - Set this to the fully qualified hostname of the 
node. If left blank, it defaults to "localhost".
@@ -631,6 +705,22 @@ only consider if 
`nifi.security.user.login.identity.provider` configured with a
 |nifi.documentation.working.directory|The documentation working directory. The 
default value is ./work/docs/components and probably should be left as is.
 |====
 
+
+*State Management* +
+
+The State Management section of the Properties file provides a mechanism for 
configuring local and cluster-wide mechanisms
+for components to persist state. See the <<state_management>> section for more 
information on how this is used.
+
+|====
+|*Property*|*Description*
+|nifi.state.management.configuration.file|The XML file that contains 
configuration for the local and cluster-wide State Providers. The default value 
is _./conf/state-management.xml_
+|nifi.state.management.provider.local|The ID of the Local State Provider to 
use. This value must match the value of the `id` element of one of the 
`local-provider` elements in the _state-management.xml_ file.
+|nifi.state.management.provider.cluster|The ID of the Cluster State Provider 
to use. This value must match the value of the `id` element of one of the 
`cluster-provider` elements in the _state-management.xml_ file. This value is 
ignored if not clustered but is required for nodes in a cluster.
+|nifi.state.management.embedded.zookeeper.start|Specifies whether or not this 
instance of NiFi should start an embedded ZooKeeper Server. This is used in 
conjunction with the ZooKeeperStateProvider.
+|nifi.state.management.embedded.zookeeper.properties|Specifies a properties 
file that contains the configuration for the embedded ZooKeeper Server that is 
started (if the `nifi.state.management.embedded.zookeeper.start` property is 
set to `true`)
+|====
+
+
 *H2 Settings* +
 
 The H2 Settings section defines the settings for the H2 database, which keeps 
track of user access and flow controller history.

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-docs/src/main/asciidoc/developer-guide.adoc
----------------------------------------------------------------------
diff --git a/nifi-docs/src/main/asciidoc/developer-guide.adoc 
b/nifi-docs/src/main/asciidoc/developer-guide.adoc
index aeea3dc..f9bf675 100644
--- a/nifi-docs/src/main/asciidoc/developer-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/developer-guide.adoc
@@ -216,6 +216,13 @@ class, calling the appropriate methods
 to fill in the details of the Relationship, and finally calling the
 `build` method.
 
+[[state_manager_overview]]
+==== StateManager
+The StateManager provides Processors, Reporting Tasks, and Controller Services 
a mechanism
+for easily storing and retrieving state. The API is similar to that of 
ConcurrentHashMap
+but requires a Scope for each operation. The Scope indicates whether the state 
is to be
+retrieved/stored locally or in a cluster-wide manner. For more information, 
see the
+<<state_manager>> section.
 
 [[processor_initialization_context]]
 ==== ProcessorInitializationContext
@@ -558,6 +565,64 @@ for instance, they should not be
 relied upon for critical business logic.
 
 
+[[state_manager]]
+=== State Manager
+
+From the ProcessContext, ReportingContext, and 
ControllerServiceInitializationContext, components are
+able to call the `getStateManager()` method. This State Manager is responsible 
for providing a simple API
+for storing and retrieving state. As such, the API is designed to be quite 
similar to the ConcurrentMap
+API, which most Java developers are already familiar with.
+
+
+[[state_scope]]
+==== Scope
+One very notable difference between the StateManager API and the ConcurrentMap 
API, however, is the presence
+of a Scope object on each method call of the StateManager. This Scope will 
either be `Scope.NODE` or `Scope.CLUSTER`.
+If NiFi is run in a cluster, this Scope provides important information to the 
framework about how the operation should
+occur.
+
+If state is stored using `Scope.CLUSTER`, then all nodes in the cluster will 
be communicating with the same
+state storage mechanism, as if all nodes were to share a single ConcurrentMap. 
If state is stored and retrieved using
+`Scope.NODE`, then each node will see a different representation of the state.
+
+It is also worth noting that if NiFi is configured to run as a standalone 
instance, rather than running in a cluster,
+a scope of `Scope.NODE` is always used. This is done in order to allow the 
developer of a NiFi component to write the code
+in one consistent way, without worrying about whether or not the NiFi instance 
is clustered. The developer should instead assume
+that the instance is clustered and write the code accordingly.
+
+
+==== Storing and Retrieving State
+
+State is stored using the StateManager's `set`, `replace`, `putIfAbsent`, 
`remove`, and `clear` methods. All of these methods,
+with the exception of `clear` take as the first argument the key to be set. 
The key that is used is unique only to the same
+instance of the component and for the same Scope. That is, if two Processors 
store a value using the key _My Key_, those Processors
+will not conflict with each other, even if both Processors are of the same 
type (e.g., both are of type ListFile). Furthermore,
+if a Processor stores a value with the key of _My Key_ using the 
`Scope.CLUSTER` scope, and then attempts to retrieve the value
+using the `Scope.NODE` scope, the value retrieved will be `null`. Each 
Processor's state, then, is stored in isolation from other
+Processors' state. A unique key can be thought of as a triple of <Processor 
Instance, Key, Scope>.
+
+It follows, then, that two Processors cannot share the same state. There are, 
however, some circumstances in which it is very
+necessary to share state between two Processors of different types, or two 
Processors of the same type. This can be accomplished
+by using a Controller Service. By storing and retrieving state from a 
Controller Service, multiple Processors can use the same
+Controller Service and the state can be exposed via the Controller Service's 
API.
+
+
+==== Unit Tests
+NiFi's Mock Framework provides an extensive collection of tools to perform 
unit testing of Processors. Processor unit tests typically
+begin with the `TestRunner` class. As a result, the `TestRunner` class 
contains a `getStateManager` method of its own. The StateManager
+that is returned, however, is of a specific type: `MockStateManager`. This 
implementation provides several methods in addition to those
+defined by the `StateManager` interface, that help developers to more easily 
develop unit tests.
+
+First, the `MockStateManager` implements the `StateManager` interface, so all 
of the state can be examined from within a unit test.
+Additionally, the `MockStateManager` exposes a handful of `assert*` methods to 
perform assertions that the State is set as expected.
+There are times, however, that state could be updated multiple times during 
the run of a single invocation of a Processor's `onTrigger`
+method. In this case, inspecting the values after running the Processor may 
not be sufficient. Additionally, we must always remember at each
+step to check the value of the stored state, which can become error-prone and 
burdensome for the developer.
+
+For these reasons, the `MockStateManager` provides an additional method, named 
`failIfStateSet`. This method instructs the State Manager that
+the unit test should immediately fail if the state for a given key is ever 
set, or if it is set to a specific value. The `doNotFailIfStateSet`
+method can then be used to instruct the Mock Framework to clear this state and 
allow state to be set to any value.
+
 
 
 === Reporting Processor Activity

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-mock/src/main/java/org/apache/nifi/state/MockStateManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/state/MockStateManager.java 
b/nifi-mock/src/main/java/org/apache/nifi/state/MockStateManager.java
new file mode 100644
index 0000000..c014ed0
--- /dev/null
+++ b/nifi-mock/src/main/java/org/apache/nifi/state/MockStateManager.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.state;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
+import org.junit.Assert;
+
+
+/**
+ * A {@link StateManager} for unit tests that keeps one {@link StateMap} per {@link Scope} in memory and
+ * offers {@code assert*} methods for verifying the stored state.
+ * <p>
+ * Each scope starts with an empty state map whose version is {@code -1}. Every call to
+ * {@link #setState(Map, Scope)} or {@link #clear(Scope)}, and every successful
+ * {@link #replace(StateMap, Map, Scope)}, installs a new map carrying the next version number; the version
+ * counter is shared by both scopes. Any scope other than {@link Scope#CLUSTER} is treated as local.
+ */
+public class MockStateManager implements StateManager {
+    // Version reported by a scope's state map until state is stored for the first time
+    private static final long UNSET_VERSION = -1L;
+
+    private final AtomicInteger versionIndex = new AtomicInteger(0);
+
+    private StateMap localStateMap = new MockStateMap(null, UNSET_VERSION);
+    private StateMap clusterStateMap = new MockStateMap(null, UNSET_VERSION);
+
+    /**
+     * Replaces the state for the given scope with a snapshot of the given map, tagged with the next version.
+     *
+     * @param state the new state values
+     * @param scope the scope whose state is to be replaced
+     */
+    @Override
+    public synchronized void setState(final Map<String, String> state, final Scope scope) {
+        final StateMap stateMap = new MockStateMap(state, versionIndex.incrementAndGet());
+
+        if (scope == Scope.CLUSTER) {
+            clusterStateMap = stateMap;
+        } else {
+            localStateMap = stateMap;
+        }
+    }
+
+    /**
+     * @param scope the scope to look up
+     * @return the state map currently held for the given scope; never {@code null}
+     */
+    @Override
+    public synchronized StateMap getState(final Scope scope) {
+        if (scope == Scope.CLUSTER) {
+            return clusterStateMap;
+        } else {
+            return localStateMap;
+        }
+    }
+
+    /**
+     * Replaces the state for the given scope only if {@code oldValue} is the very instance currently held
+     * for that scope (the comparison is by reference, not by content or version).
+     *
+     * @param oldValue the state map the caller believes to be current
+     * @param newValue the values to store if the replacement succeeds
+     * @param scope the scope whose state is to be replaced
+     * @return {@code true} if the state was replaced, {@code false} otherwise
+     */
+    @Override
+    public synchronized boolean replace(final StateMap oldValue, final Map<String, String> newValue, final Scope scope) {
+        if (scope == Scope.CLUSTER) {
+            if (oldValue == clusterStateMap) {
+                clusterStateMap = new MockStateMap(newValue, versionIndex.incrementAndGet());
+                return true;
+            }
+
+            return false;
+        } else {
+            if (oldValue == localStateMap) {
+                localStateMap = new MockStateMap(newValue, versionIndex.incrementAndGet());
+                return true;
+            }
+
+            return false;
+        }
+    }
+
+    /**
+     * Stores an empty state map for the given scope. Because this goes through
+     * {@link #setState(Map, Scope)}, the version is incremented, so the scope counts as "set" afterwards.
+     *
+     * @param scope the scope to clear
+     */
+    @Override
+    public synchronized void clear(final Scope scope) {
+        setState(Collections.<String, String> emptyMap(), scope);
+    }
+
+
+    private String getValue(final String key, final Scope scope) {
+        final StateMap stateMap = getState(scope);
+        return stateMap.get(key);
+    }
+
+    //
+    // assertion methods to make unit testing easier
+    //
+    /**
+     * Ensures that the state with the given key and scope is set to the given value, or else the test will fail
+     *
+     * @param key the state key
+     * @param value the expected value
+     * @param scope the scope
+     */
+    public void assertStateEquals(final String key, final String value, final Scope scope) {
+        Assert.assertEquals(value, getValue(key, scope));
+    }
+
+    /**
+     * Ensures that the state is equal to the given values
+     *
+     * @param stateValues the values expected
+     * @param scope the scope to compare the stateValues against
+     */
+    public void assertStateEquals(final Map<String, String> stateValues, final Scope scope) {
+        final StateMap stateMap = getState(scope);
+        Assert.assertEquals(stateValues, stateMap.toMap());
+    }
+
+    /**
+     * Ensures that the state is not equal to the given values
+     *
+     * @param stateValues the unexpected values
+     * @param scope the scope to compare the stateValues against
+     */
+    public void assertStateNotEquals(final Map<String, String> stateValues, final Scope scope) {
+        final StateMap stateMap = getState(scope);
+        // Compare by content: toMap() returns a new wrapper each time, so an identity check could never fail
+        Assert.assertNotEquals(stateValues, stateMap.toMap());
+    }
+
+    /**
+     * Ensures that the state with the given key and scope is not set to the given value, or else the test will fail
+     *
+     * @param key the state key
+     * @param value the unexpected value
+     * @param scope the scope
+     */
+    public void assertStateNotEquals(final String key, final String value, final Scope scope) {
+        Assert.assertNotEquals(value, getValue(key, scope));
+    }
+
+    /**
+     * Ensures that some value is set for the given key and scope, or else the test will fail
+     *
+     * @param key the state key
+     * @param scope the scope
+     */
+    public void assertStateSet(final String key, final Scope scope) {
+        Assert.assertNotNull("Expected state to be set for key " + key + " and scope " + scope + ", but it was not set", getValue(key, scope));
+    }
+
+    /**
+     * Ensures that no value is set for the given key and scope, or else the test will fail
+     *
+     * @param key the state key
+     * @param scope the scope
+     */
+    public void assertStateNotSet(final String key, final Scope scope) {
+        Assert.assertNull("Expected state not to be set for key " + key + " and scope " + scope + ", but it was set", getValue(key, scope));
+    }
+
+    /**
+     * Ensures that the state was set for the given scope, regardless of what the value was.
+     *
+     * @param scope the scope
+     */
+    public void assertStateSet(final Scope scope) {
+        // State has been set once the version has moved away from its initial value
+        Assert.assertNotEquals("Expected state to be set for Scope " + scope + ", but it was not set", UNSET_VERSION, getState(scope).getVersion());
+    }
+
+    /**
+     * Ensures that the state was not set for the given scope
+     *
+     * @param scope the scope
+     */
+    public void assertStateNotSet(final Scope scope) {
+        // State is untouched only while the version still has its initial value
+        Assert.assertEquals("Expected state not to be set for Scope " + scope + ", but it was set", UNSET_VERSION, getState(scope).getVersion());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-mock/src/main/java/org/apache/nifi/state/MockStateMap.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/state/MockStateMap.java 
b/nifi-mock/src/main/java/org/apache/nifi/state/MockStateMap.java
new file mode 100644
index 0000000..cfce467
--- /dev/null
+++ b/nifi-mock/src/main/java/org/apache/nifi/state/MockStateMap.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.state;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.components.state.StateMap;
+
+/**
+ * A {@link StateMap} for the mock framework that holds a private copy of a set of key/value
+ * pairs together with a version number. Nothing can be changed after construction.
+ */
+public class MockStateMap implements StateMap {
+    private final long version;
+    private final Map<String, String> values;
+
+    /**
+     * @param stateValues the state to hold; the entries are copied, and {@code null} is treated as an empty map
+     * @param version the version to report from {@link #getVersion()}
+     */
+    public MockStateMap(final Map<String, String> stateValues, final long version) {
+        if (stateValues == null) {
+            this.values = Collections.emptyMap();
+        } else {
+            // Copy so that later changes to the caller's map are not reflected here.
+            this.values = new HashMap<>(stateValues);
+        }
+        this.version = version;
+    }
+
+    /**
+     * @return the version supplied at construction time
+     */
+    @Override
+    public long getVersion() {
+        return version;
+    }
+
+    /**
+     * @param key the state key to look up
+     * @return the value stored for the key, or {@code null} if there is none
+     */
+    @Override
+    public String get(final String key) {
+        final String value = values.get(key);
+        return value;
+    }
+
+    /**
+     * @return a read-only view of all key/value pairs held by this map
+     */
+    @Override
+    public Map<String, String> toMap() {
+        final Map<String, String> readOnlyView = Collections.unmodifiableMap(values);
+        return readOnlyView;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java
 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java
index bff1d62..6a6e4cf 100644
--- 
a/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java
+++ 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java
@@ -16,23 +16,31 @@
  */
 package org.apache.nifi.util;
 
+import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.ControllerServiceInitializationContext;
 import org.apache.nifi.controller.ControllerServiceLookup;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.state.MockStateManager;
 
 public class MockControllerServiceInitializationContext extends 
MockControllerServiceLookup implements ControllerServiceInitializationContext, 
ControllerServiceLookup {
 
     private final String identifier;
     private final ComponentLog logger;
+    private final StateManager stateManager;
 
     public MockControllerServiceInitializationContext(final ControllerService 
controllerService, final String identifier) {
-        this(controllerService, identifier, new MockProcessorLog(identifier, 
controllerService));
+        this(controllerService, identifier, new MockStateManager());
     }
 
-    public MockControllerServiceInitializationContext(final ControllerService 
controllerService, final String identifier, final ComponentLog logger) {
+    public MockControllerServiceInitializationContext(final ControllerService 
controllerService, final String identifier, final StateManager stateManager) {
+        this(controllerService, identifier, new MockProcessorLog(identifier, 
controllerService), stateManager);
+    }
+
+    public MockControllerServiceInitializationContext(final ControllerService 
controllerService, final String identifier, final ComponentLog logger, final 
StateManager stateManager) {
         this.identifier = identifier;
         this.logger = logger;
+        this.stateManager = stateManager;
         addControllerService(controllerService, identifier);
     }
 
@@ -55,4 +63,9 @@ public class MockControllerServiceInitializationContext 
extends MockControllerSe
     public ComponentLog getLogger() {
         return logger;
     }
+
+    @Override
+    public StateManager getStateManager() {
+        return stateManager; // the StateManager assigned in the constructor
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
index e8e4dd5..c641d24 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
@@ -34,17 +34,20 @@ import org.apache.nifi.components.ConfigurableComponent;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.ControllerServiceLookup;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.SchedulingContext;
+import org.apache.nifi.state.MockStateManager;
 import org.junit.Assert;
 
 public class MockProcessContext extends MockControllerServiceLookup implements 
SchedulingContext, ControllerServiceLookup {
 
     private final ConfigurableComponent component;
     private final Map<PropertyDescriptor, String> properties = new HashMap<>();
+    private final StateManager stateManager;
 
     private String annotationData = null;
     private boolean yieldCalled = false;
@@ -56,17 +59,22 @@ public class MockProcessContext extends 
MockControllerServiceLookup implements S
     private volatile Set<Relationship> connections = new HashSet<>();
     private volatile Set<Relationship> unavailableRelationships = new 
HashSet<>();
 
+    public MockProcessContext(final ConfigurableComponent component) {
+        this(component, new MockStateManager()); // no StateManager supplied: back this context with a fresh MockStateManager
+    }
+
     /**
      * Creates a new MockProcessContext for the given Processor
      *
      * @param component being mocked
      */
-    public MockProcessContext(final ConfigurableComponent component) {
+    public MockProcessContext(final ConfigurableComponent component, final 
StateManager stateManager) {
         this.component = Objects.requireNonNull(component);
+        this.stateManager = stateManager;
     }
 
-    public MockProcessContext(final ControllerService component, final 
MockProcessContext context) {
-        this(component);
+    public MockProcessContext(final ControllerService component, final 
MockProcessContext context, final StateManager stateManager) {
+        this(component, stateManager);
 
         try {
             annotationData = 
context.getControllerServiceAnnotationData(component);
@@ -121,7 +129,7 @@ public class MockProcessContext extends 
MockControllerServiceLookup implements S
         requireNonNull(value, "Cannot set property to null value; if the 
intent is to remove the property, call removeProperty instead");
         final PropertyDescriptor fullyPopulatedDescriptor = 
component.getPropertyDescriptor(descriptor.getName());
 
-        final ValidationResult result = 
fullyPopulatedDescriptor.validate(value, new MockValidationContext(this));
+        final ValidationResult result = 
fullyPopulatedDescriptor.validate(value, new MockValidationContext(this, 
stateManager));
         String oldValue = properties.put(fullyPopulatedDescriptor, value);
         if (oldValue == null) {
             oldValue = fullyPopulatedDescriptor.getDefaultValue();
@@ -204,7 +212,7 @@ public class MockProcessContext extends 
MockControllerServiceLookup implements S
      * non-null
      */
     public Collection<ValidationResult> validate() {
-        return component.validate(new MockValidationContext(this));
+        return component.validate(new MockValidationContext(this, 
stateManager));
     }
 
     public boolean isValid() {
@@ -342,4 +350,9 @@ public class MockProcessContext extends 
MockControllerServiceLookup implements S
         final List<Range> elRanges = 
Query.extractExpressionRanges(getProperty(property).getValue());
         return (elRanges != null && !elRanges.isEmpty());
     }
+
+    @Override
+    public StateManager getStateManager() {
+        return stateManager; // the StateManager assigned in the constructor
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java
index 63a9876..33719ec 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java
@@ -24,6 +24,7 @@ import java.util.Map;
 
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.ControllerServiceLookup;
 import org.apache.nifi.reporting.Bulletin;
@@ -37,11 +38,13 @@ public class MockReportingContext extends 
MockControllerServiceLookup implements
     private final Map<String, ControllerServiceConfiguration> 
controllerServices;
     private final MockEventAccess eventAccess = new MockEventAccess();
     private final Map<PropertyDescriptor, String> properties = new HashMap<>();
+    private final StateManager stateManager;
 
     private final Map<String, List<Bulletin>> componentBulletinsCreated = new 
HashMap<>();
 
-    public MockReportingContext(final Map<String, ControllerService> 
controllerServices) {
+    public MockReportingContext(final Map<String, ControllerService> 
controllerServices, final StateManager stateManager) {
         this.controllerServices = new HashMap<>();
+        this.stateManager = stateManager;
         for (final Map.Entry<String, ControllerService> entry : 
controllerServices.entrySet()) {
             this.controllerServices.put(entry.getKey(), new 
ControllerServiceConfiguration(entry.getValue()));
         }
@@ -112,4 +115,9 @@ public class MockReportingContext extends 
MockControllerServiceLookup implements
 
         return new ArrayList<>(created);
     }
+
+    @Override
+    public StateManager getStateManager() {
+        return stateManager; // the StateManager assigned in the constructor
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java
index d73a09b..6442778 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java
@@ -27,6 +27,7 @@ import 
org.apache.nifi.attribute.expression.language.StandardExpressionLanguageC
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.ControllerServiceLookup;
 import org.apache.nifi.expression.ExpressionLanguageCompiler;
@@ -35,9 +36,11 @@ public class MockValidationContext implements 
ValidationContext, ControllerServi
 
     private final MockProcessContext context;
     private final Map<String, Boolean> expressionLanguageSupported;
+    private final StateManager stateManager;
 
-    public MockValidationContext(final MockProcessContext processContext) {
+    public MockValidationContext(final MockProcessContext processContext, 
final StateManager stateManager) {
         this.context = processContext;
+        this.stateManager = stateManager;
 
         final Map<PropertyDescriptor, String> properties = 
processContext.getProperties();
         expressionLanguageSupported = new HashMap<>(properties.size());
@@ -63,8 +66,8 @@ public class MockValidationContext implements 
ValidationContext, ControllerServi
 
     @Override
     public ValidationContext getControllerServiceValidationContext(final 
ControllerService controllerService) {
-        final MockProcessContext serviceProcessContext = new 
MockProcessContext(controllerService, context);
-        return new MockValidationContext(serviceProcessContext);
+        final MockProcessContext serviceProcessContext = new 
MockProcessContext(controllerService, context, stateManager);
+        return new MockValidationContext(serviceProcessContext, stateManager);
     }
 
     @Override
@@ -118,6 +121,7 @@ public class MockValidationContext implements 
ValidationContext, ControllerServi
         return context.isControllerServiceEnabling(serviceIdentifier);
     }
 
+    @Override
     public boolean isExpressionLanguagePresent(final String value) {
         if (value == null) {
             return false;

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index 925f0d8..8220632 100644
--- 
a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++ 
b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -69,6 +69,7 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceReporter;
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.state.MockStateManager;
 import org.junit.Assert;
 
 public class StandardProcessorTestRunner implements TestRunner {
@@ -80,6 +81,7 @@ public class StandardProcessorTestRunner implements 
TestRunner {
     private final SharedSessionState sharedState;
     private final AtomicLong idGenerator;
     private final boolean triggerSerially;
+    private final MockStateManager stateManager;
 
     private int numThreads = 1;
     private final AtomicInteger invocations = new AtomicInteger(0);
@@ -100,7 +102,8 @@ public class StandardProcessorTestRunner implements 
TestRunner {
         this.sharedState = new SharedSessionState(processor, idGenerator);
         this.flowFileQueue = sharedState.getFlowFileQueue();
         this.sessionFactory = new MockSessionFactory(sharedState, processor);
-        this.context = new MockProcessContext(processor);
+        this.stateManager = new MockStateManager();
+        this.context = new MockProcessContext(processor, stateManager);
 
         detectDeprecatedAnnotations(processor);
 
@@ -575,7 +578,7 @@ public class StandardProcessorTestRunner implements 
TestRunner {
         // }
 
         final ComponentLog logger = new MockProcessorLog(identifier, service);
-        final MockControllerServiceInitializationContext initContext = new 
MockControllerServiceInitializationContext(requireNonNull(service), 
requireNonNull(identifier), logger);
+        final MockControllerServiceInitializationContext initContext = new 
MockControllerServiceInitializationContext(requireNonNull(service), 
requireNonNull(identifier), logger, stateManager);
         initContext.addControllerServices(context);
         service.initialize(initContext);
 
@@ -595,7 +598,7 @@ public class StandardProcessorTestRunner implements 
TestRunner {
 
     @Override
     public void assertNotValid(final ControllerService service) {
-        final ValidationContext validationContext = new 
MockValidationContext(context).getControllerServiceValidationContext(service);
+        final ValidationContext validationContext = new 
MockValidationContext(context, 
stateManager).getControllerServiceValidationContext(service);
         final Collection<ValidationResult> results = 
context.getControllerService(service.getIdentifier()).validate(validationContext);
 
         for (final ValidationResult result : results) {
@@ -609,7 +612,7 @@ public class StandardProcessorTestRunner implements 
TestRunner {
 
     @Override
     public void assertValid(final ControllerService service) {
-        final ValidationContext validationContext = new 
MockValidationContext(context).getControllerServiceValidationContext(service);
+        final ValidationContext validationContext = new 
MockValidationContext(context, 
stateManager).getControllerServiceValidationContext(service);
         final Collection<ValidationResult> results = 
context.getControllerService(service.getIdentifier()).validate(validationContext);
 
         for (final ValidationResult result : results) {
@@ -719,7 +722,7 @@ public class StandardProcessorTestRunner implements 
TestRunner {
         final Map<PropertyDescriptor, String> curProps = 
configuration.getProperties();
         final Map<PropertyDescriptor, String> updatedProps = new 
HashMap<>(curProps);
 
-        final ValidationContext validationContext = new 
MockValidationContext(context).getControllerServiceValidationContext(service);
+        final ValidationContext validationContext = new 
MockValidationContext(context, 
stateManager).getControllerServiceValidationContext(service);
         final ValidationResult validationResult = property.validate(value, 
validationContext);
 
         updatedProps.put(property, value);
@@ -768,4 +771,8 @@ public class StandardProcessorTestRunner implements 
TestRunner {
         sharedState.clearProvenanceEvents();
     }
 
+    @Override
+    public MockStateManager getStateManager() {
+        return stateManager; // the MockStateManager created in the constructor and passed to the MockProcessContext
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
index 6c8f192..378a92e 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
@@ -35,6 +35,7 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceReporter;
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.state.MockStateManager;
 
 public interface TestRunner {
 
@@ -824,4 +825,9 @@ public interface TestRunner {
      * Clears the Provenance Events that have been emitted by the Processor
      */
     void clearProvenanceEvents();
+
+    /**
+     * @return the State Manager that is used to store and retrieve state
+     */
+    MockStateManager getStateManager();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessContext.java 
b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessContext.java
index fbdea94..d48af63 100644
--- a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessContext.java
+++ b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessContext.java
@@ -123,4 +123,4 @@ public class TestMockProcessContext {
         public void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceInitializationContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceInitializationContext.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceInitializationContext.java
index 56216aa..c1cca26 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceInitializationContext.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceInitializationContext.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.documentation.mock;
 
+import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.controller.ControllerServiceInitializationContext;
 import org.apache.nifi.controller.ControllerServiceLookup;
 import org.apache.nifi.logging.ComponentLog;
@@ -43,4 +44,9 @@ public class MockControllerServiceInitializationContext 
implements ControllerSer
         return new MockProcessorLogger();
     }
 
-}
+    @Override
+    public StateManager getStateManager() {
+        return null; // this documentation mock provides no StateManager
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessContext.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessContext.java
index edf0475..1acdd49 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessContext.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessContext.java
@@ -22,6 +22,7 @@ import java.util.Set;
 
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.controller.ControllerServiceLookup;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Relationship;
@@ -102,4 +103,9 @@ public class MockProcessContext implements ProcessContext {
     public boolean isExpressionLanguagePresent(PropertyDescriptor property) {
         return false;
     }
-}
+
+    @Override
+    public StateManager getStateManager() {
+        return null; // this documentation mock provides no StateManager
+    }
+}
\ No newline at end of file

Reply via email to