Repository: nifi
Updated Branches:
  refs/heads/master 498b5023c -> 7314af617


NIFI-259: Added Stateful annotation as described on ticket


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/774c29a4
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/774c29a4
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/774c29a4

Branch: refs/heads/master
Commit: 774c29a4da8fdb73d4ad92c2b25ecb54b4bcde3c
Parents: bbd35a0
Author: Mark Payne <marka...@hotmail.com>
Authored: Tue Jan 12 15:28:35 2016 -0500
Committer: Mark Payne <marka...@hotmail.com>
Committed: Tue Jan 12 15:28:35 2016 -0500

----------------------------------------------------------------------
 .../nifi/annotation/behavior/Stateful.java      | 58 +++++++++++++++++++
 .../org/apache/nifi/components/state/Scope.java |  5 ++
 .../nifi/components/state/StateManager.java     | 19 ++++++-
 .../src/main/asciidoc/administration-guide.adoc |  5 ++
 .../org/apache/nifi/state/MockStateManager.java | 59 ++++++++++++++++----
 ...kControllerServiceInitializationContext.java |  2 +-
 .../apache/nifi/util/MockProcessContext.java    |  2 +-
 .../nifi/util/StandardProcessorTestRunner.java  | 46 ++++++++++++---
 .../java/org/apache/nifi/util/TestRunner.java   |  8 ++-
 .../org/apache/nifi/logging/LogRepository.java  | 12 ++++
 .../apache/nifi/controller/FlowController.java  |  6 ++
 .../controller/state/StandardStateManager.java  | 23 +++++++-
 .../nifi/controller/state/StandardStateMap.java |  4 ++
 .../repository/StandardLogRepository.java       | 13 +++++
 .../apache/nifi/processors/hadoop/ListHDFS.java |  5 ++
 .../standard/AbstractListProcessor.java         |  3 +
 .../nifi/processors/standard/GetHTTP.java       |  5 +-
 .../nifi/processors/standard/ListFile.java      |  5 ++
 .../nifi/processors/standard/ListSFTP.java      |  5 ++
 .../nifi/processors/standard/TailFile.java      |  3 +
 .../standard/TestDetectDuplicate.java           |  2 +-
 .../standard/TestRouteOnAttribute.java          |  4 +-
 22 files changed, 265 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/Stateful.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/Stateful.java 
b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/Stateful.java
new file mode 100644
index 0000000..2303e64
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/Stateful.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.annotation.behavior;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+
+/**
+ * <p>
+ * Annotation that a Processor, ReportingTask, or Controller Service can use 
to indicate
+ * that the component makes use of the {@link StateManager}. This annotation 
provides the
+ * user with a description of what information is being stored so that the 
user is able to
+ * understand what is shown to them and know what they are clearing should 
they choose to
+ * clear the state. Additionally, the UI will not show any state information 
to users if
+ * this annotation is not present.
+ * </p>
+ */
+@Documented
+@Target({ElementType.TYPE})
+@Retention(RetentionPolicy.RUNTIME)
+@Inherited
+public @interface Stateful {
+    /**
+     * Provides a description of what information is being stored in the 
{@link StateManager}
+     *
+     * @return a description of what information is being stored in the {@link 
StateManager}
+     */
+    String description();
+
+    /**
+     * Indicates the Scope(s) associated with the State that is stored and 
retrieved.
+     *
+     * @return the Scope(s) associated with the State that is stored and retrieved
+     */
+    Scope[]scopes();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-api/src/main/java/org/apache/nifi/components/state/Scope.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/components/state/Scope.java 
b/nifi-api/src/main/java/org/apache/nifi/components/state/Scope.java
index 8daf12a..dd0d0aa 100644
--- a/nifi-api/src/main/java/org/apache/nifi/components/state/Scope.java
+++ b/nifi-api/src/main/java/org/apache/nifi/components/state/Scope.java
@@ -33,4 +33,9 @@ public enum Scope {
      * node in the cluster.
      */
     LOCAL;
+
+    @Override
+    public String toString() {
+        return name();
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-api/src/main/java/org/apache/nifi/components/state/StateManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/components/state/StateManager.java 
b/nifi-api/src/main/java/org/apache/nifi/components/state/StateManager.java
index f52b2e3..0c0e867 100644
--- a/nifi-api/src/main/java/org/apache/nifi/components/state/StateManager.java
+++ b/nifi-api/src/main/java/org/apache/nifi/components/state/StateManager.java
@@ -20,6 +20,8 @@ package org.apache.nifi.components.state;
 import java.io.IOException;
 import java.util.Map;
 
+import org.apache.nifi.annotation.behavior.Stateful;
+
 /**
  * <p>
  * The StateManager is responsible for providing NiFi components a mechanism 
for storing
@@ -30,12 +32,25 @@ import java.util.Map;
  * When calling methods in this class, the {@link Scope} is used in order to 
specify whether
  * state should be stored/retrieved from the local state or the clustered 
state. However, if
  * any instance of NiFi is not clustered (or is disconnected from its 
cluster), the Scope is
- * not really relevant and the local state will be used in all cases. This 
allows component
+ * not really relevant and the local state will be used. This allows component
  * developers to not concern themselves with whether or not a particular 
instance of NiFi is
  * clustered. Instead, developers should assume that the instance is indeed 
clustered and write
- * the component accordingly. If not clustered, the component will still 
behavior in the same
+ * the component accordingly. If not clustered, the component will still 
behave in the same
  * manner, as a standalone node could be thought of as a "cluster of 1."
  * </p>
+ *
+ * <p>
+ * This mechanism is designed to allow developers to easily store and retrieve 
small amounts of state.
+ * The storage mechanism is implementation-specific, but state is typically 
stored either on a remote system
+ * or on disk. For that reason, one should consider the cost of storing and 
retrieving this data, and the
+ * amount of data should be kept to the minimum required.
+ * </p>
+ *
+ * <p>
+ * Any component that wishes to use the StateManager should also use the 
{@link Stateful} annotation to provide
+ * a description of what state is being stored and what Scope is used. If this 
annotation is not present, the UI
+ * will not expose such information or allow DFMs to clear the state.
+ * </p>
  */
 public interface StateManager {
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-docs/src/main/asciidoc/administration-guide.adoc
----------------------------------------------------------------------
diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc 
b/nifi-docs/src/main/asciidoc/administration-guide.adoc
index 920b99f..b24ec66 100644
--- a/nifi-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc
@@ -418,6 +418,11 @@ file, rather than being configured via the 
_nifi.properties_ file, simply becaus
 and it is easier to maintain and understand the configuration in an XML-based 
file such as this, than to mix the properties of the Provider
 in with all of the other NiFi framework-specific properties.
 
+It should be noted that if Processors and other components save state using 
the Clustered scope, the Local State Provider will be used
+if the instance is a standalone instance (not in a cluster) or is disconnected 
from the cluster. This also means that if a standalone instance
+is migrated to become a cluster, then that state will no longer be available, 
as the component will begin using the Clustered State Provider
+instead of the Local State Provider.
+
 
 [[embedded_zookeeper]]
 === Embedded ZooKeeper Server

http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-mock/src/main/java/org/apache/nifi/state/MockStateManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/state/MockStateManager.java 
b/nifi-mock/src/main/java/org/apache/nifi/state/MockStateManager.java
index cf9d50c..81ad988 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/state/MockStateManager.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/state/MockStateManager.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.components.state.StateMap;
@@ -39,22 +40,35 @@ public class MockStateManager implements StateManager {
     private volatile boolean failToGetClusterState = false;
     private volatile boolean failToSetClusterState = false;
 
-    private void verifyCanSet(final Scope scope) throws IOException {
-        final boolean failToSet = (scope == Scope.LOCAL) ? failToSetLocalState 
: failToSetClusterState;
-        if (failToSet) {
-            throw new IOException("Unit Test configured to throw IOException 
if " + scope + " State is set");
-        }
-    }
+    private final boolean usesLocalState;
+    private final boolean usesClusterState;
 
-    private void verifyCanGet(final Scope scope) throws IOException {
-        final boolean failToGet = (scope == Scope.LOCAL) ? failToGetLocalState 
: failToGetClusterState;
-        if (failToGet) {
-            throw new IOException("Unit Test configured to throw IOException 
if " + scope + " State is retrieved");
+    public MockStateManager(final Object component) {
+        final Stateful stateful = 
component.getClass().getAnnotation(Stateful.class);
+        if (stateful == null) {
+            usesLocalState = false;
+            usesClusterState = false;
+        } else {
+            final Scope[] scopes = stateful.scopes();
+            boolean local = false;
+            boolean cluster = false;
+
+            for (final Scope scope : scopes) {
+                if (scope == Scope.LOCAL) {
+                    local = true;
+                } else if (scope == Scope.CLUSTER) {
+                    cluster = true;
+                }
+            }
+
+            usesLocalState = local;
+            usesClusterState = cluster;
         }
     }
 
     @Override
     public synchronized void setState(final Map<String, String> state, final 
Scope scope) throws IOException {
+        verifyAnnotation(scope);
         verifyCanSet(scope);
         final StateMap stateMap = new MockStateMap(state, 
versionIndex.incrementAndGet());
 
@@ -67,11 +81,13 @@ public class MockStateManager implements StateManager {
 
     @Override
     public synchronized StateMap getState(final Scope scope) throws 
IOException {
+        verifyAnnotation(scope);
         verifyCanGet(scope);
         return retrieveState(scope);
     }
 
     private synchronized StateMap retrieveState(final Scope scope) {
+        verifyAnnotation(scope);
         if (scope == Scope.CLUSTER) {
             return clusterStateMap;
         } else {
@@ -81,6 +97,7 @@ public class MockStateManager implements StateManager {
 
     @Override
     public synchronized boolean replace(final StateMap oldValue, final 
Map<String, String> newValue, final Scope scope) throws IOException {
+        verifyAnnotation(scope);
         if (scope == Scope.CLUSTER) {
             if (oldValue == clusterStateMap) {
                 verifyCanSet(scope);
@@ -102,9 +119,31 @@ public class MockStateManager implements StateManager {
 
     @Override
     public synchronized void clear(final Scope scope) throws IOException {
+        verifyAnnotation(scope);
         setState(Collections.<String, String> emptyMap(), scope);
     }
 
+    private void verifyCanSet(final Scope scope) throws IOException {
+        final boolean failToSet = (scope == Scope.LOCAL) ? failToSetLocalState 
: failToSetClusterState;
+        if (failToSet) {
+            throw new IOException("Unit Test configured to throw IOException 
if " + scope + " State is set");
+        }
+    }
+
+    private void verifyCanGet(final Scope scope) throws IOException {
+        final boolean failToGet = (scope == Scope.LOCAL) ? failToGetLocalState 
: failToGetClusterState;
+        if (failToGet) {
+            throw new IOException("Unit Test configured to throw IOException 
if " + scope + " State is retrieved");
+        }
+    }
+
+    private void verifyAnnotation(final Scope scope) {
+        // ensure that the @Stateful annotation is present with the 
appropriate Scope
+        if ((scope == Scope.LOCAL && !usesLocalState) || (scope == 
Scope.CLUSTER && !usesClusterState)) {
+            Assert.fail("Component is attempting to set or retrieve state with 
a scope of " + scope + " but does not declare that it will use "
+                + scope + " state. A @Stateful annotation should be added to 
the component with a scope of " + scope);
+        }
+    }
 
     private String getValue(final String key, final Scope scope) {
         final StateMap stateMap;

http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java
 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java
index 6a6e4cf..754bec0 100644
--- 
a/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java
+++ 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java
@@ -30,7 +30,7 @@ public class MockControllerServiceInitializationContext 
extends MockControllerSe
     private final StateManager stateManager;
 
     public MockControllerServiceInitializationContext(final ControllerService 
controllerService, final String identifier) {
-        this(controllerService, identifier, new MockStateManager());
+        this(controllerService, identifier, new 
MockStateManager(controllerService));
     }
 
     public MockControllerServiceInitializationContext(final ControllerService 
controllerService, final String identifier, final StateManager stateManager) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
index c641d24..02a1d8a 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
@@ -60,7 +60,7 @@ public class MockProcessContext extends 
MockControllerServiceLookup implements S
     private volatile Set<Relationship> unavailableRelationships = new 
HashSet<>();
 
     public MockProcessContext(final ConfigurableComponent component) {
-        this(component, new MockStateManager());
+        this(component, new MockStateManager(component));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index 8220632..ddc1e71 100644
--- 
a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++ 
b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -57,6 +57,7 @@ import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.queue.QueueSize;
@@ -81,7 +82,8 @@ public class StandardProcessorTestRunner implements 
TestRunner {
     private final SharedSessionState sharedState;
     private final AtomicLong idGenerator;
     private final boolean triggerSerially;
-    private final MockStateManager stateManager;
+    private final MockStateManager processorStateManager;
+    private final Map<String, MockStateManager> controllerServiceStateManagers 
= new HashMap<>();
 
     private int numThreads = 1;
     private final AtomicInteger invocations = new AtomicInteger(0);
@@ -102,8 +104,8 @@ public class StandardProcessorTestRunner implements 
TestRunner {
         this.sharedState = new SharedSessionState(processor, idGenerator);
         this.flowFileQueue = sharedState.getFlowFileQueue();
         this.sessionFactory = new MockSessionFactory(sharedState, processor);
-        this.stateManager = new MockStateManager();
-        this.context = new MockProcessContext(processor, stateManager);
+        this.processorStateManager = new MockStateManager(processor);
+        this.context = new MockProcessContext(processor, 
processorStateManager);
 
         detectDeprecatedAnnotations(processor);
 
@@ -578,7 +580,9 @@ public class StandardProcessorTestRunner implements 
TestRunner {
         // }
 
         final ComponentLog logger = new MockProcessorLog(identifier, service);
-        final MockControllerServiceInitializationContext initContext = new 
MockControllerServiceInitializationContext(requireNonNull(service), 
requireNonNull(identifier), logger, stateManager);
+        final MockStateManager serviceStateManager = new 
MockStateManager(service);
+        controllerServiceStateManagers.put(identifier, serviceStateManager);
+        final MockControllerServiceInitializationContext initContext = new 
MockControllerServiceInitializationContext(requireNonNull(service), 
requireNonNull(identifier), logger, serviceStateManager);
         initContext.addControllerServices(context);
         service.initialize(initContext);
 
@@ -598,7 +602,12 @@ public class StandardProcessorTestRunner implements 
TestRunner {
 
     @Override
     public void assertNotValid(final ControllerService service) {
-        final ValidationContext validationContext = new 
MockValidationContext(context, 
stateManager).getControllerServiceValidationContext(service);
+        final StateManager serviceStateManager = 
controllerServiceStateManagers.get(service.getIdentifier());
+        if (serviceStateManager == null) {
+            throw new IllegalStateException("Controller Service has not been 
added to this TestRunner via the #addControllerService method");
+        }
+
+        final ValidationContext validationContext = new 
MockValidationContext(context, 
serviceStateManager).getControllerServiceValidationContext(service);
         final Collection<ValidationResult> results = 
context.getControllerService(service.getIdentifier()).validate(validationContext);
 
         for (final ValidationResult result : results) {
@@ -612,7 +621,12 @@ public class StandardProcessorTestRunner implements 
TestRunner {
 
     @Override
     public void assertValid(final ControllerService service) {
-        final ValidationContext validationContext = new 
MockValidationContext(context, 
stateManager).getControllerServiceValidationContext(service);
+        final StateManager serviceStateManager = 
controllerServiceStateManagers.get(service.getIdentifier());
+        if (serviceStateManager == null) {
+            throw new IllegalStateException("Controller Service has not been 
added to this TestRunner via the #addControllerService method");
+        }
+
+        final ValidationContext validationContext = new 
MockValidationContext(context, 
serviceStateManager).getControllerServiceValidationContext(service);
         final Collection<ValidationResult> results = 
context.getControllerService(service.getIdentifier()).validate(validationContext);
 
         for (final ValidationResult result : results) {
@@ -718,11 +732,16 @@ public class StandardProcessorTestRunner implements 
TestRunner {
 
     @Override
     public ValidationResult setProperty(final ControllerService service, final 
PropertyDescriptor property, final String value) {
+        final MockStateManager serviceStateManager = 
controllerServiceStateManagers.get(service.getIdentifier());
+        if (serviceStateManager == null) {
+            throw new IllegalStateException("Controller service " + service + 
" has not been added to this TestRunner via the #addControllerService method");
+        }
+
         final ControllerServiceConfiguration configuration = 
getConfigToUpdate(service);
         final Map<PropertyDescriptor, String> curProps = 
configuration.getProperties();
         final Map<PropertyDescriptor, String> updatedProps = new 
HashMap<>(curProps);
 
-        final ValidationContext validationContext = new 
MockValidationContext(context, 
stateManager).getControllerServiceValidationContext(service);
+        final ValidationContext validationContext = new 
MockValidationContext(context, 
serviceStateManager).getControllerServiceValidationContext(service);
         final ValidationResult validationResult = property.validate(value, 
validationContext);
 
         updatedProps.put(property, value);
@@ -773,6 +792,17 @@ public class StandardProcessorTestRunner implements 
TestRunner {
 
     @Override
     public MockStateManager getStateManager() {
-        return stateManager;
+        return processorStateManager;
+    }
+
+    /**
+     * Returns the State Manager for the given Controller Service.
+     *
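+     * @param controllerService the Controller Service whose State Manager is requested
+     * @return the State Manager registered under the given Controller Service's identifier, or null if none is registered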
+     */
+    @Override
+    public MockStateManager getStateManager(final ControllerService 
controllerService) {
+        return 
controllerServiceStateManagers.get(controllerService.getIdentifier());
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
index 378a92e..9e7082c 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
@@ -827,7 +827,13 @@ public interface TestRunner {
     void clearProvenanceEvents();
 
     /**
-     * @return the State Provider that is used to stored and retrieve local 
state
+     * @return the State Manager that is used to store and retrieve state
      */
     MockStateManager getStateManager();
+
+    /**
+     * @param service the controller service of interest
+     * @return the State Manager that is used to store and retrieve state for 
the given controller service
+     */
+    MockStateManager getStateManager(ControllerService service);
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepository.java
index 92091da..6e1311a 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepository.java
@@ -64,4 +64,16 @@ public interface LogRepository {
      * Removes all LogObservers from this Repository
      */
     void removeAllObservers();
+
+    /**
+     * Sets the current logger for the component
+     * 
+     * @param logger the logger to use
+     */
+    void setLogger(ComponentLog logger);
+
+    /**
+     * @return the current logger for the component
+     */
+    ComponentLog getLogger();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 638aa08..8f3af55 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -936,6 +936,8 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             final ProcessorLog processorLogger = new 
SimpleProcessLogger(identifier, processor);
             final ProcessorInitializationContext ctx = new 
StandardProcessorInitializationContext(identifier, processorLogger, this);
             processor.initialize(ctx);
+
+            
LogRepositoryFactory.getRepository(identifier).setLogger(processorLogger);
             return processor;
         } catch (final Throwable t) {
             throw new ProcessorInstantiationException(type, t);
@@ -1136,6 +1138,10 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
 
             clusterTaskExecutor.shutdown();
 
+            if (zooKeeperStateServer != null) {
+                zooKeeperStateServer.shutdown();
+            }
+
             // Trigger any processors' methods marked with @OnShutdown to be 
called
             rootGroup.shutdown();
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateManager.java
index e83bbac..639f8a2 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateManager.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateManager.java
@@ -24,6 +24,10 @@ import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.components.state.StateProvider;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.logging.LogRepository;
+import org.apache.nifi.logging.LogRepositoryFactory;
+import org.apache.nifi.processor.SimpleProcessLogger;
 
 public class StandardStateManager implements StateManager {
     private final StateProvider localProvider;
@@ -44,25 +48,40 @@ public class StandardStateManager implements StateManager {
         return clusterProvider;
     }
 
+    private ComponentLog getLogger(final String componentId) {
+        final LogRepository repo = 
LogRepositoryFactory.getRepository(componentId);
+        final ComponentLog logger = (repo == null) ? null : repo.getLogger();
+        if (repo == null || logger == null) {
+            return new SimpleProcessLogger(componentId, this);
+        }
+
+        return logger;
+    }
 
     @Override
     public StateMap getState(final Scope scope) throws IOException {
-        return getProvider(scope).getState(componentId);
+        final StateMap stateMap = getProvider(scope).getState(componentId);
+        getLogger(componentId).debug("Returning {} State: {}", new Object[] 
{scope, stateMap});
+        return stateMap;
     }
 
 
     @Override
     public boolean replace(final StateMap oldValue, final Map<String, String> 
newValue, final Scope scope) throws IOException {
-        return getProvider(scope).replace(oldValue, newValue, componentId);
+        final boolean replaced = getProvider(scope).replace(oldValue, 
newValue, componentId);
+        getLogger(componentId).debug("{} State from old value {} to new value 
{} was {}", new Object[] {scope, oldValue, newValue, replaced});
+        return replaced;
     }
 
     @Override
     public void setState(final Map<String, String> state, final Scope scope) 
throws IOException {
+        getLogger(componentId).debug("Setting {} State to {}", new Object[] 
{scope, state});
         getProvider(scope).setState(state, componentId);
     }
 
     @Override
     public void clear(final Scope scope) throws IOException {
+        getLogger(componentId).debug("Clearing {} State", new Object[] 
{scope});
         getProvider(scope).clear(componentId);
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateMap.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateMap.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateMap.java
index b006ac6..672fd6f 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateMap.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateMap.java
@@ -46,4 +46,8 @@ public class StandardStateMap implements StateMap {
         return stateValues;
     }
 
+    @Override
+    public String toString() {
+        return "StandardStateMap[version=" + version + ", values=" + 
stateValues + "]";
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/repository/StandardLogRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/repository/StandardLogRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/repository/StandardLogRepository.java
index f8d37e5..f6c55b6 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/repository/StandardLogRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/repository/StandardLogRepository.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.logging.LogLevel;
 import org.apache.nifi.logging.LogMessage;
 import org.apache.nifi.logging.LogObserver;
@@ -45,6 +46,8 @@ public class StandardLogRepository implements LogRepository {
 
     private final Logger logger = 
LoggerFactory.getLogger(StandardLogRepository.class);
 
+    private volatile ComponentLog componentLogger;
+
     @Override
     public void addLogMessage(final LogLevel level, final String message) {
         addLogMessage(level, message, (Throwable) null);
@@ -170,4 +173,14 @@ public class StandardLogRepository implements 
LogRepository {
             writeLock.unlock();
         }
     }
+
+    @Override
+    public void setLogger(final ComponentLog componentLogger) {
+        this.componentLogger = componentLogger;
+    }
+
+    @Override
+    public ComponentLog getLogger() {
+        return componentLogger;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index 44e4a03..3645eec 100644
--- 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -85,6 +86,10 @@ import org.codehaus.jackson.map.ObjectMapper;
     @WritesAttribute(attribute="hdfs.permissions", description="The 
permissions for the file in HDFS. This is formatted as 3 characters for the 
owner, "
             + "3 for the group, and 3 for other users. For example rw-rw-r--")
 })
+@Stateful(scopes = Scope.CLUSTER, description = "After performing a listing of 
HDFS files, the timestamp of the newest file is stored, "
+    + "along with the filenames of all files that share that same timestamp. 
This allows the Processor to list only files that have been added or modified 
after "
+    + "this date the next time that the Processor is run. State is stored 
across the cluster so that this Processor can be run on Primary Node only and 
if a new Primary "
+    + "Node is selected, the new node can pick up where the previous node left 
off, without duplicating the data.")
 @SeeAlso({GetHDFS.class, FetchHDFS.class, PutHDFS.class})
 public class ListHDFS extends AbstractHadoopProcessor {
     public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new 
PropertyDescriptor.Builder()

http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
index e27e5fe..c9bf369 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
+import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
@@ -140,6 +141,8 @@ import org.codehaus.jackson.map.ObjectMapper;
  * </ul>
  */
 @TriggerSerially
+@Stateful(scopes={Scope.LOCAL, Scope.CLUSTER}, description="After a listing of 
resources is performed, the latest timestamp of any of the resources is stored 
in the component's state, "
+    + "along with all resources that have that same timestamp so that the 
Processor can avoid data duplication. The scope used depends on the 
implementation.")
 public abstract class AbstractListProcessor<T extends ListableEntity> extends 
AbstractProcessor {
     public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new 
PropertyDescriptor.Builder()
         .name("Distributed Cache Service")

http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
index d3eb515..48f22c5 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
@@ -65,6 +65,7 @@ import 
org.apache.http.impl.conn.BasicHttpClientConnectionManager;
 import org.apache.http.ssl.SSLContextBuilder;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -93,11 +94,13 @@ import org.apache.nifi.util.StopWatch;
 
 @Tags({"get", "fetch", "poll", "http", "https", "ingest", "source", "input"})
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
-@CapabilityDescription("Fetches a file via HTTP")
+@CapabilityDescription("Fetches a file via HTTP. If the HTTP server supports 
it, the Processor then stores the Last Modified time and the ETag "
+    + "so that data will not be pulled again until the remote data changes or 
until the state is cleared.")
 @WritesAttributes({
     @WritesAttribute(attribute = "filename", description = "The filename is 
set to the name of the file on the remote server"),
     @WritesAttribute(attribute = "mime.type", description = "The MIME Type of 
the FlowFile, as reported by the HTTP Content-Type header")
 })
+@Stateful(scopes = {Scope.LOCAL}, description = "Stores Last Modified Time and 
ETag headers returned by server so that the same data will not be fetched 
multiple times.")
 public class GetHTTP extends AbstractSessionFactoryProcessor {
 
     static final int PERSISTENCE_INTERVAL_MSEC = 10000;

http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
index ed661ae..e82be2c 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
@@ -45,6 +45,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 
 import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -94,6 +95,10 @@ import org.apache.nifi.processors.standard.util.FileInfo;
                 "rw-rw-r--")
 })
 @SeeAlso({GetFile.class, PutFile.class, FetchFile.class})
+@Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, description = "After 
performing a listing of files, the timestamp of the newest file is stored, "
+    + "along with the filenames of all files that share that same timestamp. 
This allows the Processor to list only files that have been added or modified 
after "
+    + "this date the next time that the Processor is run. Whether the state is 
stored with a Local or Cluster scope depends on the value of the "
+    + "<Input Directory Location> property.")
 public class ListFile extends AbstractListProcessor<FileInfo> {
     static final AllowableValue LOCATION_LOCAL = new AllowableValue("Local", 
"Local", "Input Directory is located on a local disk. State will be stored 
locally on each node in the cluster.");
     static final AllowableValue LOCATION_REMOTE = new AllowableValue("Remote", 
"Remote", "Input Directory is located on a remote system. State will be stored 
across the cluster so that "

http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
index e1cd417..d92f398 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -50,6 +51,10 @@ import org.apache.nifi.processors.standard.util.SFTPTransfer;
     @WritesAttribute(attribute = "filename", description = "The name of the 
file on the SFTP Server"),
     @WritesAttribute(attribute = "path", description = "The fully qualified 
name of the directory on the SFTP Server from which the file was pulled"),
 })
+@Stateful(scopes = {Scope.CLUSTER}, description = "After performing a listing 
of files, the timestamp of the newest file is stored, "
+    + "along with the filenames of all files that share that same timestamp. This 
allows the Processor to list only files that have been added or modified after "
+    + "this date the next time that the Processor is run. State is stored 
across the cluster so that this Processor can be run on Primary Node only and 
if "
+    + "a new Primary Node is selected, the new node will not duplicate the 
data that was listed by the previous Primary Node.")
 public class ListSFTP extends ListFileTransfer {
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
index 7343eea..6f99b42 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
@@ -46,6 +46,7 @@ import java.util.zip.Checksum;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -79,6 +80,8 @@ import org.apache.nifi.util.LongHolder;
     + "was not running (provided that the data still exists upon restart of 
NiFi). It is generally advisable to set the Run Schedule to a few seconds, 
rather than running "
     + "with the default value of 0 secs, as this Processor will consume a lot 
of resources if scheduled very aggressively. At this time, this Processor does 
not support "
     + "ingesting files that have been compressed when 'rolled over'.")
+@Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, description = "Stores state 
about where in the Tailed File it left off so that on restart it does not have 
to duplicate data. "
+    + "State is stored either locally or clustered depending on the <File Location> 
property.")
 public class TailFile extends AbstractProcessor {
 
     static final AllowableValue LOCATION_LOCAL = new AllowableValue("Local", 
"Local", "File is located on a local disk drive. Each node in a cluster will 
tail a different file.");

http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
index e09d6f0..8bec03e 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
@@ -106,7 +106,7 @@ public class TestDetectDuplicate {
 
         final DistributedMapCacheClientImpl client = new 
DistributedMapCacheClientImpl();
         final ComponentLog logger = new MockProcessorLog("client", client);
-        final MockControllerServiceInitializationContext clientInitContext = 
new MockControllerServiceInitializationContext(client, "client", logger, new 
MockStateManager());
+        final MockControllerServiceInitializationContext clientInitContext = 
new MockControllerServiceInitializationContext(client, "client", logger, new 
MockStateManager(client));
         client.initialize(clientInitContext);
 
         return client;

http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRouteOnAttribute.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRouteOnAttribute.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRouteOnAttribute.java
index 3b70dc4..66dc854 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRouteOnAttribute.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRouteOnAttribute.java
@@ -39,7 +39,7 @@ public class TestRouteOnAttribute {
     @Test
     public void testInvalidOnMisconfiguredProperty() {
         final RouteOnAttribute proc = new RouteOnAttribute();
-        final MockProcessContext ctx = new MockProcessContext(proc, new 
MockStateManager());
+        final MockProcessContext ctx = new MockProcessContext(proc, new 
MockStateManager(proc));
         final ValidationResult validationResult = ctx.setProperty("RouteA", 
"${a:equals('b')"); // Missing closing brace
         assertFalse(validationResult.isValid());
     }
@@ -47,7 +47,7 @@ public class TestRouteOnAttribute {
     @Test
     public void testInvalidOnNonBooleanProperty() {
         final RouteOnAttribute proc = new RouteOnAttribute();
-        final MockProcessContext ctx = new MockProcessContext(proc, new 
MockStateManager());
+        final MockProcessContext ctx = new MockProcessContext(proc, new 
MockStateManager(proc));
         final ValidationResult validationResult = ctx.setProperty("RouteA", 
"${a:length()"); // Should be boolean
         assertFalse(validationResult.isValid());
     }

Reply via email to