NIFI-259: Initial implementation of State Management feature
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/57dadb72 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/57dadb72 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/57dadb72 Branch: refs/heads/NIFI-259 Commit: 57dadb7286c49d035df83ac565b0d72817469ba4 Parents: 0c68e2c Author: Mark Payne <[email protected]> Authored: Mon Jan 11 08:28:12 2016 -0500 Committer: Mark Payne <[email protected]> Committed: Mon Jan 11 08:28:32 2016 -0500 ---------------------------------------------------------------------- .../org/apache/nifi/components/state/Scope.java | 36 ++ .../nifi/components/state/StateManager.java | 84 +++++ .../apache/nifi/components/state/StateMap.java | 51 +++ .../nifi/components/state/StateProvider.java | 127 +++++++ .../StateProviderInitializationContext.java | 56 ++++ .../ControllerServiceInitializationContext.java | 6 + .../apache/nifi/processor/ProcessContext.java | 6 + .../apache/nifi/processor/ProcessSession.java | 1 + .../apache/nifi/reporting/ReportingContext.java | 6 + nifi-assembly/pom.xml | 6 + .../org/apache/nifi/util/NiFiProperties.java | 34 ++ .../client/socket/EndpointConnectionPool.java | 2 +- .../nifi/remote/cluster/NodeInformation.java | 16 +- .../remote/cluster/NodeInformationAdapter.java | 2 +- .../src/main/asciidoc/administration-guide.adoc | 90 +++++ .../src/main/asciidoc/developer-guide.adoc | 65 ++++ .../org/apache/nifi/state/MockStateManager.java | 172 ++++++++++ .../org/apache/nifi/state/MockStateMap.java | 49 +++ ...kControllerServiceInitializationContext.java | 17 +- .../apache/nifi/util/MockProcessContext.java | 23 +- .../apache/nifi/util/MockReportingContext.java | 10 +- .../apache/nifi/util/MockValidationContext.java | 10 +- .../nifi/util/StandardProcessorTestRunner.java | 17 +- .../java/org/apache/nifi/util/TestRunner.java | 6 + .../nifi/util/TestMockProcessContext.java | 2 +- ...kControllerServiceInitializationContext.java | 8 +- .../documentation/mock/MockProcessContext.java | 8 +- .../nifi/cluster/protocol/ClusterNodes.java | 39 +++ .../cluster/protocol/ConnectionResponse.java | 27 +- .../nifi/cluster/protocol/NodeIdentifier.java | 43 ++- .../jaxb/message/AdaptedConnectionResponse.java | 10 +- .../jaxb/message/AdaptedNodeIdentifier.java | 31 +- .../jaxb/message/ConnectionResponseAdapter.java | 8 +- .../jaxb/message/NodeIdentifierAdapter.java | 6 +- .../ClusterManagerProtocolSenderImplTest.java | 6 +- .../impl/NodeProtocolSenderImplTest.java | 9 +- .../org/apache/nifi/cluster/event/Event.java | 1 + .../cluster/event/impl/EventManagerImpl.java | 1 + .../impl/FileBasedClusterNodeFirewall.java | 3 +- .../cluster/manager/HttpClusterManager.java | 14 +- .../cluster/manager/HttpRequestReplicator.java | 3 +- .../cluster/manager/HttpResponseMapper.java | 1 + .../nifi/cluster/manager/NodeResponse.java | 7 +- .../exception/ConflictingNodeIdException.java | 46 +++ .../manager/impl/ClusteredEventAccess.java | 8 +- .../manager/impl/ClusteredReportingContext.java | 12 +- .../manager/impl/HttpRequestReplicatorImpl.java | 19 +- .../manager/impl/HttpResponseMapperImpl.java | 1 + .../cluster/manager/impl/WebClusterManager.java | 74 +++-- .../java/org/apache/nifi/cluster/node/Node.java | 1 - ...anagerProtocolServiceLocatorFactoryBean.java | 1 - ...FileBasedClusterNodeFirewallFactoryBean.java | 1 + .../reporting/ClusteredReportingTaskNode.java | 7 +- .../event/impl/EventManagerImplTest.java | 8 +- .../impl/FileBasedClusterNodeFirewallTest.java | 8 +- .../impl/DataFlowManagementServiceImplTest.java | 27 +- .../impl/HttpRequestReplicatorImplTest.java | 38 ++- .../impl/HttpResponseMapperImplTest.java | 21 +- .../manager/impl/TestWebClusterManager.java | 2 - .../cluster/manager/testutils/HttpRequest.java | 2 + .../cluster/manager/testutils/HttpResponse.java | 1 + .../cluster/manager/testutils/HttpServer.java | 1 + .../components/state/StateManagerProvider.java | 60 ++++ .../controller/AbstractConfiguredComponent.java | 20 +- .../nifi/controller/ConfiguredComponent.java | 14 +- .../nifi-framework/nifi-framework-core/pom.xml | 48 ++- .../apache/nifi/cluster/HeartbeatPayload.java | 18 - .../apache/nifi/controller/FlowController.java | 136 +++++--- .../nifi/controller/StandardFlowService.java | 50 ++- .../controller/StandardFlowSynchronizer.java | 16 +- .../reporting/AbstractReportingTaskNode.java | 8 +- .../reporting/StandardReportingContext.java | 10 +- .../reporting/StandardReportingTaskNode.java | 2 +- .../repository/BatchingSessionFactory.java | 4 +- .../scheduling/ConnectableProcessContext.java | 10 +- .../scheduling/EventDrivenSchedulingAgent.java | 21 +- .../scheduling/QuartzSchedulingAgent.java | 10 +- .../controller/scheduling/ScheduleState.java | 10 +- .../scheduling/StandardProcessScheduler.java | 31 +- .../scheduling/TimerDrivenSchedulingAgent.java | 9 +- .../service/ControllerServiceLoader.java | 4 +- ...dControllerServiceInitializationContext.java | 10 +- .../service/StandardControllerServiceNode.java | 8 +- .../StandardControllerServiceProvider.java | 16 +- .../nifi/controller/state/ClusterState.java | 38 +++ .../controller/state/ConfigParseException.java | 30 ++ .../nifi/controller/state/NodeDescription.java | 39 +++ .../controller/state/StandardStateManager.java | 73 ++++ .../nifi/controller/state/StandardStateMap.java | 49 +++ ...ndardStateProviderInitializationContext.java | 60 ++++ .../nifi/controller/state/StateMapSerDe.java | 103 ++++++ .../nifi/controller/state/StateMapUpdate.java | 45 +++ .../state/StateProviderException.java | 30 ++ .../state/config/StateManagerConfiguration.java | 142 ++++++++ .../config/StateProviderConfiguration.java | 51 +++ .../state/config/StateProviderScope.java | 30 ++ .../manager/StandardStateManagerProvider.java | 295 +++++++++++++++++ .../state/providers/AbstractStateProvider.java | 58 ++++ .../local/WriteAheadLocalStateProvider.java | 224 +++++++++++++ .../zookeeper/ZooKeeperStateProvider.java | 330 +++++++++++++++++++ .../state/server/ZooKeeperStateServer.java | 77 +++++ .../nifi/groups/StandardProcessGroup.java | 20 +- .../nifi/processor/StandardProcessContext.java | 10 +- .../processor/StandardSchedulingContext.java | 10 +- ...g.apache.nifi.components.state.StateProvider | 16 + .../TestStandardProcessScheduler.java | 24 +- .../StandardControllerServiceProviderTest.java | 28 +- .../TestStandardControllerServiceProvider.java | 78 +++-- .../providers/AbstractTestStateProvider.java | 153 +++++++++ .../local/TestWriteAheadLocalStateProvider.java | 90 +++++ .../zookeeper/TestZooKeeperStateProvider.java | 106 ++++++ .../src/main/resources/conf/nifi.properties | 14 + .../main/resources/conf/state-management.xml | 52 +++ .../main/resources/conf/zookeeper.properties | 30 ++ .../socket/ClusterManagerServerProtocol.java | 2 +- .../dao/impl/StandardControllerServiceDAO.java | 4 +- .../nifi/web/dao/impl/StandardProcessorDAO.java | 4 +- .../web/dao/impl/StandardReportingTaskDAO.java | 4 +- .../resources/access-control/nifi.properties | 7 + .../access-control/state-management.xml | 38 +++ .../processors/hl7/ExtractHL7Attributes.java | 3 +- .../apache/nifi/processors/hl7/RouteHL7.java | 2 +- .../processors/kafka/test/EmbeddedKafka.java | 3 +- .../standard/AbstractListProcessor.java | 288 ++++++++-------- .../nifi/processors/standard/GetHTTP.java | 122 ++----- .../standard/TestAbstractListProcessor.java | 34 +- .../standard/TestDetectDuplicate.java | 42 +-- .../nifi/processors/standard/TestGetHTTP.java | 108 +----- .../standard/TestRouteOnAttribute.java | 5 +- pom.xml | 121 +++++-- 130 files changed, 4209 insertions(+), 795 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-api/src/main/java/org/apache/nifi/components/state/Scope.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/components/state/Scope.java b/nifi-api/src/main/java/org/apache/nifi/components/state/Scope.java new file mode 100644 index 0000000..8daf12a --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/components/state/Scope.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.state; + +/** + * A Scope represents how a NiFi component's state is to be stored and retrieved when running in a cluster. + */ +public enum Scope { + + /** + * State is to be treated as "global" across the cluster. I.e., the same component on all nodes will + * have access to the same state. + */ + CLUSTER, + + /** + * State is to be treated local to the node. I.e., the same component will have different state on each + * node in the cluster. + */ + LOCAL; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-api/src/main/java/org/apache/nifi/components/state/StateManager.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/components/state/StateManager.java b/nifi-api/src/main/java/org/apache/nifi/components/state/StateManager.java new file mode 100644 index 0000000..f52b2e3 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/components/state/StateManager.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.state; + +import java.io.IOException; +import java.util.Map; + +/** + * <p> + * The StateManager is responsible for providing NiFi components a mechanism for storing + * and retrieving state. + * </p> + * + * <p> + * When calling methods in this class, the {@link Scope} is used in order to specify whether + * state should be stored/retrieved from the local state or the clustered state. However, if + * any instance of NiFi is not clustered (or is disconnected from its cluster), the Scope is + * not really relevant and the local state will be used in all cases. This allows component + * developers to not concern themselves with whether or not a particular instance of NiFi is + * clustered. Instead, developers should assume that the instance is indeed clustered and write + * the component accordingly. If not clustered, the component will still behavior in the same + * manner, as a standalone node could be thought of as a "cluster of 1." + * </p> + */ +public interface StateManager { + + /** + * Updates the value of the component's state, setting it to given value + * + * @param state the value to change the state to + * @param scope the scope to use when storing the state + * + * @throws IOException if unable to communicate with the underlying storage mechanism + */ + void setState(Map<String, String> state, Scope scope) throws IOException; + + /** + * Returns the current state for the component. This return value will never be <code>null</code>. + * If the state has not yet been set, the StateMap's version will be -1, and the map of values will be empty. + * + * @param scope the scope to use when fetching the state + * @return + * @throws IOException + */ + StateMap getState(Scope scope) throws IOException; + + /** + * Updates the value of the component's state to the new value if and only if the value currently + * is the same as the given oldValue. + * + * @param oldValue the old value to compare against + * @param newValue the new value to use if and only if the state's value is the same as the given oldValue + * @param scope the scope to use for storing the new state + * @return <code>true</code> if the state was updated to the new value, <code>false</code> if the state's value was not + * equal to oldValue + * + * @throws IOException if unable to communicate with the underlying storage mechanism + */ + boolean replace(StateMap oldValue, Map<String, String> newValue, Scope scope) throws IOException; + + /** + * Clears all keys and values from the component's state + * + * @param scope the scope whose values should be cleared + * + * @throws IOException if unable to communicate with the underlying storage mechanism + */ + void clear(Scope scope) throws IOException; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-api/src/main/java/org/apache/nifi/components/state/StateMap.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/components/state/StateMap.java b/nifi-api/src/main/java/org/apache/nifi/components/state/StateMap.java new file mode 100644 index 0000000..7984e42 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/components/state/StateMap.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.state; + +import java.util.Map; + +/** + * Provides a representation of a component's state at some point in time. + */ +public interface StateMap { + /** + * Each time that a component's state is updated, the state is assigned a new version. + * This version can then be used to atomically update state by the backing storage mechanism. + * Though this number is monotonically increasing, it should not be expected to increment always + * from X to X+1. I.e., version numbers may be skipped. + * + * @return the version associated with the state + */ + long getVersion(); + + /** + * Returns the value associated with the given key + * + * @param key the key whose value should be retrieved + * @return the value associated with the given key, or <code>null</code> if no value is associated + * with this key. + */ + String get(String key); + + /** + * Returns an immutable Map representation of all keys and values for the state of a component. + * + * @return an immutable Map representation of all keys and values for the state of a component. + */ + Map<String, String> toMap(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-api/src/main/java/org/apache/nifi/components/state/StateProvider.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/components/state/StateProvider.java b/nifi-api/src/main/java/org/apache/nifi/components/state/StateProvider.java new file mode 100644 index 0000000..e0243f3 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/components/state/StateProvider.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.state; + +import java.io.IOException; +import java.util.Map; + +import org.apache.nifi.components.ConfigurableComponent; + +/** + * <p> + * Provides a mechanism by which components can store and retrieve state. Depending on the Provider, the state + * may be stored locally, or it may be stored on a remote resource. + * </p> + * + * <p> + * Which implementation should be used for local and clustered state is configured in the NiFi properties file. + * It is therefore possible to provide custom implementations of this interface. Note, however, that this interface + * is new as of version 0.5.0 of Apache NiFi and may not be considered "stable" as of yet. Therefore, it is subject + * to change without notice, so providing custom implementations is cautioned against until the API becomes more stable. + * </p> + * + * @since 0.5.0 + */ +public interface StateProvider extends ConfigurableComponent { + + /** + * Initializes the StateProvider so that it is capable of being used. This method will be called + * once before any of the other methods are called and will not be called again until the {@link #shutdown()} + * method has been called + * + * @param context the initialization context that can be used to prepare the state provider for use + */ + void initialize(StateProviderInitializationContext context) throws IOException; + + /** + * Shuts down the StateProvider and cleans up any resources held by it. Once this method has returned, the + * StateProvider may be initialized once again via the {@link #initialize(StateProviderInitializationContext)} method. + */ + void shutdown(); + + /** + * Updates the value of the component's state, setting the new value to the + * given state + * + * @param state the value to change the state to + * @param componentId the id of the component for which state is being set + * + * @throws IOException if unable to communicate with the underlying storage mechanism + */ + void setState(Map<String, String> state, String componentId) throws IOException; + + + /** + * Returns the currently configured state for the component. The returned StateMap will never be null. + * The version of the StateMap will be -1 and the state will contain no key/value pairs if the state has never been set. + * + * @param componentId the id of the component for which state is to be retrieved + * @return the currently configured value for the component's state + * + * @throws IOException if unable to communicate with the underlying storage mechanism + */ + StateMap getState(String componentId) throws IOException; + + + /** + * Updates the value of the component's state to the new value if and only if the value currently + * is the same as the given oldValue. + * + * @param oldValue the old value to compare against + * @param newValue the new value to use if and only if the state's value is the same as the given oldValue + * @param componentId the id of the component for which state is being retrieved + * @return <code>true</code> if the state was updated to the new value, <code>false</code> if the state's value was not + * equal to oldValue + * + * @throws IOException if unable to communicate with the underlying storage mechanism + */ + boolean replace(StateMap oldValue, Map<String, String> newValue, String componentId) throws IOException; + + /** + * Removes all values from the component's state that is stored using the given scope + * + * @param componentId the id of the component for which state is being cleared + * + * @throws IOException if unable to communicate with the underlying storage mechanism + */ + void clear(String componentId) throws IOException; + + /** + * This method is called whenever a component is removed from the NiFi instance. This allows the State Provider to + * perform tasks when a component is removed in order to clean up resources that may be associated with that component + * + * @param componentId the ID of the component that was added to the NiFi instance + * @throws IOException if unable to perform the necessary cleanup + */ + void onComponentRemoved(String componentId) throws IOException; + + /** + * Notifies the state provider that it should begin servicing requests to store and retrieve state + */ + void enable(); + + /** + * Notifies the state provider that it should stop servicing requests to store and retrieve state and instead throw a ProviderDisabledException if any request is made to do so + */ + void disable(); + + /** + * @return <code>true</code> if the provider is enabled, <code>false</code> otherwise. + */ + boolean isEnabled(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-api/src/main/java/org/apache/nifi/components/state/StateProviderInitializationContext.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/components/state/StateProviderInitializationContext.java b/nifi-api/src/main/java/org/apache/nifi/components/state/StateProviderInitializationContext.java new file mode 100644 index 0000000..aaf5490 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/components/state/StateProviderInitializationContext.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.state; + +import java.util.Map; + +import javax.net.ssl.SSLContext; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; + +/** + * This interface defines an initialization context that is passed to a {@link StateProvider} when it + * is initialized. + */ +public interface StateProviderInitializationContext { + /** + * @return the identifier if the StateProvider + */ + String getIdentifier(); + + /** + * @return a Map of Property Descriptors to their configured values + */ + Map<PropertyDescriptor, PropertyValue> getProperties(); + + /** + * Returns the configured value for the given property + * + * @param property the property to retrieve the value for + * + * @return the configured value for the property. + */ + PropertyValue getProperty(PropertyDescriptor property); + + /** + * @return the SSL Context that should be used to communicate with remote resources, + * or <code>null</code> if no SSLContext has been configured + */ + SSLContext getSSLContext(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceInitializationContext.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceInitializationContext.java b/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceInitializationContext.java index 6fcee0c..3486621 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceInitializationContext.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceInitializationContext.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.controller; +import org.apache.nifi.components.state.StateManager; import org.apache.nifi.logging.ComponentLog; public interface ControllerServiceInitializationContext { @@ -37,4 +38,9 @@ public interface ControllerServiceInitializationContext { * way and generate bulletins when appropriate */ ComponentLog getLogger(); + + /** + * @return the StateManager that can be used to store and retrieve state for this component + */ + StateManager getStateManager(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java index cf1bb6c..91ea1a2 100644 --- a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java +++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java @@ -21,6 +21,7 @@ import java.util.Set; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.state.StateManager; import org.apache.nifi.controller.ControllerServiceLookup; /** @@ -154,4 +155,9 @@ public interface ProcessContext { * does not allow the Expression Language, even if a seemingly valid Expression is present in the value. */ boolean isExpressionLanguagePresent(PropertyDescriptor property); + + /** + * @return the StateManager that can be used to store and retrieve state for this component + */ + StateManager getStateManager(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java index d0c5b46..5d867b7 100644 --- a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java +++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessSession.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.Set; import java.util.regex.Pattern; +import org.apache.nifi.components.state.StateManager; import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.exception.FlowFileAccessException; http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingContext.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingContext.java b/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingContext.java index 281194c..cb131e2 100644 --- a/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingContext.java +++ b/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingContext.java @@ -20,6 +20,7 @@ import java.util.Map; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.state.StateManager; import org.apache.nifi.controller.ControllerServiceLookup; /** @@ -86,4 +87,9 @@ public interface ReportingContext { * Controller Services */ ControllerServiceLookup getControllerServiceLookup(); + + /** + * @return the StateManager that can be used to store and retrieve state for this component + */ + StateManager getStateManager(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index b8e83bd..1b668a5 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -283,6 +283,12 @@ language governing permissions and limitations under the License. --> <nifi.templates.directory>./conf/templates</nifi.templates.directory> <nifi.database.directory>./database_repository</nifi.database.directory> + <nifi.state.management.configuration.file>./conf/state-management.xml</nifi.state.management.configuration.file> + <nifi.state.management.embedded.zookeeper.start>false</nifi.state.management.embedded.zookeeper.start> + <nifi.state.management.embedded.zookeeper.properties>./conf/zookeeper.properties</nifi.state.management.embedded.zookeeper.properties> + <nifi.state.management.provider.local>local-provider</nifi.state.management.provider.local> + <nifi.state.management.provider.cluster>zk-provider</nifi.state.management.provider.cluster> + <nifi.flowfile.repository.implementation>org.apache.nifi.controller.repository.WriteAheadFlowFileRepository</nifi.flowfile.repository.implementation> <nifi.flowfile.repository.directory>./flowfile_repository</nifi.flowfile.repository.directory> <nifi.flowfile.repository.partitions>256</nifi.flowfile.repository.partitions> http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java index c82a220..5b35328 100644 --- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java +++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java @@ -189,6 +189,13 @@ public class NiFiProperties extends Properties { // kerberos properties public static final String KERBEROS_KRB5_FILE = "nifi.kerberos.krb5.file"; + // state management + public static final String STATE_MANAGEMENT_CONFIG_FILE = "nifi.state.management.configuration.file"; + public static final String STATE_MANAGEMENT_LOCAL_PROVIDER_ID = "nifi.state.management.provider.local"; + public static final String STATE_MANAGEMENT_CLUSTER_PROVIDER_ID = "nifi.state.management.provider.cluster"; + public static final String STATE_MANAGEMENT_START_EMBEDDED_ZOOKEEPER = "nifi.state.management.embedded.zookeeper.start"; + public static final String STATE_MANAGEMENT_ZOOKEEPER_PROPERTIES = "nifi.state.management.embedded.zookeeper.properties"; + // defaults public static final String DEFAULT_TITLE = "NiFi"; public static final Boolean DEFAULT_AUTO_RESUME_STATE = true; @@ -236,6 +243,9 @@ public class NiFiProperties extends Properties { public static final int DEFAULT_CLUSTER_MANAGER_PROTOCOL_THREADS = 10; public static final String DEFAULT_CLUSTER_MANAGER_SAFEMODE_DURATION = "0 sec"; + // state management defaults + public static final String DEFAULT_STATE_MANAGEMENT_CONFIG_FILE = "conf/state-management.xml"; + private NiFiProperties() { super(); } @@ -985,4 +995,28 @@ public class NiFiProperties extends Properties { public String getBoredYieldDuration() { return getProperty(BORED_YIELD_DURATION, DEFAULT_BORED_YIELD_DURATION); } + + public File getStateManagementConfigFile() { + return new File(getProperty(STATE_MANAGEMENT_CONFIG_FILE, DEFAULT_STATE_MANAGEMENT_CONFIG_FILE)); + } + + /* + * public static final String STATE_MANAGEMENT_MAX_ZOOKEEPER_SERVERS = "nifi.state.management.embedded.zookeeper.max.instances"; + */ + public String getLocalStateProviderId() { + return getProperty(STATE_MANAGEMENT_LOCAL_PROVIDER_ID); + } + + public String getClusterStateProviderId() { + return getProperty(STATE_MANAGEMENT_CLUSTER_PROVIDER_ID); + } + + public File getEmbeddedZooKeeperPropertiesFile() { + final String filename = getProperty(STATE_MANAGEMENT_ZOOKEEPER_PROPERTIES); + return filename == null ? null : new File(filename); + } + + public boolean isStartEmbeddedZooKeeper() { + return Boolean.parseBoolean(getProperty(STATE_MANAGEMENT_START_EMBEDDED_ZOOKEEPER)); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java index 18075db..81ca70d 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java @@ -761,7 +761,7 @@ public class EndpointConnectionPool { final int index = n % destinations.size(); PeerStatus status = destinations.get(index); if (status == null) { - final PeerDescription description = new PeerDescription(nodeInfo.getHostname(), nodeInfo.getSiteToSitePort(), nodeInfo.isSiteToSiteSecure()); + final PeerDescription description = new PeerDescription(nodeInfo.getSiteToSiteHostname(), nodeInfo.getSiteToSitePort(), nodeInfo.isSiteToSiteSecure()); status = new PeerStatus(description, nodeInfo.getTotalFlowFiles()); destinations.set(index, status); break; http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformation.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformation.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformation.java index 2041268..abfcc85 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformation.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformation.java @@ -18,23 +18,23 @@ package org.apache.nifi.remote.cluster; public class NodeInformation { - private final String hostname; + private final String siteToSiteHostname; private final Integer siteToSitePort; private final int apiPort; private final boolean isSiteToSiteSecure; private final int totalFlowFiles; - public NodeInformation(final String hostname, final Integer siteToSitePort, final int apiPort, + public NodeInformation(final String siteToSiteHostname, final Integer siteToSitePort, final int apiPort, final boolean isSiteToSiteSecure, final int totalFlowFiles) { - this.hostname = hostname; + this.siteToSiteHostname = siteToSiteHostname; this.siteToSitePort = siteToSitePort; this.apiPort = apiPort; this.isSiteToSiteSecure = isSiteToSiteSecure; this.totalFlowFiles = totalFlowFiles; } - public String getHostname() { - return hostname; + public String getSiteToSiteHostname() { + return siteToSiteHostname; } public int getAPIPort() { @@ -66,7 +66,7 @@ public class NodeInformation { } final NodeInformation other = (NodeInformation) obj; - if (!hostname.equals(other.hostname)) { + if (!siteToSiteHostname.equals(other.siteToSiteHostname)) { return false; } if (siteToSitePort == null && other.siteToSitePort != null) { @@ -88,11 +88,11 @@ public class NodeInformation { @Override public int hashCode() { - return 83832 + hostname.hashCode() + (siteToSitePort == null ? 8 : siteToSitePort.hashCode()) + apiPort + (isSiteToSiteSecure ? 3829 : 0); + return 83832 + siteToSiteHostname.hashCode() + (siteToSitePort == null ? 8 : siteToSitePort.hashCode()) + apiPort + (isSiteToSiteSecure ? 3829 : 0); } @Override public String toString() { - return "Node[" + hostname + ":" + apiPort + "]"; + return "Node[" + siteToSiteHostname + ":" + apiPort + "]"; } } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformationAdapter.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformationAdapter.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformationAdapter.java index 440463c..b2dead0 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformationAdapter.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformationAdapter.java @@ -30,7 +30,7 @@ public class NodeInformationAdapter extends XmlAdapter<AdaptedNodeInformation, N @Override public AdaptedNodeInformation marshal(final NodeInformation nodeInformation) throws Exception { final AdaptedNodeInformation adapted = new AdaptedNodeInformation(); - adapted.setHostname(nodeInformation.getHostname()); + adapted.setHostname(nodeInformation.getSiteToSiteHostname()); adapted.setSiteToSitePort(nodeInformation.getSiteToSitePort()); adapted.setApiPort(nodeInformation.getAPIPort()); adapted.setSiteToSiteSecure(nodeInformation.isSiteToSiteSecure()); http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-docs/src/main/asciidoc/administration-guide.adoc ---------------------------------------------------------------------- diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc index 830d542..9d1a442 100644 --- a/nifi-docs/src/main/asciidoc/administration-guide.adoc +++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc @@ -367,6 +367,78 @@ in the remote cluster can be included in the same group. When the ADMIN wants to cluster, s/he can grant it to the group and avoid having to grant it individually to each node in the cluster. + +[[state_management]] +State Management +---------------- + +NiFi provides a mechanism for Processors, Reporting Tasks, Controller Services, and the framework itself to persist state. This +allows a Processor, for example, to resume from the place where it left off after NiFi is restarted. Additionally, it allows for +a Processor to store some piece of information so that the Processor can access that information from all of the different nodes +in the cluster. This allows one node to pick up where another node left off, or to coordinate across all of the nodes in a cluster. + +[[state_providers]] +*Configuring State Providers* +When a component decides to store or retrieve state, it does so by providing a "Scope" - either Node-local or Cluster-wide. The +mechanism that is used to store and retrieve this state is then determined based on this Scope, as well as the configured State +Providers. The _nifi.properties_ file contains three different properties that are relevant to configuring these State Providers. +The first is the `nifi.state.management.configuration.file` property specifies an external XML file that is used for configuring +the local and cluster-wide State Providers. This XML file may contain configurations for multiple providers, so the +`nifi.state.management.provider.local` property provides the identifier of the local State Provider configured in this XML file. +Similarly, the `nifi.state.management.provider.cluster` property provides the identifier of the cluster-wide State Provider +configured in this XML file. + +This XML file consists of a top-level `state-management` element, which has one or more `local-provider` and zero or more +`cluster-provider` elements. Each of these elements then contains an `id` element that is used to specify the identifier that can +be referenced in the _nifi.properties_ file, as well as a `class` element that specifies the fully-qualified class name to use +in order to instantiate the State Provider. Finally, each of these elements may have zero or more `property` elements. Each +`property` element has an attribute, `name` that is the name of the property that the State Provider supports. The textual content +of the `property` element is the value of the property. + +Once these State Providers have been configured in the _state-management.xml_ file (or whatever file is configured), those Providers +may be referenced by their identifiers. By default, the Local State Provider is configured to be a `WriteAheadLocalStateProvider` that +persists the data to the _$NIFI_HOME/state_ directory. The default Cluster State Provider is configured to be a `ZooKeeperStateProvider`. +The default ZooKeeper-based provider must have its `Connect String` property populated before it can be used. It is also advisable, +if multiple NiFi instances will use the same ZooKeeper instance, that the value of the `Root Node` property be changed. For instance, +one might set the value to `/nifi/<team name>/production`. + +If NiFi is configured to run in a standalone mode, the `cluster-state-provider` element need not be populated in the _state-management.xml_ +file and will actually be ignored if they are populated. However, the `local-state-provider` element must always be present and populated. +Additionally, if NiFi is run in a cluster, each node must also have the `cluster-state-provider` element present and properly configured. Otherwise, +NiFi will fail to startup. + +While there are not many properties that need to be configured for these providers, they were externalized into a separate _state-providers.xml_ +file, rather than being configured via the _nifi.properties_ file, simply because different implementations may require different properties, +and it is easier to maintain and understand the configuration in an XML-based file such as this, than to mix the properties of the Provider +in with all of the other NiFi framework-specific properties. + + +[[embedded_zookeeper]] +*Embedded ZooKeeper Server* +As mentioned above, the default State Provider for cluster-wide state is the `ZooKeeperStateProvider`. At the time of this writing, this is the +only State Provider that exists for handling cluster-wide state. What this means is that NiFi has a dependencies on ZooKeeper in order to +behave as a cluster. However, there are many environments in which NiFi is deployed where there is no existing ZooKeeper ensemble being maintained. +In order to avoid the burden of forcing administrators to also maintain a separate ZooKeeper instance, NiFi provides the option of starting an +embedded ZooKeeper server. + +This can be accomplished by setting the `nifi.state.management.embedded.zookeeper.start` property in _nifi.properties_ to `true` on those nodes +that should run the embedded ZooKeeper server. Generally, it is advisable to run ZooKeeper on either 3 or 5 nodes. Running on fewer than 3 nodes +provides less durability in the face of failure. Running on more than 5 nodes generally produces more network traffic than is necessary. Additionally, +running ZooKeeper on 4 nodes provides no more benefit than running on 3 nodes, ZooKeeper requires a majority of nodes be active in order to function. +However, it is up to the administrator to determine the number of nodes most appropriate to the particular deployment of NiFi. + +If the `nifi.state.management.embedded.zookeeper.start` property is set to `true`, the `nifi.state.management.embedded.zookeeper.properties` property +in _nifi.properties_ also becomes relevant. This specifies the ZooKeeper properties file to use. At a minimum, This properties file needs to be populated +with the list of ZooKeeper servers. Each of these servers is configured as <hostname>:<client port>[:<leader election port>]. For example, `myhost:2888:3888`. +This list of nodes should be the same nodes in the NiFi cluster that have the `nifi.state.management.embedded.zookeeper.start` +property set to `true`. Also note that because ZooKeeper will be listening on these ports, the firewall may need to be configured to open these ports +for incoming traffic, at least between nodes in the cluster. + +For more information on the properties used to administer ZooKeeper, see the +link:https://zookeeper.apache.org/doc/current/zookeeperAdmin.html[ZooKeeper Admin Guide]. + + + [[clustering]] Clustering Configuration ------------------------ @@ -433,6 +505,7 @@ For the NCM, the minimum properties to configure are as follows: For Node 1, the minimum properties to configure are as follows: * Under the Web Properties, set either the http or https port that you want Node 1 to run on. If the NCM is running on the same server, choose a different web port for Node 1. Also, consider whether you need to set the http or https host property. +* Under the State Management section, set the `nifi.state.management.provider.cluster` property to the identifier of the Cluster State Provider. Ensure that the Cluster State Provider has been configured in the _state-management.xml_ file. See <<state_providers>> for more information. * Under Cluster Node Properties, set the following: ** nifi.cluster.is.node - Set this to _true_. ** nifi.cluster.node.address - Set this to the fully qualified hostname of the node. If left blank, it defaults to "localhost". @@ -443,6 +516,7 @@ For Node 1, the minimum properties to configure are as follows: For Node 2, the minimum properties to configure are as follows: * Under the Web Properties, set either the http or https port that you want Node 2 to run on. Also, consider whether you need to set the http or https host property. +* Under the State Management section, set the `nifi.state.management.provider.cluster` property to the identifier of the Cluster State Provider. Ensure that the Cluster State Provider has been configured in the _state-management.xml_ file. See <<state_providers>> for more information. * Under the Cluster Node Properties, set the following: ** nifi.cluster.is.node - Set this to _true_. ** nifi.cluster.node.address - Set this to the fully qualified hostname of the node. If left blank, it defaults to "localhost". @@ -631,6 +705,22 @@ only consider if `nifi.security.user.login.identity.provider` configured with a |nifi.documentation.working.directory|The documentation working directory. The default value is ./work/docs/components and probably should be left as is. |==== + +*State Management* + + +The State Management section of the Properties file provides a mechanism for configuring local and cluster-wide mechanisms +for components to persist state. See the <<state_management>> section for more information on how this is used. + +|==== +|*Property*|*Description* +|nifi.state.management.configuration.file|The XML file that contains configuration for the local and cluster-wide State Providers. The default value is _./conf/state-management.xml_ +|nifi.state.management.provider.local|The ID of the Local State Provider to use. This value must match the value of the `id` element of one of the `local-provider` elements in the _state-management.xml_ file. +|nifi.state.management.provider.cluster|The ID of the Cluster State Provider to use. This value must match the value of the `id` element of one of the `cluster-provider` elements in the _state-management.xml_ file. This value is ignored if not clustered but is required for nodes in a cluster. +|nifi.state.management.embedded.zookeeper.start|Specifies whether or not this instance of NiFi should start an embedded ZooKeeper Server. This is used in conjunction with the ZooKeeperStateProvider. +|nifi.state.management.embedded.zookeeper.properties|Specifies a properties file that contains the configuration for the embedded ZooKeeper Server that is started (if the `|nifi.state.management.embedded.zookeeper.start` property is set to `true`) +||==== + + *H2 Settings* + The H2 Settings section defines the settings for the H2 database, which keeps track of user access and flow controller history. http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-docs/src/main/asciidoc/developer-guide.adoc ---------------------------------------------------------------------- diff --git a/nifi-docs/src/main/asciidoc/developer-guide.adoc b/nifi-docs/src/main/asciidoc/developer-guide.adoc index aeea3dc..f9bf675 100644 --- a/nifi-docs/src/main/asciidoc/developer-guide.adoc +++ b/nifi-docs/src/main/asciidoc/developer-guide.adoc @@ -216,6 +216,13 @@ class, calling the appropriate methods to fill in the details of the Relationship, and finally calling the `build` method. +[[state_manager]] +==== StateManager +The StateManager provides Processors, Reporting Tasks, and Controller Services a mechanism +for easily storing and retrieving state. The API is similar to that of ConcurrentHashMap +but requires a Scope for each operation. The Scope indicates whether the state is to be +retrieved/stored locally or in a cluster-wide manner. For more information, see the +<<state_manager>> section. [[processor_initialization_context]] ==== ProcessorInitializationContext @@ -558,6 +565,64 @@ for instance, they should not be relied upon for critical business logic. +[[state_manager]] +=== State Manager + +From the ProcessContext, ReportingContext, and ControllerServiceInitializationContext, components are +able to call the `getStateManager()` method. This State Manager is responsible for providing a simple API +for storing and retrieving state. As such, the API is designed to be quite similar to the ConcurrentMap +API, which most Java developers are already familiar with. + + +[[state_scope]] +==== Scope +One very notable difference between the StateManager API and the ConcurrentMap API, however, is the presence +of a Scope object on each method call of the StateManager. This Scope will either be `Scope.NODE` or `Scope.CLUSTER`. +If NiFi is run in a cluster, this Scope provides important information to the framework about how the operation should +occur. + +If state as stored using `Scope.CLUSTER`, then all nodes in the cluster will be communicating with the same +state storage mechanism, as if all nodes were to share a single ConcurrentMap. If state is stored and retrieved using +`Scope.NODE`, then each node will see a different representation of the state. + +It is also worth noting that if NiFi is configured to run as a standalone instance, rather than running in a cluster, +a scope of `Scope.NODE` is always used. This is done in order to allow the developer of a NiFi component to write the code +in one consistent way, without worrying about whether or not the NiFi instance is clustered. The developer should instead assume +that the instance is clustered and write the code accordingly. + + +==== Storing and Retrieving State + +State is stored using the StateManager's `set`, `replace`, `putIfAbsent`, `remove`, and `clear` methods. All of these methods, +with the exception of `clear` take as the first argument the key to be set. The key that is used is unique only to the same +instance of the component and for the same Scope. That is, if two Processors store a value using the key _My Key_, those Processors +will not conflict with each other, even if both Processors are of the same type (e.g., both are of type ListFile). Furthermore, +if a Processor stores a value with the key of _My Key_ using the `Scope.CLUSTER` scope, and then attempts to retrieve the value +using the `Scope.NODE` scope, the value retrieved will be `null`. Each Processor's state, then, is stored in isolation from other +Processor's state. A unique key can be thought of as a triple of <Processor Instance, Key, Scope>. + +It follows, then, that two Processors cannot share the same state. There are, however, some circumstances in which it is very +necessary to share state between two Processors of different types, or two Processors of the same type. This can be accomplished +by using a Controller Service. By storing and retrieving state from a Controller Service, multiple Processors can use the same +Controller Service and the state can be exposed via the Controller Service's API. + + +==== Unit Tests +NiFi's Mock Framework provides an extensive collection of tools to perform unit testing of Processors. Processor unit tests typically +begin with the `TestRunner` class. As a result, the `TestRunner` class contains a `getStateManager` method of its own. The StateManager +that is returned, however, is of a specific type: `MockStateManager`. This implementation provides several methods in addition to those +defined by the `StateManager` interface, that help developers to more easily develop unit tests. + +First, the `MockStateManager` implements the `StateManager` interface, so all of the state can be examined from within a unit test. +Additionally, the `MockStateManager` exposes a handful of `assert*` methods to perform assertions that the State is set as expected. +There are times, however, that state could be updated multiple times during the run of a single invocation of a Processor's `onTrigger` +method. In this case, inspecting the values after running the Processor may not be sufficient. Additionally, we must always remember at each +step to check the value of the stored state, which can become error-prone and burdensome for the developer. + +For these reasons, the `MockStateManager` provides an additional method, named `failIfStateSet`. This method instructs the State Manager that +the unit test should immediately fail if the state for a given key is ever set, or if it is set to a specific value. The `doNotFailIfStateSet` +method can then be used to instruct the Mock Framework to clear this state and allow state to be set to any value. + === Reporting Processor Activity http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-mock/src/main/java/org/apache/nifi/state/MockStateManager.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/state/MockStateManager.java b/nifi-mock/src/main/java/org/apache/nifi/state/MockStateManager.java new file mode 100644 index 0000000..c014ed0 --- /dev/null +++ b/nifi-mock/src/main/java/org/apache/nifi/state/MockStateManager.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.state; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateMap; +import org.junit.Assert; + + +public class MockStateManager implements StateManager { + private final AtomicInteger versionIndex = new AtomicInteger(0); + + private StateMap localStateMap = new MockStateMap(null, -1L); + private StateMap clusterStateMap = new MockStateMap(null, -1L); + + @Override + public synchronized void setState(final Map<String, String> state, final Scope scope) { + final StateMap stateMap = new MockStateMap(state, versionIndex.incrementAndGet()); + + if (scope == Scope.CLUSTER) { + clusterStateMap = stateMap; + } else { + localStateMap = stateMap; + } + } + + @Override + public synchronized StateMap getState(final Scope scope) { + if (scope == Scope.CLUSTER) { + return clusterStateMap; + } else { + return localStateMap; + } + } + + @Override + public synchronized boolean replace(final StateMap oldValue, final Map<String, String> newValue, final Scope scope) { + if (scope == Scope.CLUSTER) { + if (oldValue == clusterStateMap) { + clusterStateMap = new MockStateMap(newValue, versionIndex.incrementAndGet()); + return true; + } + + return false; + } else { + if (oldValue == localStateMap) { + localStateMap = new MockStateMap(newValue, versionIndex.incrementAndGet()); + return true; + } + + return false; + } + } + + @Override + public synchronized void clear(final Scope scope) { + setState(Collections.<String, String> emptyMap(), scope); + } + + + private String getValue(final String key, final Scope scope) { + final StateMap stateMap = getState(scope); + return stateMap.get(key); + } + + // + // assertion methods to make unit testing easier + // + /** + * Ensures that the state with the given key and scope is set to the given value, or else the test will fail + * + * @param key the state key + * @param value the expected value + * @param scope the scope + */ + public void assertStateEquals(final String key, final String value, final Scope scope) { + Assert.assertEquals(value, getValue(key, scope)); + } + + /** + * Ensures that the state is equal to the given values + * + * @param stateValues the values expected + * @param scope the scope to compare the stateValues against + */ + public void assertStateEquals(final Map<String, String> stateValues, final Scope scope) { + final StateMap stateMap = getState(scope); + Assert.assertEquals(stateValues, stateMap.toMap()); + } + + /** + * Ensures that the state is not equal to the given values + * + * @param stateValues the unexpected values + * @param scope the scope to compare the stateValues against + */ + public void assertStateNotEquals(final Map<String, String> stateValues, final Scope scope) { + final StateMap stateMap = getState(scope); + Assert.assertNotSame(stateValues, stateMap.toMap()); + } + + /** + * Ensures that the state with the given key and scope is not set to the given value, or else the test will fail + * + * @param key the state key + * @param value the unexpected value + * @param scope the scope + */ + public void assertStateNotEquals(final String key, final String value, final Scope scope) { + Assert.assertNotEquals(value, getValue(key, scope)); + } + + /** + * Ensures that some value is set for the given key and scope, or else the test will fail + * + * @param key the state key + * @param scope the scope + */ + public void assertStateSet(final String key, final Scope scope) { + Assert.assertNotNull("Expected state to be set for key " + key + " and scope " + scope + ", but it was not set", getValue(key, scope)); + } + + /** + * Ensures that no value is set for the given key and scope, or else the test will fail + * + * @param key the state key + * @param scope the scope + */ + public void assertStateNotSet(final String key, final Scope scope) { + Assert.assertNull("Expected state not to be set for key " + key + " and scope " + scope + ", but it was set", getValue(key, scope)); + } + + /** + * Ensures that the state was set for the given scope, regardless of what the value was. + * + * @param scope the scope + */ + public void assertStateSet(final Scope scope) { + final StateMap stateMap = (scope == Scope.CLUSTER) ? clusterStateMap : localStateMap; + Assert.assertEquals("Expected state to be set for Scope " + scope + ", but it was not set", -1L, stateMap.getVersion()); + } + + /** + * Ensures that the state was not set for the given scope + * + * @param scope the scope + */ + public void assertStateNotSet(final Scope scope) { + final StateMap stateMap = (scope == Scope.CLUSTER) ? clusterStateMap : localStateMap; + Assert.assertNotSame("Expected state not to be set for Scope " + scope + ", but it was set", -1L, stateMap.getVersion()); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-mock/src/main/java/org/apache/nifi/state/MockStateMap.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/state/MockStateMap.java b/nifi-mock/src/main/java/org/apache/nifi/state/MockStateMap.java new file mode 100644 index 0000000..cfce467 --- /dev/null +++ b/nifi-mock/src/main/java/org/apache/nifi/state/MockStateMap.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.state; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.nifi.components.state.StateMap; + +public class MockStateMap implements StateMap { + private final Map<String, String> stateValues; + private final long version; + + public MockStateMap(final Map<String, String> stateValues, final long version) { + this.stateValues = stateValues == null ? Collections.<String, String> emptyMap() : new HashMap<>(stateValues); + this.version = version; + } + + @Override + public long getVersion() { + return version; + } + + @Override + public String get(final String key) { + return stateValues.get(key); + } + + @Override + public Map<String, String> toMap() { + return Collections.unmodifiableMap(stateValues); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java index bff1d62..6a6e4cf 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java @@ -16,23 +16,31 @@ */ package org.apache.nifi.util; +import org.apache.nifi.components.state.StateManager; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerServiceInitializationContext; import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.state.MockStateManager; public class MockControllerServiceInitializationContext extends MockControllerServiceLookup implements ControllerServiceInitializationContext, ControllerServiceLookup { private final String identifier; private final ComponentLog logger; + private final StateManager stateManager; public MockControllerServiceInitializationContext(final ControllerService controllerService, final String identifier) { - this(controllerService, identifier, new MockProcessorLog(identifier, controllerService)); + this(controllerService, identifier, new MockStateManager()); } - public MockControllerServiceInitializationContext(final ControllerService controllerService, final String identifier, final ComponentLog logger) { + public MockControllerServiceInitializationContext(final ControllerService controllerService, final String identifier, final StateManager stateManager) { + this(controllerService, identifier, new MockProcessorLog(identifier, controllerService), stateManager); + } + + public MockControllerServiceInitializationContext(final ControllerService controllerService, final String identifier, final ComponentLog logger, final StateManager stateManager) { this.identifier = identifier; this.logger = logger; + this.stateManager = stateManager; addControllerService(controllerService, identifier); } @@ -55,4 +63,9 @@ public class MockControllerServiceInitializationContext extends MockControllerSe public ComponentLog getLogger() { return logger; } + + @Override + public StateManager getStateManager() { + return stateManager; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java index e8e4dd5..c641d24 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java @@ -34,17 +34,20 @@ import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.state.StateManager; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.SchedulingContext; +import org.apache.nifi.state.MockStateManager; import org.junit.Assert; public class MockProcessContext extends MockControllerServiceLookup implements SchedulingContext, ControllerServiceLookup { private final ConfigurableComponent component; private final Map<PropertyDescriptor, String> properties = new HashMap<>(); + private final StateManager stateManager; private String annotationData = null; private boolean yieldCalled = false; @@ -56,17 +59,22 @@ public class MockProcessContext extends MockControllerServiceLookup implements S private volatile Set<Relationship> connections = new HashSet<>(); private volatile Set<Relationship> unavailableRelationships = new HashSet<>(); + public MockProcessContext(final ConfigurableComponent component) { + this(component, new MockStateManager()); + } + /** * Creates a new MockProcessContext for the given Processor * * @param component being mocked */ - public MockProcessContext(final ConfigurableComponent component) { + public MockProcessContext(final ConfigurableComponent component, final StateManager stateManager) { this.component = Objects.requireNonNull(component); + this.stateManager = stateManager; } - public MockProcessContext(final ControllerService component, final MockProcessContext context) { - this(component); + public MockProcessContext(final ControllerService component, final MockProcessContext context, final StateManager stateManager) { + this(component, stateManager); try { annotationData = context.getControllerServiceAnnotationData(component); @@ -121,7 +129,7 @@ public class MockProcessContext extends MockControllerServiceLookup implements S requireNonNull(value, "Cannot set property to null value; if the intent is to remove the property, call removeProperty instead"); final PropertyDescriptor fullyPopulatedDescriptor = component.getPropertyDescriptor(descriptor.getName()); - final ValidationResult result = fullyPopulatedDescriptor.validate(value, new MockValidationContext(this)); + final ValidationResult result = fullyPopulatedDescriptor.validate(value, new MockValidationContext(this, stateManager)); String oldValue = properties.put(fullyPopulatedDescriptor, value); if (oldValue == null) { oldValue = fullyPopulatedDescriptor.getDefaultValue(); @@ -204,7 +212,7 @@ public class MockProcessContext extends MockControllerServiceLookup implements S * non-null */ public Collection<ValidationResult> validate() { - return component.validate(new MockValidationContext(this)); + return component.validate(new MockValidationContext(this, stateManager)); } public boolean isValid() { @@ -342,4 +350,9 @@ public class MockProcessContext extends MockControllerServiceLookup implements S final List<Range> elRanges = Query.extractExpressionRanges(getProperty(property).getValue()); return (elRanges != null && !elRanges.isEmpty()); } + + @Override + public StateManager getStateManager() { + return stateManager; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java index 63a9876..33719ec 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java @@ -24,6 +24,7 @@ import java.util.Map; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.state.StateManager; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.reporting.Bulletin; @@ -37,11 +38,13 @@ public class MockReportingContext extends MockControllerServiceLookup implements private final Map<String, ControllerServiceConfiguration> controllerServices; private final MockEventAccess eventAccess = new MockEventAccess(); private final Map<PropertyDescriptor, String> properties = new HashMap<>(); + private final StateManager stateManager; private final Map<String, List<Bulletin>> componentBulletinsCreated = new HashMap<>(); - public MockReportingContext(final Map<String, ControllerService> controllerServices) { + public MockReportingContext(final Map<String, ControllerService> controllerServices, final StateManager stateManager) { this.controllerServices = new HashMap<>(); + this.stateManager = stateManager; for (final Map.Entry<String, ControllerService> entry : controllerServices.entrySet()) { this.controllerServices.put(entry.getKey(), new ControllerServiceConfiguration(entry.getValue())); } @@ -112,4 +115,9 @@ public class MockReportingContext extends MockControllerServiceLookup implements return new ArrayList<>(created); } + + @Override + public StateManager getStateManager() { + return stateManager; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java index d73a09b..6442778 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java @@ -27,6 +27,7 @@ import org.apache.nifi.attribute.expression.language.StandardExpressionLanguageC import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.state.StateManager; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.expression.ExpressionLanguageCompiler; @@ -35,9 +36,11 @@ public class MockValidationContext implements ValidationContext, ControllerServi private final MockProcessContext context; private final Map<String, Boolean> expressionLanguageSupported; + private final StateManager stateManager; - public MockValidationContext(final MockProcessContext processContext) { + public MockValidationContext(final MockProcessContext processContext, final StateManager stateManager) { this.context = processContext; + this.stateManager = stateManager; final Map<PropertyDescriptor, String> properties = processContext.getProperties(); expressionLanguageSupported = new HashMap<>(properties.size()); @@ -63,8 +66,8 @@ public class MockValidationContext implements ValidationContext, ControllerServi @Override public ValidationContext getControllerServiceValidationContext(final ControllerService controllerService) { - final MockProcessContext serviceProcessContext = new MockProcessContext(controllerService, context); - return new MockValidationContext(serviceProcessContext); + final MockProcessContext serviceProcessContext = new MockProcessContext(controllerService, context, stateManager); + return new MockValidationContext(serviceProcessContext, stateManager); } @Override @@ -118,6 +121,7 @@ public class MockValidationContext implements ValidationContext, ControllerServi return context.isControllerServiceEnabling(serviceIdentifier); } + @Override public boolean isExpressionLanguagePresent(final String value) { if (value == null) { return false; http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java index 925f0d8..8220632 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java @@ -69,6 +69,7 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceReporter; import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.state.MockStateManager; import org.junit.Assert; public class StandardProcessorTestRunner implements TestRunner { @@ -80,6 +81,7 @@ public class StandardProcessorTestRunner implements TestRunner { private final SharedSessionState sharedState; private final AtomicLong idGenerator; private final boolean triggerSerially; + private final MockStateManager stateManager; private int numThreads = 1; private final AtomicInteger invocations = new AtomicInteger(0); @@ -100,7 +102,8 @@ public class StandardProcessorTestRunner implements TestRunner { this.sharedState = new SharedSessionState(processor, idGenerator); this.flowFileQueue = sharedState.getFlowFileQueue(); this.sessionFactory = new MockSessionFactory(sharedState, processor); - this.context = new MockProcessContext(processor); + this.stateManager = new MockStateManager(); + this.context = new MockProcessContext(processor, stateManager); detectDeprecatedAnnotations(processor); @@ -575,7 +578,7 @@ public class StandardProcessorTestRunner implements TestRunner { // } final ComponentLog logger = new MockProcessorLog(identifier, service); - final MockControllerServiceInitializationContext initContext = new MockControllerServiceInitializationContext(requireNonNull(service), requireNonNull(identifier), logger); + final MockControllerServiceInitializationContext initContext = new MockControllerServiceInitializationContext(requireNonNull(service), requireNonNull(identifier), logger, stateManager); initContext.addControllerServices(context); service.initialize(initContext); @@ -595,7 +598,7 @@ public class StandardProcessorTestRunner implements TestRunner { @Override public void assertNotValid(final ControllerService service) { - final ValidationContext validationContext = new MockValidationContext(context).getControllerServiceValidationContext(service); + final ValidationContext validationContext = new MockValidationContext(context, stateManager).getControllerServiceValidationContext(service); final Collection<ValidationResult> results = context.getControllerService(service.getIdentifier()).validate(validationContext); for (final ValidationResult result : results) { @@ -609,7 +612,7 @@ public class StandardProcessorTestRunner implements TestRunner { @Override public void assertValid(final ControllerService service) { - final ValidationContext validationContext = new MockValidationContext(context).getControllerServiceValidationContext(service); + final ValidationContext validationContext = new MockValidationContext(context, stateManager).getControllerServiceValidationContext(service); final Collection<ValidationResult> results = context.getControllerService(service.getIdentifier()).validate(validationContext); for (final ValidationResult result : results) { @@ -719,7 +722,7 @@ public class StandardProcessorTestRunner implements TestRunner { final Map<PropertyDescriptor, String> curProps = configuration.getProperties(); final Map<PropertyDescriptor, String> updatedProps = new HashMap<>(curProps); - final ValidationContext validationContext = new MockValidationContext(context).getControllerServiceValidationContext(service); + final ValidationContext validationContext = new MockValidationContext(context, stateManager).getControllerServiceValidationContext(service); final ValidationResult validationResult = property.validate(value, validationContext); updatedProps.put(property, value); @@ -768,4 +771,8 @@ public class StandardProcessorTestRunner implements TestRunner { sharedState.clearProvenanceEvents(); } + @Override + public MockStateManager getStateManager() { + return stateManager; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java index 6c8f192..378a92e 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java @@ -35,6 +35,7 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceReporter; import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.state.MockStateManager; public interface TestRunner { @@ -824,4 +825,9 @@ public interface TestRunner { * Clears the Provenance Events that have been emitted by the Processor */ void clearProvenanceEvents(); + + /** + * @return the State Provider that is used to stored and retrieve local state + */ + MockStateManager getStateManager(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessContext.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessContext.java b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessContext.java index fbdea94..d48af63 100644 --- a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessContext.java +++ b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessContext.java @@ -123,4 +123,4 @@ public class TestMockProcessContext { public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceInitializationContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceInitializationContext.java index 56216aa..c1cca26 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceInitializationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceInitializationContext.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.documentation.mock; +import org.apache.nifi.components.state.StateManager; import org.apache.nifi.controller.ControllerServiceInitializationContext; import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.logging.ComponentLog; @@ -43,4 +44,9 @@ public class MockControllerServiceInitializationContext implements ControllerSer return new MockProcessorLogger(); } -} + @Override + public StateManager getStateManager() { + return null; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessContext.java index edf0475..1acdd49 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessContext.java @@ -22,6 +22,7 @@ import java.util.Set; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.state.StateManager; import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.Relationship; @@ -102,4 +103,9 @@ public class MockProcessContext implements ProcessContext { public boolean isExpressionLanguagePresent(PropertyDescriptor property) { return false; } -} + + @Override + public StateManager getStateManager() { + return null; + } +} \ No newline at end of file
