This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new e302f2aff7 NIFI-10976 Added Previous Cluster State Provider
configuration (#7235)
e302f2aff7 is described below
commit e302f2aff7af6ec1165e8fa739eaa63dc7e20699
Author: exceptionfactory <[email protected]>
AuthorDate: Wed May 10 15:22:05 2023 -0500
NIFI-10976 Added Previous Cluster State Provider configuration (#7235)
- Added methods to enumerate Stored Component Identifiers on State Provider
interface and implementations
- Added nifi.state.management.provider.cluster.previous to nifi.properties
- Updated State Manager Provider to restore Cluster State from Previous
Cluster Provider
- Updated Configuring State Providers documentation for new property
---
.../java/org/apache/nifi/util/NiFiProperties.java | 1 +
.../src/main/asciidoc/administration-guide.adoc | 29 ++-
.../nifi/components/state/StateProvider.java | 21 ++
.../manager/StandardStateManagerProvider.java | 256 +++++++++++++++------
.../local/WriteAheadLocalStateProvider.java | 10 +
.../zookeeper/ZooKeeperStateProvider.java | 31 ++-
.../state/providers/AbstractTestStateProvider.java | 10 +
.../provider/KubernetesConfigMapStateProvider.java | 36 +++
.../KubernetesConfigMapStateProviderTest.java | 20 ++
.../nifi-framework/nifi-resources/pom.xml | 1 +
.../src/main/resources/conf/nifi.properties | 2 +
.../nifi/redis/state/RedisStateProvider.java | 38 +++
.../nifi/redis/state/ITRedisStateProvider.java | 23 +-
.../components/state/HashMapStateProvider.java | 10 +
14 files changed, 404 insertions(+), 84 deletions(-)
diff --git
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index 681ed0a5ee..65615f9520 100644
---
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -306,6 +306,7 @@ public class NiFiProperties extends ApplicationProperties {
public static final String STATE_MANAGEMENT_CONFIG_FILE =
"nifi.state.management.configuration.file";
public static final String STATE_MANAGEMENT_LOCAL_PROVIDER_ID =
"nifi.state.management.provider.local";
public static final String STATE_MANAGEMENT_CLUSTER_PROVIDER_ID =
"nifi.state.management.provider.cluster";
+ public static final String STATE_MANAGEMENT_CLUSTER_PROVIDER_PREVIOUS_ID =
"nifi.state.management.provider.cluster.previous";
public static final String STATE_MANAGEMENT_START_EMBEDDED_ZOOKEEPER =
"nifi.state.management.embedded.zookeeper.start";
public static final String STATE_MANAGEMENT_ZOOKEEPER_PROPERTIES =
"nifi.state.management.embedded.zookeeper.properties";
diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc
b/nifi-docs/src/main/asciidoc/administration-guide.adoc
index 838fe75d7d..15d01f49fd 100644
--- a/nifi-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc
@@ -2619,15 +2619,32 @@ in the cluster. This allows one node to pick up where
another node left off, or
[[state_providers]]
=== Configuring State Providers
-When a component decides to store or retrieve state, it does so by providing a
"Scope" - either Node-local or Cluster-wide. The
-mechanism that is used to store and retrieve this state is then determined
based on this Scope, as well as the configured State
-Providers. The _nifi.properties_ file contains three different properties that
are relevant to configuring these State Providers.
+When a component decides to store or retrieve state, it does so by providing a
`Scope`, either `Local` to the node or
+applicable to the entire `Cluster`. Component implementation code and
configuration properties determine the requested
+Scope, which the framework provides according to the State Management
configuration. The `nifi.properties` configuration
+contains several properties for managing these State Providers.
|====
|*Property*|*Description*
-|`nifi.state.management.configuration.file`|The first is the property that
specifies an external XML file that is used for configuring the local and/or
cluster-wide State Providers. This XML file may contain configurations for
multiple providers
-|`nifi.state.management.provider.local`|The property that provides the
identifier of the local State Provider configured in this XML file
-|`nifi.state.management.provider.cluster`|Similarly, the property provides the
identifier of the cluster-wide State Provider configured in this XML file.
+|`nifi.state.management.configuration.file`|The configuration file specifies
the path to an external XML file that the
+framework uses to configure State Providers. This XML file may contain
configurations for multiple providers.
+|`nifi.state.management.provider.local`|The Local Provider stores current
Local State information. The property value
+identifies a Local Provider in the State Management configuration that the
framework will use for storing and retrieving
+Local State for requesting components.
+|`nifi.state.management.provider.cluster`|The Cluster Provider stores current
Cluster State information. The property
+value identifies a Cluster Provider in the State Management configuration that
the framework will use for storing and
+retrieving Cluster State for requesting components.
+|`nifi.state.management.provider.cluster.previous`|The Previous Cluster State
Provider enables population of the current
+Cluster State from an existing Provider. The property value identifies a
Cluster Provider in the State Management
+configuration that the framework will use as the initial source of Cluster
State when the current Cluster State Provider
+has no information stored.
+
+The framework enumerates the Current Cluster Provider when a node becomes
Primary, and proceeds to check the Previous
+Cluster Provider when the Current Cluster Provider does not contain any
component information. The Previous Cluster
+Provider property value can be set to blank after cluster startup following a
successful Cluster State restore from
+backup.
+
+The default value is blank.
|====
This XML file consists of a top-level `state-management` element, which has
one or more `local-provider` and zero or more `cluster-provider`
diff --git
a/nifi-framework-api/src/main/java/org/apache/nifi/components/state/StateProvider.java
b/nifi-framework-api/src/main/java/org/apache/nifi/components/state/StateProvider.java
index e1e4352721..74f7d972ec 100644
---
a/nifi-framework-api/src/main/java/org/apache/nifi/components/state/StateProvider.java
+++
b/nifi-framework-api/src/main/java/org/apache/nifi/components/state/StateProvider.java
@@ -18,6 +18,8 @@
package org.apache.nifi.components.state;
import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Map;
import org.apache.nifi.components.ConfigurableComponent;
@@ -130,4 +132,23 @@ public interface StateProvider extends
ConfigurableComponent {
* @return the {@link Scope}s supported by the configuration
*/
Scope[] getSupportedScopes();
+
+ /**
+ * Indicates whether the State Provider supports enumerating component
identifiers with stored state information
+ *
+ * @return Component enumeration supported status
+ */
+ default boolean isComponentEnumerationSupported() {
+ return false;
+ }
+
+ /**
+ * Get Component Identifiers with associated state stored in the Provider
+ *
+ * @return Collection of Component Identifiers with stored state; defaults
to empty
+ * @throws IOException Thrown on failures to retrieve component identifiers
+ */
+ default Collection<String> getStoredComponentIds() throws IOException {
+ return Collections.emptyList();
+ }
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
index 35c3180551..49eb1e98b5 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
@@ -64,12 +64,15 @@ import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
import java.io.File;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.HashMap;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -82,10 +85,18 @@ public class StandardStateManagerProvider implements
StateManagerProvider {
private final ConcurrentMap<String, StateManager> stateManagers = new
ConcurrentHashMap<>();
private final StateProvider localStateProvider;
private final StateProvider clusterStateProvider;
+ private final StateProvider previousClusterStateProvider;
public StandardStateManagerProvider(final StateProvider
localStateProvider, final StateProvider clusterStateProvider) {
this.localStateProvider = localStateProvider;
this.clusterStateProvider = clusterStateProvider;
+ this.previousClusterStateProvider = null;
+ }
+
+ private StandardStateManagerProvider(final StateProvider
localStateProvider, final StateProvider clusterStateProvider, final
StateProvider previousClusterStateProvider) {
+ this.localStateProvider = localStateProvider;
+ this.clusterStateProvider = clusterStateProvider;
+ this.previousClusterStateProvider = previousClusterStateProvider;
}
protected StateProvider getLocalStateProvider() {
@@ -104,16 +115,21 @@ public class StandardStateManagerProvider implements
StateManagerProvider {
return provider;
}
- final StateProvider localProvider =
createLocalStateProvider(properties,variableRegistry, extensionManager,
parameterLookup);
+ final SSLContext sslContext = createSslContext(properties);
+
+ final StateProvider localProvider =
createLocalStateProvider(properties, sslContext, variableRegistry,
extensionManager, parameterLookup);
final StateProvider clusterProvider;
+ final StateProvider previousClusterProvider;
if (properties.isNode()) {
- clusterProvider =
createClusteredStateProvider(properties,variableRegistry, extensionManager,
parameterLookup);
+ clusterProvider = createClusteredStateProvider(properties,
sslContext, variableRegistry, extensionManager, parameterLookup);
+ previousClusterProvider =
getPreviousClusteredStateProvider(properties, sslContext, variableRegistry,
extensionManager, parameterLookup);
} else {
clusterProvider = null;
+ previousClusterProvider = null;
}
- provider = new StandardStateManagerProvider(localProvider,
clusterProvider);
+ provider = new StandardStateManagerProvider(localProvider,
clusterProvider, previousClusterProvider);
return provider;
}
@@ -121,105 +137,177 @@ public class StandardStateManagerProvider implements
StateManagerProvider {
provider = null;
}
- private static StateProvider createLocalStateProvider(final NiFiProperties
properties, final VariableRegistry variableRegistry, final ExtensionManager
extensionManager,
- final
ParameterLookup parameterLookup) throws IOException, ConfigParseException {
+ private static SSLContext createSslContext(final NiFiProperties
properties) {
+ final TlsConfiguration standardTlsConfiguration =
StandardTlsConfiguration.fromNiFiProperties(properties);
+ try {
+ return
SslContextFactory.createSslContext(standardTlsConfiguration);
+ } catch (final TlsException e) {
+ throw new IllegalStateException("TLS Security Properties not valid
for State Manager configuration", e);
+ }
+ }
+
+ private static StateProvider createLocalStateProvider(
+ final NiFiProperties properties,
+ final SSLContext sslContext,
+ final VariableRegistry variableRegistry,
+ final ExtensionManager extensionManager,
+ final ParameterLookup parameterLookup
+ ) throws IOException, ConfigParseException {
final File configFile = properties.getStateManagementConfigFile();
- return createStateProvider(configFile, Scope.LOCAL, properties,
variableRegistry, extensionManager, parameterLookup);
+ final StateProviderConfiguration config =
getProviderConfiguration(Scope.LOCAL, properties.getLocalStateProviderId(),
NiFiProperties.STATE_MANAGEMENT_LOCAL_PROVIDER_ID, configFile);
+ return createStateProvider(config, sslContext, variableRegistry,
extensionManager, parameterLookup);
}
- private static StateProvider createClusteredStateProvider(final
NiFiProperties properties, final VariableRegistry variableRegistry, final
ExtensionManager extensionManager,
- final
ParameterLookup parameterLookup) throws IOException, ConfigParseException {
+ private static StateProvider createClusteredStateProvider(
+ final NiFiProperties properties,
+ final SSLContext sslContext,
+ final VariableRegistry variableRegistry,
+ final ExtensionManager extensionManager,
+ final ParameterLookup parameterLookup
+ ) throws IOException, ConfigParseException {
final File configFile = properties.getStateManagementConfigFile();
- return createStateProvider(configFile, Scope.CLUSTER, properties,
variableRegistry, extensionManager, parameterLookup);
+ final StateProviderConfiguration config =
getProviderConfiguration(Scope.CLUSTER, properties.getClusterStateProviderId(),
NiFiProperties.STATE_MANAGEMENT_CLUSTER_PROVIDER_ID, configFile);
+ return createStateProvider(config, sslContext, variableRegistry,
extensionManager, parameterLookup);
}
- private static StateProvider createStateProvider(final File configFile,
final Scope scope, final NiFiProperties properties, final VariableRegistry
variableRegistry,
- final ExtensionManager
extensionManager, final ParameterLookup parameterLookup) throws
ConfigParseException, IOException {
- final String providerId;
- final String providerIdPropertyName;
- final String providerDescription;
- final String providerXmlElementName;
- final String oppositeScopeXmlElementName;
-
- switch (scope) {
- case CLUSTER:
- providerId = properties.getClusterStateProviderId();
- providerIdPropertyName =
NiFiProperties.STATE_MANAGEMENT_CLUSTER_PROVIDER_ID;
- providerDescription = "Cluster State Provider";
- providerXmlElementName = "cluster-provider";
- oppositeScopeXmlElementName = "local-provider";
- break;
- case LOCAL:
- providerId = properties.getLocalStateProviderId();
- providerIdPropertyName =
NiFiProperties.STATE_MANAGEMENT_LOCAL_PROVIDER_ID;
- providerDescription = "Local State Provider";
- providerXmlElementName = "local-provider";
- oppositeScopeXmlElementName = "cluster-provider";
- break;
- default:
- throw new AssertionError("Attempted to create State Provider
for unknown Scope: " + scope);
+ private static StateProvider getPreviousClusteredStateProvider(
+ final NiFiProperties properties,
+ final SSLContext sslContext,
+ final VariableRegistry variableRegistry,
+ final ExtensionManager extensionManager,
+ final ParameterLookup parameterLookup
+ ) throws IOException {
+ final String clusterProviderPreviousId =
properties.getProperty(NiFiProperties.STATE_MANAGEMENT_CLUSTER_PROVIDER_PREVIOUS_ID);
+ if (clusterProviderPreviousId == null ||
clusterProviderPreviousId.isEmpty()) {
+ return null;
+ } else {
+ final File configFile = properties.getStateManagementConfigFile();
+ final StateProviderConfiguration config =
getProviderConfiguration(Scope.CLUSTER, clusterProviderPreviousId,
NiFiProperties.STATE_MANAGEMENT_CLUSTER_PROVIDER_PREVIOUS_ID, configFile);
+ final StateProvider previousClusterStateProvider =
createStateProvider(config, sslContext, variableRegistry, extensionManager,
parameterLookup);
+ if
(previousClusterStateProvider.isComponentEnumerationSupported()) {
+ return previousClusterStateProvider;
+ } else {
+ throw new IllegalStateException(String.format("Previous
Cluster State Provider [%s] does not support Component Enumeration",
clusterProviderPreviousId));
+ }
}
+ }
+
+ private static void loadPreviousClusterState(final StateProvider
previousProvider, final StateProvider currentProvider) {
+ final String previousProviderId = previousProvider.getIdentifier();
+ final String currentProviderId = currentProvider.getIdentifier();
+
+ try {
+ final Collection<String> currentStoredComponentIds =
currentProvider.getStoredComponentIds();
+ if (currentStoredComponentIds.isEmpty()) {
+ final Collection<String> previousStoredComponentIds =
previousProvider.getStoredComponentIds();
+ if (previousStoredComponentIds.isEmpty()) {
+ logger.info("Cluster State not found in Previous Provider
[{}]", previousProviderId);
+ } else {
+ loadPreviousClusterStateComponents(previousProvider,
currentProvider, previousStoredComponentIds);
+ }
+ } else {
+ logger.info("Previous Cluster State ignored: State found in
Provider [{}] for Components [{}]", currentProviderId,
currentStoredComponentIds.size());
+ }
+ } catch (final IOException e) {
+ final String message = String.format("Cluster State Component
Enumeration failed from Provider [%s] to Provider [%s]", previousProviderId,
currentProviderId);
+ throw new UncheckedIOException(message, e);
+ }
+ }
+
+ private static void loadPreviousClusterStateComponents(final StateProvider
previousProvider, final StateProvider currentProvider, final Collection<String>
previousStoredComponentIds) {
+ final String previousProviderId = previousProvider.getIdentifier();
+ final String currentProviderId = currentProvider.getIdentifier();
+ final Set<String> loadedComponentIds = new LinkedHashSet<>();
+
+ logger.info("Cluster State found in Previous Provider [{}] for
Components [{}]", previousProviderId, previousStoredComponentIds.size());
+ try {
+ for (final String componentId : previousStoredComponentIds) {
+ final StateMap previousState =
previousProvider.getState(componentId);
+ final Map<String, String> state = previousState.toMap();
+ currentProvider.setState(state, componentId);
+ logger.info("Cluster State loaded for Component [{}] to
Provider [{}]", componentId, currentProviderId);
+ loadedComponentIds.add(componentId);
+ }
+ logger.info("Cluster State loaded from Provider [{}] to Provider
[{}] for Components [{}]", previousProviderId, currentProviderId,
previousStoredComponentIds.size());
+ } catch (final IOException e) {
+ final Set<String> failedComponentIds = new
LinkedHashSet<>(previousStoredComponentIds);
+ failedComponentIds.removeAll(loadedComponentIds);
+
+ logger.warn("Cluster State loaded for Components {} but failed for
Components [{}] from Provider [{}] to Provider [{}]",
+ loadedComponentIds, failedComponentIds,
previousProviderId, currentProviderId);
+
+ final String message = String.format("Cluster State load failed
from Provider [%s] to Provider [%s]", previousProviderId, currentProviderId);
+ throw new UncheckedIOException(message, e);
+ }
+ }
+
+ private static StateProviderConfiguration getProviderConfiguration(
+ final Scope scope,
+ final String providerId,
+ final String providerIdPropertyName,
+ final File configFile
+ ) throws IOException {
if (!configFile.exists()) {
- throw new IllegalStateException("Cannot create " +
providerDescription + " because the State Management Configuration File " +
configFile + " does not exist");
+ throw new IllegalStateException("Cannot create " + scope + "
Provider because the State Management Configuration File " + configFile + "
does not exist");
}
if (!configFile.canRead()) {
- throw new IllegalStateException("Cannot create " +
providerDescription + " because the State Management Configuration File " +
configFile + " cannot be read");
+ throw new IllegalStateException("Cannot create " + scope + "
Provider because the State Management Configuration File " + configFile + "
cannot be read");
}
if (providerId == null) {
if (scope == Scope.CLUSTER) {
throw new IllegalStateException("Cannot create Cluster State
Provider because the '" + providerIdPropertyName
- + "' property is missing from the NiFi Properties file. In
order to run NiFi in a cluster, the " + providerIdPropertyName
- + " property must be configured in nifi.properties");
+ + "' property is missing from the NiFi Properties
file. In order to run NiFi in a cluster, the " + providerIdPropertyName
+ + " property must be configured in nifi.properties");
}
- throw new IllegalStateException("Cannot create " +
providerDescription + " because the '" + providerIdPropertyName
- + "' property is missing from the NiFi Properties file");
+ throw new IllegalStateException("Cannot create " + scope + "
Provider because the '" + providerIdPropertyName
+ + "' property is missing from the NiFi Properties file");
}
if (providerId.trim().isEmpty()) {
- throw new IllegalStateException("Cannot create " +
providerDescription + " because the '" + providerIdPropertyName
- + "' property in the NiFi Properties file has no value set.
This is a required property and must reference the identifier of one of the "
- + providerXmlElementName + " elements in the State Management
Configuration File (" + configFile + ")");
+ throw new IllegalStateException("Cannot create " + scope + "
Provider because the '" + providerIdPropertyName
+ + "' property in the NiFi Properties file has no value
set. This is a required property and must reference the identifier of one of
the "
+ + scope + " elements in the State Management Configuration
File (" + configFile + ")");
}
final StateManagerConfiguration config =
StateManagerConfiguration.parse(configFile);
final StateProviderConfiguration providerConfig =
config.getStateProviderConfiguration(providerId);
if (providerConfig == null) {
- throw new IllegalStateException("Cannot create " +
providerDescription + " because the '" + providerIdPropertyName
- + "' property in the NiFi Properties file is set to '" +
providerId + "', but there is no " + providerXmlElementName
- + " entry in the State Management Configuration File (" +
configFile + ") with this id");
+ throw new IllegalStateException("Cannot create " + scope + "
Provider because the '" + providerIdPropertyName
+ + "' property in the NiFi Properties file is set to '" +
providerId + "', but there is no " + scope
+ + " entry in the State Management Configuration File (" +
configFile + ") with this id");
}
if (providerConfig.getScope() != scope) {
- throw new IllegalStateException("Cannot create " +
providerDescription + " because the '" + providerIdPropertyName
- + "' property in the NiFi Properties file is set to '" +
providerId + "', but this id is assigned to a " + oppositeScopeXmlElementName
- + " entry in the State Management Configuration File (" +
configFile + "), rather than a " + providerXmlElementName + " entry");
+ throw new IllegalStateException("Cannot create " + scope + "
Provider because the '" + providerIdPropertyName
+ + "' property in the NiFi Properties file is set to '" +
providerId + "', but this ID is assigned to another "
+ + " entry in the State Management Configuration File (" +
configFile + "), rather than a " + scope + " entry");
}
+ return providerConfig;
+ }
+
+ private static StateProvider createStateProvider(
+ final StateProviderConfiguration providerConfig,
+ final SSLContext sslContext,
+ final VariableRegistry variableRegistry,
+ final ExtensionManager extensionManager,
+ final ParameterLookup parameterLookup
+ ) throws IOException {
final String providerClassName = providerConfig.getClassName();
final StateProvider provider;
try {
provider = instantiateStateProvider(extensionManager,
providerClassName);
} catch (final Exception e) {
- throw new RuntimeException("Cannot create " + providerDescription
+ " of type " + providerClassName, e);
+ throw new RuntimeException("Cannot create " +
providerConfig.getScope() + " Provider of type " + providerClassName, e);
}
- if (!ArrayUtils.contains(provider.getSupportedScopes(), scope)) {
- throw new RuntimeException("Cannot use " + providerDescription + "
("+providerClassName+") as it only supports scope(s) " +
ArrayUtils.toString(provider.getSupportedScopes()) + " but " +
- "instance"
- + " is configured to use scope " + scope);
- }
-
- final SSLContext sslContext;
- TlsConfiguration standardTlsConfiguration =
StandardTlsConfiguration.fromNiFiProperties(properties);
- try {
- sslContext =
SslContextFactory.createSslContext(standardTlsConfiguration);
- } catch (TlsException e) {
- logger.error("Encountered an error configuring TLS for state
manager: ", e);
- throw new IllegalStateException("Error configuring TLS for state
manager", e);
+ if (!ArrayUtils.contains(provider.getSupportedScopes(),
providerConfig.getScope())) {
+ throw new RuntimeException("Cannot use " +
providerConfig.getScope() + " (" + providerClassName + ") as it only supports
scope(s) "
+ + ArrayUtils.toString(provider.getSupportedScopes()) + "
but instance is configured to use scope " + providerConfig.getScope());
}
//create variable registry
@@ -253,8 +341,8 @@ public class StandardStateManagerProvider implements
StateManagerProvider {
propertyMap.put(descriptor, new
StandardPropertyValue(resourceContext, entry.getValue(),null, parameterLookup,
variableRegistry));
}
- final ComponentLog logger = new SimpleProcessLogger(providerId,
provider);
- final StateProviderInitializationContext initContext = new
StandardStateProviderInitializationContext(providerId, propertyMap, sslContext,
logger);
+ final ComponentLog logger = new
SimpleProcessLogger(providerConfig.getId(), provider);
+ final StateProviderInitializationContext initContext = new
StandardStateProviderInitializationContext(providerConfig.getId(), propertyMap,
sslContext, logger);
synchronized (provider) {
provider.initialize(initContext);
@@ -267,22 +355,22 @@ public class StandardStateManagerProvider implements
StateManagerProvider {
int invalidCount = 0;
for (final ValidationResult result : results) {
if (!result.isValid()) {
- validationFailures.append(result.toString()).append("\n");
+ validationFailures.append(result).append("\n");
invalidCount++;
}
}
if (invalidCount > 0) {
- throw new IllegalStateException("Could not initialize State
Providers because the " + providerDescription + " is not valid. The following "
- + invalidCount + " Validation Errors occurred:\n" +
validationFailures.toString() + "\nPlease check the configuration of the " +
providerDescription + " with ID ["
- + providerId.trim() + "] in the file " +
configFile.getAbsolutePath());
+ throw new IllegalStateException("Could not initialize State
Providers because the " + providerConfig.getScope() + " Provider is not valid.
The following "
+ + invalidCount + " Validation Errors occurred:\n" +
validationFailures + "\nPlease check the configuration of the " +
providerConfig.getScope() + " Provider with ID ["
+ + providerConfig.getId() + "]");
}
return provider;
}
// Inject NiFi Properties to state providers that use the
StateProviderContext annotation
- private static void performMethodInjection(final Object instance, final
Class stateProviderClass) throws IllegalAccessException,
IllegalArgumentException, InvocationTargetException {
+ private static void performMethodInjection(final Object instance, final
Class<?> stateProviderClass) throws IllegalAccessException,
IllegalArgumentException, InvocationTargetException {
for (final Method method : stateProviderClass.getMethods()) {
if (method.isAnnotationPresent(StateProviderContext.class)) {
// make the method accessible
@@ -308,13 +396,14 @@ public class StandardStateManagerProvider implements
StateManagerProvider {
}
}
- final Class parentClass = stateProviderClass.getSuperclass();
+ final Class<?> parentClass = stateProviderClass.getSuperclass();
if (parentClass != null &&
StateProvider.class.isAssignableFrom(parentClass)) {
performMethodInjection(instance, parentClass);
}
}
- private static StateProvider instantiateStateProvider(final
ExtensionManager extensionManager, final String type) throws
ClassNotFoundException, InstantiationException, IllegalAccessException {
+ private static StateProvider instantiateStateProvider(final
ExtensionManager extensionManager, final String type)
+ throws ClassNotFoundException, InstantiationException,
IllegalAccessException, NoSuchMethodException, InvocationTargetException {
final ClassLoader ctxClassLoader =
Thread.currentThread().getContextClassLoader();
try {
final List<Bundle> bundles = extensionManager.getBundles(type);
@@ -331,7 +420,7 @@ public class StandardStateManagerProvider implements
StateManagerProvider {
Thread.currentThread().setContextClassLoader(detectedClassLoaderForType);
final Class<? extends StateProvider> mgrClass =
rawClass.asSubclass(StateProvider.class);
- StateProvider provider = mgrClass.newInstance();
+ StateProvider provider =
mgrClass.getDeclaredConstructor().newInstance();
try {
performMethodInjection(provider, mgrClass);
} catch (InvocationTargetException e) {
@@ -465,6 +554,20 @@ public class StandardStateManagerProvider implements
StateManagerProvider {
return stateProvider.getIdentifier();
}
}
+
+ @Override
+ public boolean isComponentEnumerationSupported() {
+ try (final NarCloseable narCloseable =
NarCloseable.withNarLoader()) {
+ return stateProvider.isComponentEnumerationSupported();
+ }
+ }
+
+ @Override
+ public Collection<String> getStoredComponentIds() throws
IOException {
+ try (final NarCloseable narCloseable =
NarCloseable.withNarLoader()) {
+ return stateProvider.getStoredComponentIds();
+ }
+ }
};
}
@@ -491,11 +594,18 @@ public class StandardStateManagerProvider implements
StateManagerProvider {
if (clusterStateProvider != null) {
clusterStateProvider.shutdown();
}
+ if (previousClusterStateProvider != null) {
+ previousClusterStateProvider.shutdown();
+ }
}
@Override
public void enableClusterProvider() {
clusterStateProvider.enable();
+ if (previousClusterStateProvider != null) {
+ previousClusterStateProvider.enable();
+ loadPreviousClusterState(previousClusterStateProvider,
clusterStateProvider);
+ }
}
@Override
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java
index 7f79649656..682fc5a2d7 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java
@@ -227,6 +227,16 @@ public class WriteAheadLocalStateProvider extends
AbstractStateProvider {
return new Scope[]{Scope.LOCAL};
}
+ @Override
+ public boolean isComponentEnumerationSupported() {
+ return true;
+ }
+
+ @Override
+ public Collection<String> getStoredComponentIds() {
+ return Collections.unmodifiableCollection(componentProviders.keySet());
+ }
+
private static class ComponentProvider {
private final AtomicLong versionGenerator;
private final WriteAheadRepository<StateMapUpdate> wal;
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
index a2d7b96798..9f02b6b88a 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
@@ -57,6 +57,7 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -75,6 +76,10 @@ import java.util.stream.Collectors;
public class ZooKeeperStateProvider extends AbstractStateProvider {
private static final int EMPTY_VERSION = -1;
+ private static final String COMPONENTS_RELATIVE_PATH = "/components";
+
+ private static final String COMPONENTS_PATH_FORMAT = "%s%s/%s";
+
private static final Logger logger =
LoggerFactory.getLogger(ZooKeeperStateProvider.class);
private NiFiProperties nifiProperties;
@@ -259,7 +264,7 @@ public class ZooKeeperStateProvider extends
AbstractStateProvider {
}
private String getComponentPath(final String componentId) {
- return rootNode + "/components/" + componentId;
+ return String.format(COMPONENTS_PATH_FORMAT, rootNode,
COMPONENTS_RELATIVE_PATH, componentId);
}
private void verifyEnabled() throws IOException {
@@ -518,6 +523,30 @@ public class ZooKeeperStateProvider extends
AbstractStateProvider {
setState(Collections.emptyMap(), componentId);
}
+ @Override
+ public boolean isComponentEnumerationSupported() {
+ return true;
+ }
+
+ @Override
+ public Collection<String> getStoredComponentIds() throws IOException {
+ try {
+ final ZooKeeper zooKeeper = getZooKeeper();
+ final String componentsPath = String.format("%s%s", rootNode,
COMPONENTS_RELATIVE_PATH);
+ final List<String> children =
zooKeeper.getChildren(componentsPath, false);
+ return Collections.unmodifiableCollection(children);
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("ZooKeeper communication interrupted", e);
+ } catch (final KeeperException e) {
+ final Code code = e.code();
+ if (Code.NONODE == code) {
+ return Collections.emptyList();
+ }
+ throw new IOException(String.format("ZooKeeper communication
failed: %s", code), e);
+ }
+ }
+
private void validateDataSize(final ZKClientConfig clientConfig, final
byte[] data, final String componentId, final int totalStateValues) throws
StateTooLargeException {
final int maximumSize = clientConfig.getInt(ZKConfig.JUTE_MAXBUFFER,
NiFiProperties.DEFAULT_ZOOKEEPER_JUTE_MAXBUFFER);
if (data != null && data.length > maximumSize) {
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/AbstractTestStateProvider.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/AbstractTestStateProvider.java
index 43946ec8cf..d07929cff4 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/AbstractTestStateProvider.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/AbstractTestStateProvider.java
@@ -23,8 +23,10 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import org.apache.nifi.components.state.StateMap;
@@ -48,8 +50,16 @@ public abstract class AbstractTestStateProvider {
@Test
public void testSetAndGet() throws IOException {
+ final Collection<String> initialStoredComponentIds =
getProvider().getStoredComponentIds();
+ assertTrue(initialStoredComponentIds.isEmpty());
+
getProvider().setState(Collections.singletonMap("testSetAndGet",
"value"), componentId);
assertEquals("value",
getProvider().getState(componentId).get("testSetAndGet"));
+
+ final Collection<String> storedComponentIds =
getProvider().getStoredComponentIds();
+ final Iterator<String> componentIds = storedComponentIds.iterator();
+ assertTrue(componentIds.hasNext());
+ assertEquals(componentId, componentIds.next());
}
@Test
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProvider.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProvider.java
index 9b105598f3..b2b516fe6e 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProvider.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProvider.java
@@ -18,6 +18,7 @@ package org.apache.nifi.kubernetes.state.provider;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+import io.fabric8.kubernetes.api.model.ConfigMapList;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.StatusDetails;
import io.fabric8.kubernetes.client.KubernetesClient;
@@ -37,12 +38,16 @@ import java.net.HttpURLConnection;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
+import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
/**
* State Provider implementation based on Kubernetes ConfigMaps with Base64
encoded keys to meet Kubernetes constraints
@@ -54,6 +59,10 @@ public class KubernetesConfigMapStateProvider extends
AbstractConfigurableCompon
private static final String CONFIG_MAP_NAME_FORMAT = "nifi-component-%s";
+ private static final Pattern CONFIG_MAP_NAME_PATTERN =
Pattern.compile("^nifi-component-(.+)$");
+
+ private static final int COMPONENT_ID_GROUP = 1;
+
/** Encode ConfigMap keys using URL Encoder without padding characters for
compliance with Kubernetes naming */
private static final Base64.Encoder encoder =
Base64.getUrlEncoder().withoutPadding();
@@ -247,6 +256,33 @@ public class KubernetesConfigMapStateProvider extends
AbstractConfigurableCompon
return SUPPORTED_SCOPES;
}
+ /**
+ * Kubernetes ConfigMap Provider supports Component Enumeration
+ *
+ * @return Component Enumeration supported
+ */
+ @Override
+ public boolean isComponentEnumerationSupported() {
+ return true;
+ }
+
+ /**
+ * Get Component Identifiers with stored state based on ConfigMap names
matching standard pattern
+ *
+ * @return Component Identifiers with stored state or empty when none found
+ */
+ @Override
+ public Collection<String> getStoredComponentIds() {
+ final ConfigMapList configMapList =
kubernetesClient.configMaps().inNamespace(namespace).list();
+ return configMapList.getItems().stream()
+ .map(ConfigMap::getMetadata)
+ .map(ObjectMeta::getName)
+ .map(CONFIG_MAP_NAME_PATTERN::matcher)
+ .filter(Matcher::matches)
+ .map(matcher -> matcher.group(COMPONENT_ID_GROUP))
+ .collect(Collectors.toUnmodifiableList());
+ }
+
/**
* Get Kubernetes Client using standard configuration
*
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/test/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProviderTest.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/test/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProviderTest.java
index 13412a0003..f50834ba67 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/test/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProviderTest.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/test/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProviderTest.java
@@ -35,7 +35,9 @@ import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.io.IOException;
+import java.util.Collection;
import java.util.Collections;
+import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
@@ -222,6 +224,24 @@ class KubernetesConfigMapStateProviderTest {
assertEquals(HttpMethod.DELETE.name(), request.getMethod());
}
+ @Test
+ void testSetStateGetStoredComponentIds() throws IOException {
+ setContext();
+ provider.initialize(context);
+
+ final Collection<String> initialStoredComponentIds =
provider.getStoredComponentIds();
+ assertTrue(initialStoredComponentIds.isEmpty());
+
+ final Map<String, String> state =
Collections.singletonMap(STATE_PROPERTY, STATE_VALUE);
+ provider.setState(state, COMPONENT_ID);
+
+ final Collection<String> storedComponentIds =
provider.getStoredComponentIds();
+ final Iterator<String> componentIds = storedComponentIds.iterator();
+
+ assertTrue(componentIds.hasNext());
+ assertEquals(COMPONENT_ID, componentIds.next());
+ }
+
private void setContext() {
when(context.getIdentifier()).thenReturn(IDENTIFIER);
when(context.getLogger()).thenReturn(logger);
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
index e8700a07f8..66cacca6d1 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
@@ -53,6 +53,7 @@
<nifi.state.management.embedded.zookeeper.properties>./conf/zookeeper.properties</nifi.state.management.embedded.zookeeper.properties>
<nifi.state.management.provider.local>local-provider</nifi.state.management.provider.local>
<nifi.state.management.provider.cluster>zk-provider</nifi.state.management.provider.cluster>
+ <nifi.state.management.provider.cluster.previous/>
<nifi.flowfile.repository.implementation>org.apache.nifi.controller.repository.WriteAheadFlowFileRepository</nifi.flowfile.repository.implementation>
<nifi.flowfile.repository.wal.implementation>org.apache.nifi.wali.SequentialAccessWriteAheadLog</nifi.flowfile.repository.wal.implementation>
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
index 2f6214c519..4126d31efc 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
@@ -61,6 +61,8 @@
nifi.state.management.configuration.file=${nifi.state.management.configuration.f
nifi.state.management.provider.local=${nifi.state.management.provider.local}
# The ID of the cluster-wide state provider. This will be ignored if NiFi is
not clustered but must be populated if running in a cluster.
nifi.state.management.provider.cluster=${nifi.state.management.provider.cluster}
+# The ID of the previous cluster-wide state provider from which the framework will load
Cluster State when the current Cluster Provider has no entries
+nifi.state.management.provider.cluster.previous=${nifi.state.management.provider.cluster.previous}
# Specifies whether or not this instance of NiFi should run an embedded
ZooKeeper server
nifi.state.management.embedded.zookeeper.start=${nifi.state.management.embedded.zookeeper.start}
# Properties file that provides the ZooKeeper properties to use if
<nifi.state.management.embedded.zookeeper.start> is set to true
diff --git
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java
index 72ccac1f7c..3a3c06b512 100644
---
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java
+++
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java
@@ -42,6 +42,10 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
/**
* A StateProvider backed by Redis.
@@ -78,6 +82,12 @@ public class RedisStateProvider extends
AbstractConfigurableComponent implements
STATE_PROVIDER_PROPERTIES = Collections.unmodifiableList(props);
}
+ private static final String KEY_PATTERN_FORMAT = "%s*";
+
+ private static final String KEY_PREFIX_COMPONENT_ID_PATTERN = "^%s(.+)$";
+
+ private static final int COMPONENT_ID_GROUP = 1;
+
private String identifier;
private String keyPrefix;
private ComponentLog logger;
@@ -289,6 +299,34 @@ public class RedisStateProvider extends
AbstractConfigurableComponent implements
return new Scope[] {Scope.CLUSTER};
}
+ @Override
+ public boolean isComponentEnumerationSupported() {
+ return true;
+ }
+
+ /**
+ * Get Component Identifiers with stored state based on Redis Keys
matching key prefix pattern
+ *
+ * @return Component Identifiers with stored state or empty when none found
+ * @throws IOException Thrown on Redis communication failures
+ */
+ @Override
+ public Collection<String> getStoredComponentIds() throws IOException {
+ final byte[] keyPattern = String.format(KEY_PATTERN_FORMAT,
keyPrefix).getBytes(StandardCharsets.UTF_8);
+ final Pattern keyPrefixComponentIdPattern =
Pattern.compile(String.format(KEY_PREFIX_COMPONENT_ID_PATTERN, keyPrefix));
+
+ return withConnection(redisConnection -> {
+ final Set<byte[]> keys = redisConnection.keys(keyPattern);
+ final Set<byte[]> keysFound = keys == null ?
Collections.emptySet() : keys;
+ return keysFound.stream()
+ .map(key -> new String(key, StandardCharsets.UTF_8))
+ .map(keyPrefixComponentIdPattern::matcher)
+ .filter(Matcher::matches)
+ .map(matcher -> matcher.group(COMPONENT_ID_GROUP))
+ .collect(Collectors.toUnmodifiableList());
+ });
+ }
+
private String getComponentKey(final String componentId) {
return keyPrefix + componentId;
}
diff --git
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/state/ITRedisStateProvider.java
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/state/ITRedisStateProvider.java
index c0b2e0e884..599e5ec6ab 100644
---
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/state/ITRedisStateProvider.java
+++
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/state/ITRedisStateProvider.java
@@ -35,8 +35,10 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.SocketChannel;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -69,7 +71,7 @@ public class ITRedisStateProvider {
}
@AfterEach
- public void teardown() throws IOException {
+ public void teardown() {
if (provider != null) {
try {
provider.clear(componentId);
@@ -94,6 +96,19 @@ public class ITRedisStateProvider {
assertEquals("value",
getProvider().getState(componentId).get("testSetAndGet"));
}
+ @Test
+ public void testSetAndGetStoredComponentIds() throws IOException {
+ final Collection<String> initialStoredComponentIds =
provider.getStoredComponentIds();
+ assertTrue(initialStoredComponentIds.isEmpty());
+
+ provider.setState(Collections.emptyMap(), componentId);
+ final Collection<String> storedComponentIds =
provider.getStoredComponentIds();
+ final Iterator<String> componentIds = storedComponentIds.iterator();
+
+ assertTrue(componentIds.hasNext());
+ assertEquals(componentId, componentIds.next());
+ }
+
@Test
public void testReplaceSuccessful() throws IOException {
final String key = "testReplaceSuccessful";
@@ -248,11 +263,11 @@ public class ITRedisStateProvider {
final StateMap stateMapAfterRemoval = provider.getState(componentId);
// version should not be present because the state has been removed
entirely.
- assertFalse(stateMap.getStateVersion().isPresent());
+ assertFalse(stateMapAfterRemoval.getStateVersion().isPresent());
}
- private void initializeProvider(final RedisStateProvider provider, final
Map<PropertyDescriptor, String> properties) throws IOException {
+ private void initializeProvider(final RedisStateProvider provider, final
Map<PropertyDescriptor, String> properties) {
provider.initialize(new StateProviderInitializationContext() {
@Override
public String getIdentifier() {
@@ -300,7 +315,7 @@ public class ITRedisStateProvider {
});
}
- private RedisStateProvider createProvider(final Map<PropertyDescriptor,
String> properties) throws Exception {
+ private RedisStateProvider createProvider(final Map<PropertyDescriptor,
String> properties) {
final RedisStateProvider provider = new RedisStateProvider();
initializeProvider(provider, properties);
provider.enable();
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/components/state/HashMapStateProvider.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/components/state/HashMapStateProvider.java
index 50533593f0..2c5506e67c 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/components/state/HashMapStateProvider.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/components/state/HashMapStateProvider.java
@@ -153,6 +153,16 @@ public class HashMapStateProvider implements StateProvider
{
return "stateless-state-provider";
}
+ @Override
+ public boolean isComponentEnumerationSupported() {
+ return true;
+ }
+
+ @Override
+ public Collection<String> getStoredComponentIds() {
+ return
Collections.unmodifiableCollection(getAllComponentsState().keySet());
+ }
+
private String getIncrementedVersion(final String currentVersion) {
final long versionNumber = Long.parseLong(currentVersion);
final long version = versionNumber + VERSION_INCREMENT;