Repository: nifi Updated Branches: refs/heads/master 498b5023c -> 7314af617
NIFI-259: Added Stateful annotation as described on ticket Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/774c29a4 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/774c29a4 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/774c29a4 Branch: refs/heads/master Commit: 774c29a4da8fdb73d4ad92c2b25ecb54b4bcde3c Parents: bbd35a0 Author: Mark Payne <marka...@hotmail.com> Authored: Tue Jan 12 15:28:35 2016 -0500 Committer: Mark Payne <marka...@hotmail.com> Committed: Tue Jan 12 15:28:35 2016 -0500 ---------------------------------------------------------------------- .../nifi/annotation/behavior/Stateful.java | 58 +++++++++++++++++++ .../org/apache/nifi/components/state/Scope.java | 5 ++ .../nifi/components/state/StateManager.java | 19 ++++++- .../src/main/asciidoc/administration-guide.adoc | 5 ++ .../org/apache/nifi/state/MockStateManager.java | 59 ++++++++++++++++---- ...kControllerServiceInitializationContext.java | 2 +- .../apache/nifi/util/MockProcessContext.java | 2 +- .../nifi/util/StandardProcessorTestRunner.java | 46 ++++++++++++--- .../java/org/apache/nifi/util/TestRunner.java | 8 ++- .../org/apache/nifi/logging/LogRepository.java | 12 ++++ .../apache/nifi/controller/FlowController.java | 6 ++ .../controller/state/StandardStateManager.java | 23 +++++++- .../nifi/controller/state/StandardStateMap.java | 4 ++ .../repository/StandardLogRepository.java | 13 +++++ .../apache/nifi/processors/hadoop/ListHDFS.java | 5 ++ .../standard/AbstractListProcessor.java | 3 + .../nifi/processors/standard/GetHTTP.java | 5 +- .../nifi/processors/standard/ListFile.java | 5 ++ .../nifi/processors/standard/ListSFTP.java | 5 ++ .../nifi/processors/standard/TailFile.java | 3 + .../standard/TestDetectDuplicate.java | 2 +- .../standard/TestRouteOnAttribute.java | 4 +- 22 files changed, 265 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/Stateful.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/Stateful.java b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/Stateful.java new file mode 100644 index 0000000..2303e64 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/Stateful.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.annotation.behavior; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; + +/** + * <p> + * Annotation that a Processor, ReportingTask, or Controller Service can use to indicate + * that the component makes use of the {@link StateManager}. 
This annotation provides the + * user with a description of what information is being stored so that the user is able to + * understand what is shown to them and know what they are clearing should they choose to + * clear the state. Additionally, the UI will not show any state information to users if + * this annotation is not present. + * </p> + */ +@Documented +@Target({ElementType.TYPE}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +public @interface Stateful { + /** + * Provides a description of what information is being stored in the {@link StateManager} + * + * @return a description of what information is being stored in the {@link StateManager} + */ + String description(); + + /** + * Indicates the Scope(s) associated with the State that is stored and retrieved. + * + * @return the Scope(s) associated with the State that is stored and retrieved + */ + Scope[]scopes(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-api/src/main/java/org/apache/nifi/components/state/Scope.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/components/state/Scope.java b/nifi-api/src/main/java/org/apache/nifi/components/state/Scope.java index 8daf12a..dd0d0aa 100644 --- a/nifi-api/src/main/java/org/apache/nifi/components/state/Scope.java +++ b/nifi-api/src/main/java/org/apache/nifi/components/state/Scope.java @@ -33,4 +33,9 @@ public enum Scope { * node in the cluster. 
*/ LOCAL; + + @Override + public String toString() { + return name(); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-api/src/main/java/org/apache/nifi/components/state/StateManager.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/components/state/StateManager.java b/nifi-api/src/main/java/org/apache/nifi/components/state/StateManager.java index f52b2e3..0c0e867 100644 --- a/nifi-api/src/main/java/org/apache/nifi/components/state/StateManager.java +++ b/nifi-api/src/main/java/org/apache/nifi/components/state/StateManager.java @@ -20,6 +20,8 @@ package org.apache.nifi.components.state; import java.io.IOException; import java.util.Map; +import org.apache.nifi.annotation.behavior.Stateful; + /** * <p> * The StateManager is responsible for providing NiFi components a mechanism for storing @@ -30,12 +32,25 @@ import java.util.Map; * When calling methods in this class, the {@link Scope} is used in order to specify whether * state should be stored/retrieved from the local state or the clustered state. However, if * any instance of NiFi is not clustered (or is disconnected from its cluster), the Scope is - * not really relevant and the local state will be used in all cases. This allows component + * not really relevant and the local state will be used. This allows component * developers to not concern themselves with whether or not a particular instance of NiFi is * clustered. Instead, developers should assume that the instance is indeed clustered and write - * the component accordingly. If not clustered, the component will still behavior in the same + * the component accordingly. If not clustered, the component will still behave in the same * manner, as a standalone node could be thought of as a "cluster of 1." * </p> + * + * <p> + * This mechanism is designed to allow developers to easily store and retrieve small amounts of state. 
+ * The storage mechanism is implementation-specific, but is typically either stored on a remote system + * or on disk. For that reason, one should consider the cost of storing and retrieving this data, and the + * amount of data should be kept to the minimum required. + * </p> + * + * <p> + * Any component that wishes to use the StateManager should also use the {@link Stateful} annotation to provide + * a description of what state is being stored and what Scope is used. If this annotation is not present, the UI + * will not expose such information or allow DFMs to clear the state. + * </p> */ public interface StateManager { http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-docs/src/main/asciidoc/administration-guide.adoc ---------------------------------------------------------------------- diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc index 920b99f..b24ec66 100644 --- a/nifi-docs/src/main/asciidoc/administration-guide.adoc +++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc @@ -418,6 +418,11 @@ file, rather than being configured via the _nifi.properties_ file, simply becaus and it is easier to maintain and understand the configuration in an XML-based file such as this, than to mix the properties of the Provider in with all of the other NiFi framework-specific properties. +It should be noted that if Processors and other components save state using the Clustered scope, the Local State Provider will be used +if the instance is a standalone instance (not in a cluster) or is disconnected from the cluster. This also means that if a standalone instance +is migrated to become a cluster, then that state will no longer be available, as the component will begin using the Clustered State Provider +instead of the Local State Provider. 
+ [[embedded_zookeeper]] === Embedded ZooKeeper Server http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-mock/src/main/java/org/apache/nifi/state/MockStateManager.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/state/MockStateManager.java b/nifi-mock/src/main/java/org/apache/nifi/state/MockStateManager.java index cf9d50c..81ad988 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/state/MockStateManager.java +++ b/nifi-mock/src/main/java/org/apache/nifi/state/MockStateManager.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateManager; import org.apache.nifi.components.state.StateMap; @@ -39,22 +40,35 @@ public class MockStateManager implements StateManager { private volatile boolean failToGetClusterState = false; private volatile boolean failToSetClusterState = false; - private void verifyCanSet(final Scope scope) throws IOException { - final boolean failToSet = (scope == Scope.LOCAL) ? failToSetLocalState : failToSetClusterState; - if (failToSet) { - throw new IOException("Unit Test configured to throw IOException if " + scope + " State is set"); - } - } + private final boolean usesLocalState; + private final boolean usesClusterState; - private void verifyCanGet(final Scope scope) throws IOException { - final boolean failToGet = (scope == Scope.LOCAL) ? 
failToGetLocalState : failToGetClusterState; - if (failToGet) { - throw new IOException("Unit Test configured to throw IOException if " + scope + " State is retrieved"); + public MockStateManager(final Object component) { + final Stateful stateful = component.getClass().getAnnotation(Stateful.class); + if (stateful == null) { + usesLocalState = false; + usesClusterState = false; + } else { + final Scope[] scopes = stateful.scopes(); + boolean local = false; + boolean cluster = false; + + for (final Scope scope : scopes) { + if (scope == Scope.LOCAL) { + local = true; + } else if (scope == Scope.CLUSTER) { + cluster = true; + } + } + + usesLocalState = local; + usesClusterState = cluster; } } @Override public synchronized void setState(final Map<String, String> state, final Scope scope) throws IOException { + verifyAnnotation(scope); verifyCanSet(scope); final StateMap stateMap = new MockStateMap(state, versionIndex.incrementAndGet()); @@ -67,11 +81,13 @@ public class MockStateManager implements StateManager { @Override public synchronized StateMap getState(final Scope scope) throws IOException { + verifyAnnotation(scope); verifyCanGet(scope); return retrieveState(scope); } private synchronized StateMap retrieveState(final Scope scope) { + verifyAnnotation(scope); if (scope == Scope.CLUSTER) { return clusterStateMap; } else { @@ -81,6 +97,7 @@ public class MockStateManager implements StateManager { @Override public synchronized boolean replace(final StateMap oldValue, final Map<String, String> newValue, final Scope scope) throws IOException { + verifyAnnotation(scope); if (scope == Scope.CLUSTER) { if (oldValue == clusterStateMap) { verifyCanSet(scope); @@ -102,9 +119,31 @@ public class MockStateManager implements StateManager { @Override public synchronized void clear(final Scope scope) throws IOException { + verifyAnnotation(scope); setState(Collections.<String, String> emptyMap(), scope); } + private void verifyCanSet(final Scope scope) throws IOException { + 
final boolean failToSet = (scope == Scope.LOCAL) ? failToSetLocalState : failToSetClusterState; + if (failToSet) { + throw new IOException("Unit Test configured to throw IOException if " + scope + " State is set"); + } + } + + private void verifyCanGet(final Scope scope) throws IOException { + final boolean failToGet = (scope == Scope.LOCAL) ? failToGetLocalState : failToGetClusterState; + if (failToGet) { + throw new IOException("Unit Test configured to throw IOException if " + scope + " State is retrieved"); + } + } + + private void verifyAnnotation(final Scope scope) { + // ensure that the @Stateful annotation is present with the appropriate Scope + if ((scope == Scope.LOCAL && !usesLocalState) || (scope == Scope.CLUSTER && !usesClusterState)) { + Assert.fail("Component is attempting to set or retrieve state with a scope of " + scope + " but does not declare that it will use " + + scope + " state. A @Stateful annotation should be added to the component with a scope of " + scope); + } + } private String getValue(final String key, final Scope scope) { final StateMap stateMap; http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java index 6a6e4cf..754bec0 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java @@ -30,7 +30,7 @@ public class MockControllerServiceInitializationContext extends MockControllerSe private final StateManager stateManager; public MockControllerServiceInitializationContext(final ControllerService controllerService, final String 
identifier) { - this(controllerService, identifier, new MockStateManager()); + this(controllerService, identifier, new MockStateManager(controllerService)); } public MockControllerServiceInitializationContext(final ControllerService controllerService, final String identifier, final StateManager stateManager) { http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java index c641d24..02a1d8a 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java @@ -60,7 +60,7 @@ public class MockProcessContext extends MockControllerServiceLookup implements S private volatile Set<Relationship> unavailableRelationships = new HashSet<>(); public MockProcessContext(final ConfigurableComponent component) { - this(component, new MockStateManager()); + this(component, new MockStateManager(component)); } /** http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java index 8220632..ddc1e71 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java @@ -57,6 +57,7 @@ import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import 
org.apache.nifi.components.state.StateManager; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.queue.QueueSize; @@ -81,7 +82,8 @@ public class StandardProcessorTestRunner implements TestRunner { private final SharedSessionState sharedState; private final AtomicLong idGenerator; private final boolean triggerSerially; - private final MockStateManager stateManager; + private final MockStateManager processorStateManager; + private final Map<String, MockStateManager> controllerServiceStateManagers = new HashMap<>(); private int numThreads = 1; private final AtomicInteger invocations = new AtomicInteger(0); @@ -102,8 +104,8 @@ public class StandardProcessorTestRunner implements TestRunner { this.sharedState = new SharedSessionState(processor, idGenerator); this.flowFileQueue = sharedState.getFlowFileQueue(); this.sessionFactory = new MockSessionFactory(sharedState, processor); - this.stateManager = new MockStateManager(); - this.context = new MockProcessContext(processor, stateManager); + this.processorStateManager = new MockStateManager(processor); + this.context = new MockProcessContext(processor, processorStateManager); detectDeprecatedAnnotations(processor); @@ -578,7 +580,9 @@ public class StandardProcessorTestRunner implements TestRunner { // } final ComponentLog logger = new MockProcessorLog(identifier, service); - final MockControllerServiceInitializationContext initContext = new MockControllerServiceInitializationContext(requireNonNull(service), requireNonNull(identifier), logger, stateManager); + final MockStateManager serviceStateManager = new MockStateManager(service); + controllerServiceStateManagers.put(identifier, serviceStateManager); + final MockControllerServiceInitializationContext initContext = new MockControllerServiceInitializationContext(requireNonNull(service), requireNonNull(identifier), logger, serviceStateManager); 
initContext.addControllerServices(context); service.initialize(initContext); @@ -598,7 +602,12 @@ public class StandardProcessorTestRunner implements TestRunner { @Override public void assertNotValid(final ControllerService service) { - final ValidationContext validationContext = new MockValidationContext(context, stateManager).getControllerServiceValidationContext(service); + final StateManager serviceStateManager = controllerServiceStateManagers.get(service.getIdentifier()); + if (serviceStateManager == null) { + throw new IllegalStateException("Controller Service has not been added to this TestRunner via the #addControllerService method"); + } + + final ValidationContext validationContext = new MockValidationContext(context, serviceStateManager).getControllerServiceValidationContext(service); final Collection<ValidationResult> results = context.getControllerService(service.getIdentifier()).validate(validationContext); for (final ValidationResult result : results) { @@ -612,7 +621,12 @@ public class StandardProcessorTestRunner implements TestRunner { @Override public void assertValid(final ControllerService service) { - final ValidationContext validationContext = new MockValidationContext(context, stateManager).getControllerServiceValidationContext(service); + final StateManager serviceStateManager = controllerServiceStateManagers.get(service.getIdentifier()); + if (serviceStateManager == null) { + throw new IllegalStateException("Controller Service has not been added to this TestRunner via the #addControllerService method"); + } + + final ValidationContext validationContext = new MockValidationContext(context, serviceStateManager).getControllerServiceValidationContext(service); final Collection<ValidationResult> results = context.getControllerService(service.getIdentifier()).validate(validationContext); for (final ValidationResult result : results) { @@ -718,11 +732,16 @@ public class StandardProcessorTestRunner implements TestRunner { @Override public 
ValidationResult setProperty(final ControllerService service, final PropertyDescriptor property, final String value) { + final MockStateManager serviceStateManager = controllerServiceStateManagers.get(service.getIdentifier()); + if (serviceStateManager == null) { + throw new IllegalStateException("Controller service " + service + " has not been added to this TestRunner via the #addControllerService method"); + } + final ControllerServiceConfiguration configuration = getConfigToUpdate(service); final Map<PropertyDescriptor, String> curProps = configuration.getProperties(); final Map<PropertyDescriptor, String> updatedProps = new HashMap<>(curProps); - final ValidationContext validationContext = new MockValidationContext(context, stateManager).getControllerServiceValidationContext(service); + final ValidationContext validationContext = new MockValidationContext(context, serviceStateManager).getControllerServiceValidationContext(service); final ValidationResult validationResult = property.validate(value, validationContext); updatedProps.put(property, value); @@ -773,6 +792,17 @@ public class StandardProcessorTestRunner implements TestRunner { @Override public MockStateManager getStateManager() { - return stateManager; + return processorStateManager; + } + + /** + * Returns the State Manager for the given Controller Service. 
+ * + * @param controllerService the Controller Service whose State Manager is requested + * @return the State Manager registered for the given Controller Service, or {@code null} if none has been registered under its identifier + */ + @Override + public MockStateManager getStateManager(final ControllerService controllerService) { + return controllerServiceStateManagers.get(controllerService.getIdentifier()); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java index 378a92e..9e7082c 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java @@ -827,7 +827,13 @@ public interface TestRunner { void clearProvenanceEvents(); /** - * @return the State Provider that is used to stored and retrieve local state + * @return the State Manager that is used to store and retrieve state */ MockStateManager getStateManager(); + + /** + * @param service the controller service of interest + * @return the State Manager that is used to store and retrieve state for the given controller service + */ + MockStateManager getStateManager(ControllerService service); } http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepository.java index 92091da..6e1311a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepository.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepository.java @@ -64,4 +64,16 @@ public interface LogRepository { * Removes all LogObservers from this Repository */ void removeAllObservers(); + + /** + * Sets the current logger for the component + * + * @param logger the logger to use + */ + void setLogger(ComponentLog logger); + + /** + * @return the current logger for the component + */ + ComponentLog getLogger(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 638aa08..8f3af55 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -936,6 +936,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final ProcessorLog processorLogger = new SimpleProcessLogger(identifier, processor); final ProcessorInitializationContext ctx = new StandardProcessorInitializationContext(identifier, processorLogger, this); processor.initialize(ctx); + + LogRepositoryFactory.getRepository(identifier).setLogger(processorLogger); return processor; } catch (final Throwable t) { throw new ProcessorInstantiationException(type, t); @@ -1136,6 +1138,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R 
clusterTaskExecutor.shutdown(); + if (zooKeeperStateServer != null) { + zooKeeperStateServer.shutdown(); + } + // Trigger any processors' methods marked with @OnShutdown to be called rootGroup.shutdown(); http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateManager.java index e83bbac..639f8a2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateManager.java @@ -24,6 +24,10 @@ import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateManager; import org.apache.nifi.components.state.StateMap; import org.apache.nifi.components.state.StateProvider; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.logging.LogRepository; +import org.apache.nifi.logging.LogRepositoryFactory; +import org.apache.nifi.processor.SimpleProcessLogger; public class StandardStateManager implements StateManager { private final StateProvider localProvider; @@ -44,25 +48,40 @@ public class StandardStateManager implements StateManager { return clusterProvider; } + private ComponentLog getLogger(final String componentId) { + final LogRepository repo = LogRepositoryFactory.getRepository(componentId); + final ComponentLog logger = (repo == null) ? 
null : repo.getLogger(); + if (repo == null || logger == null) { + return new SimpleProcessLogger(componentId, this); + } + + return logger; + } @Override public StateMap getState(final Scope scope) throws IOException { - return getProvider(scope).getState(componentId); + final StateMap stateMap = getProvider(scope).getState(componentId); + getLogger(componentId).debug("Returning {} State: {}", new Object[] {scope, stateMap}); + return stateMap; } @Override public boolean replace(final StateMap oldValue, final Map<String, String> newValue, final Scope scope) throws IOException { - return getProvider(scope).replace(oldValue, newValue, componentId); + final boolean replaced = getProvider(scope).replace(oldValue, newValue, componentId); + getLogger(componentId).debug("{} State from old value {} to new value {} was {}", new Object[] {scope, oldValue, newValue, replaced}); + return replaced; } @Override public void setState(final Map<String, String> state, final Scope scope) throws IOException { + getLogger(componentId).debug("Setting {} State to {}", new Object[] {scope, state}); getProvider(scope).setState(state, componentId); } @Override public void clear(final Scope scope) throws IOException { + getLogger(componentId).debug("Clearing {} State", new Object[] {scope}); getProvider(scope).clear(componentId); } http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateMap.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateMap.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateMap.java index b006ac6..672fd6f 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateMap.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateMap.java @@ -46,4 +46,8 @@ public class StandardStateMap implements StateMap { return stateValues; } + @Override + public String toString() { + return "StandardStateMap[version=" + version + ", values=" + stateValues + "]"; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/repository/StandardLogRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/repository/StandardLogRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/repository/StandardLogRepository.java index f8d37e5..f6c55b6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/repository/StandardLogRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/repository/StandardLogRepository.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.LogLevel; import org.apache.nifi.logging.LogMessage; import org.apache.nifi.logging.LogObserver; @@ -45,6 +46,8 @@ public class StandardLogRepository implements LogRepository { private final Logger logger = LoggerFactory.getLogger(StandardLogRepository.class); + private volatile ComponentLog componentLogger; + @Override public void 
addLogMessage(final LogLevel level, final String message) { addLogMessage(level, message, (Throwable) null); @@ -170,4 +173,14 @@ public class StandardLogRepository implements LogRepository { writeLock.unlock(); } } + + @Override + public void setLogger(final ComponentLog componentLogger) { + this.componentLogger = componentLogger; + } + + @Override + public ComponentLog getLogger() { + return componentLogger; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java index 44e4a03..3645eec 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; import org.apache.nifi.annotation.behavior.WritesAttribute; @@ -85,6 +86,10 @@ import org.codehaus.jackson.map.ObjectMapper; @WritesAttribute(attribute="hdfs.permissions", description="The permissions for the file in HDFS. This is formatted as 3 characters for the owner, " + "3 for the group, and 3 for other users. 
For example rw-rw-r--") }) +@Stateful(scopes = Scope.CLUSTER, description = "After performing a listing of HDFS files, the timestamp of the newest file is stored, " + + "along with the filenames of all files that share that same timestamp. This allows the Processor to list only files that have been added or modified after " + + "this date the next time that the Processor is run. State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary " + + "Node is selected, the new node can pick up where the previous node left off, without duplicating the data.") @SeeAlso({GetHDFS.class, FetchHDFS.class, PutHDFS.class}) public class ListHDFS extends AbstractHadoopProcessor { public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder() http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java index e27e5fe..c9bf369 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.lifecycle.OnScheduled; import 
org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange; @@ -140,6 +141,8 @@ import org.codehaus.jackson.map.ObjectMapper; * </ul> */ @TriggerSerially +@Stateful(scopes={Scope.LOCAL, Scope.CLUSTER}, description="After a listing of resources is performed, the latest timestamp of any of the resources is stored in the component's state, " + + "along with all resources that have that same timestamp so that the Processor can avoid data duplication. The scope used depends on the implementation.") public abstract class AbstractListProcessor<T extends ListableEntity> extends AbstractProcessor { public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder() .name("Distributed Cache Service") http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java index d3eb515..48f22c5 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java @@ -65,6 +65,7 @@ import org.apache.http.impl.conn.BasicHttpClientConnectionManager; import org.apache.http.ssl.SSLContextBuilder; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; 
import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -93,11 +94,13 @@ import org.apache.nifi.util.StopWatch; @Tags({"get", "fetch", "poll", "http", "https", "ingest", "source", "input"}) @InputRequirement(Requirement.INPUT_FORBIDDEN) -@CapabilityDescription("Fetches a file via HTTP") +@CapabilityDescription("Fetches a file via HTTP. If the HTTP server supports it, the Processor then stores the Last Modified time and the ETag " + + "so that data will not be pulled again until the remote data changes or until the state is cleared.") @WritesAttributes({ @WritesAttribute(attribute = "filename", description = "The filename is set to the name of the file on the remote server"), @WritesAttribute(attribute = "mime.type", description = "The MIME Type of the FlowFile, as reported by the HTTP Content-Type header") }) +@Stateful(scopes = {Scope.LOCAL}, description = "Stores Last Modified Time and ETag headers returned by server so that the same data will not be fetched multiple times.") public class GetHTTP extends AbstractSessionFactoryProcessor { static final int PERSISTENCE_INTERVAL_MSEC = 10000; http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java index ed661ae..e82be2c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java @@ -45,6 +45,7 @@ import 
java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.behavior.WritesAttribute; @@ -94,6 +95,10 @@ import org.apache.nifi.processors.standard.util.FileInfo; "rw-rw-r--") }) @SeeAlso({GetFile.class, PutFile.class, FetchFile.class}) +@Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, description = "After performing a listing of files, the timestamp of the newest file is stored, " + + "along with the filenames of all files that share that same timestamp. This allows the Processor to list only files that have been added or modified after " + + "this date the next time that the Processor is run. Whether the state is stored with a Local or Cluster scope depends on the value of the " + + "<Input Directory Location> property.") public class ListFile extends AbstractListProcessor<FileInfo> { static final AllowableValue LOCATION_LOCAL = new AllowableValue("Local", "Local", "Input Directory is located on a local disk. State will be stored locally on each node in the cluster."); static final AllowableValue LOCATION_REMOTE = new AllowableValue("Remote", "Remote", "Input Directory is located on a remote system. 
State will be stored across the cluster so that " http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java index e1cd417..d92f398 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.behavior.WritesAttribute; @@ -50,6 +51,10 @@ import org.apache.nifi.processors.standard.util.SFTPTransfer; @WritesAttribute(attribute = "filename", description = "The name of the file on the SFTP Server"), @WritesAttribute(attribute = "path", description = "The fully qualified name of the directory on the SFTP Server from which the file was pulled"), }) +@Stateful(scopes = {Scope.CLUSTER}, description = "After performing a listing of files, the timestamp of the newest file is stored, " + + "along with the filenames of all files that share that same timestamp. This allows the Processor to list only files that have been added or modified after " + + "this date the next time that the Processor is run. 
State is stored across the cluster so that this Processor can be run on Primary Node only and if " + + "a new Primary Node is selected, the new node will not duplicate the data that was listed by the previous Primary Node.") public class ListSFTP extends ListFileTransfer { @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java index 7343eea..6f99b42 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java @@ -46,6 +46,7 @@ import java.util.zip.Checksum; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; @@ -79,6 +80,8 @@ import org.apache.nifi.util.LongHolder; + "was not running (provided that the data still exists upon restart of NiFi). It is generally advisable to set the Run Schedule to a few seconds, rather than running " + "with the default value of 0 secs, as this Processor will consume a lot of resources if scheduled very aggressively. 
At this time, this Processor does not support " + "ingesting files that have been compressed when 'rolled over'.") +@Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, description = "Stores state about where in the Tailed File it left off so that on restart it does not have to duplicate data. " + + "State is stored either locally or across the cluster, depending on the <File Location> property.") public class TailFile extends AbstractProcessor { static final AllowableValue LOCATION_LOCAL = new AllowableValue("Local", "Local", "File is located on a local disk drive. Each node in a cluster will tail a different file."); http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java index e09d6f0..8bec03e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java @@ -106,7 +106,7 @@ public class TestDetectDuplicate { final DistributedMapCacheClientImpl client = new DistributedMapCacheClientImpl(); final ComponentLog logger = new MockProcessorLog("client", client); - final MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client", logger, new MockStateManager()); + final MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client", 
logger, new MockStateManager(client)); client.initialize(clientInitContext); return client; http://git-wip-us.apache.org/repos/asf/nifi/blob/774c29a4/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRouteOnAttribute.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRouteOnAttribute.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRouteOnAttribute.java index 3b70dc4..66dc854 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRouteOnAttribute.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRouteOnAttribute.java @@ -39,7 +39,7 @@ public class TestRouteOnAttribute { @Test public void testInvalidOnMisconfiguredProperty() { final RouteOnAttribute proc = new RouteOnAttribute(); - final MockProcessContext ctx = new MockProcessContext(proc, new MockStateManager()); + final MockProcessContext ctx = new MockProcessContext(proc, new MockStateManager(proc)); final ValidationResult validationResult = ctx.setProperty("RouteA", "${a:equals('b')"); // Missing closing brace assertFalse(validationResult.isValid()); } @@ -47,7 +47,7 @@ public class TestRouteOnAttribute { @Test public void testInvalidOnNonBooleanProperty() { final RouteOnAttribute proc = new RouteOnAttribute(); - final MockProcessContext ctx = new MockProcessContext(proc, new MockStateManager()); + final MockProcessContext ctx = new MockProcessContext(proc, new MockStateManager(proc)); final ValidationResult validationResult = ctx.setProperty("RouteA", "${a:length()"); // Should be boolean assertFalse(validationResult.isValid()); }