This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new e302f2aff7 NIFI-10976 Added Previous Cluster State Provider 
configuration (#7235)
e302f2aff7 is described below

commit e302f2aff7af6ec1165e8fa739eaa63dc7e20699
Author: exceptionfactory <[email protected]>
AuthorDate: Wed May 10 15:22:05 2023 -0500

    NIFI-10976 Added Previous Cluster State Provider configuration (#7235)
    
    - Added methods to enumerate Stored Component Identifiers on State Provider 
interface and implementations
    - Added nifi.state.management.provider.cluster.previous to nifi.properties
    - Updated State Manager Provider to restore Cluster State from Previous 
Cluster Provider
    - Updated Configuring State Providers documentation for new property
---
 .../java/org/apache/nifi/util/NiFiProperties.java  |   1 +
 .../src/main/asciidoc/administration-guide.adoc    |  29 ++-
 .../nifi/components/state/StateProvider.java       |  21 ++
 .../manager/StandardStateManagerProvider.java      | 256 +++++++++++++++------
 .../local/WriteAheadLocalStateProvider.java        |  10 +
 .../zookeeper/ZooKeeperStateProvider.java          |  31 ++-
 .../state/providers/AbstractTestStateProvider.java |  10 +
 .../provider/KubernetesConfigMapStateProvider.java |  36 +++
 .../KubernetesConfigMapStateProviderTest.java      |  20 ++
 .../nifi-framework/nifi-resources/pom.xml          |   1 +
 .../src/main/resources/conf/nifi.properties        |   2 +
 .../nifi/redis/state/RedisStateProvider.java       |  38 +++
 .../nifi/redis/state/ITRedisStateProvider.java     |  23 +-
 .../components/state/HashMapStateProvider.java     |  10 +
 14 files changed, 404 insertions(+), 84 deletions(-)

diff --git 
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
 
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index 681ed0a5ee..65615f9520 100644
--- 
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++ 
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -306,6 +306,7 @@ public class NiFiProperties extends ApplicationProperties {
     public static final String STATE_MANAGEMENT_CONFIG_FILE = 
"nifi.state.management.configuration.file";
     public static final String STATE_MANAGEMENT_LOCAL_PROVIDER_ID = 
"nifi.state.management.provider.local";
     public static final String STATE_MANAGEMENT_CLUSTER_PROVIDER_ID = 
"nifi.state.management.provider.cluster";
+    public static final String STATE_MANAGEMENT_CLUSTER_PROVIDER_PREVIOUS_ID = 
"nifi.state.management.provider.cluster.previous";
     public static final String STATE_MANAGEMENT_START_EMBEDDED_ZOOKEEPER = 
"nifi.state.management.embedded.zookeeper.start";
     public static final String STATE_MANAGEMENT_ZOOKEEPER_PROPERTIES = 
"nifi.state.management.embedded.zookeeper.properties";
 
diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc 
b/nifi-docs/src/main/asciidoc/administration-guide.adoc
index 838fe75d7d..15d01f49fd 100644
--- a/nifi-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc
@@ -2619,15 +2619,32 @@ in the cluster. This allows one node to pick up where 
another node left off, or
 
 [[state_providers]]
 === Configuring State Providers
-When a component decides to store or retrieve state, it does so by providing a 
"Scope" - either Node-local or Cluster-wide. The
-mechanism that is used to store and retrieve this state is then determined 
based on this Scope, as well as the configured State
-Providers. The _nifi.properties_ file contains three different properties that 
are relevant to configuring these State Providers.
+When a component decides to store or retrieve state, it does so by providing a 
`Scope`, either `Local` to the node or
+applicable to the entire `Cluster`. Component implementation code and 
configuration properties determine the requested
+Scope, which the framework provides according to the State Management 
configuration. The `nifi.properties` configuration
+contains several properties for managing these State Providers.
 
 |====
 |*Property*|*Description*
-|`nifi.state.management.configuration.file`|The first is the property that 
specifies an external XML file that is used for configuring the local and/or 
cluster-wide State Providers. This XML file may contain configurations for 
multiple providers
-|`nifi.state.management.provider.local`|The property that provides the 
identifier of the local State Provider configured in this XML file
-|`nifi.state.management.provider.cluster`|Similarly, the property provides the 
identifier of the cluster-wide State Provider configured in this XML file.
+|`nifi.state.management.configuration.file`|The configuration file specifies 
the path to an external XML file that the
+framework uses to configure State Providers. This XML file may contain 
configurations for multiple providers.
+|`nifi.state.management.provider.local`|The Local Provider stores current 
Local State information. The property value
+identifies a Local Provider in the State Management configuration that the 
framework will use for storing and retrieving
+Local State for requesting components.
+|`nifi.state.management.provider.cluster`|The Cluster Provider stores current 
Cluster State information. The property
+value identifies a Cluster Provider in the State Management configuration that 
the framework will use for storing and
+retrieving Cluster State for requesting components.
+|`nifi.state.management.provider.cluster.previous`|The Previous Cluster State 
Provider enables population of the current
+Cluster State from an existing Provider. The property value identifies a 
Cluster Provider in the State Management
+configuration that the framework will use as the initial source of Cluster 
State when the current Cluster State Provider
+has no information stored.
+
+The framework enumerates the Current Cluster Provider when a node becomes 
Primary, and proceeds to check the Previous
+Cluster Provider when the Current Cluster Provider does not contain any 
component information. The Previous Cluster
+Provider property value can be set to blank after cluster startup following a 
successful Cluster State restore from
+backup.
+
+The default value is blank.
 |====
 
 This XML file consists of a top-level `state-management` element, which has 
one or more `local-provider` and zero or more `cluster-provider`
diff --git 
a/nifi-framework-api/src/main/java/org/apache/nifi/components/state/StateProvider.java
 
b/nifi-framework-api/src/main/java/org/apache/nifi/components/state/StateProvider.java
index e1e4352721..74f7d972ec 100644
--- 
a/nifi-framework-api/src/main/java/org/apache/nifi/components/state/StateProvider.java
+++ 
b/nifi-framework-api/src/main/java/org/apache/nifi/components/state/StateProvider.java
@@ -18,6 +18,8 @@
 package org.apache.nifi.components.state;
 
 import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 
 import org.apache.nifi.components.ConfigurableComponent;
@@ -130,4 +132,23 @@ public interface StateProvider extends 
ConfigurableComponent {
      * @return the {@link Scope}s supported by the configuration
      */
     Scope[] getSupportedScopes();
+
+    /**
+     * Indicates whether the State Provider supports enumerating component 
identifiers with stored state information
+     *
+     * @return Component enumeration supported status
+     */
+    default boolean isComponentEnumerationSupported() {
+        return false;
+    }
+
+    /**
+     * Get Component Identifiers with associated state stored in the Provider
+     *
+     * @return Collection of Component Identifiers with stored state; defaults 
to empty
+     * @throws IOException Thrown on failures to retrieve component identifiers
+     */
+    default Collection<String> getStoredComponentIds() throws IOException {
+        return Collections.emptyList();
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
index 35c3180551..49eb1e98b5 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
@@ -64,12 +64,15 @@ import org.slf4j.LoggerFactory;
 import javax.net.ssl.SSLContext;
 import java.io.File;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -82,10 +85,18 @@ public class StandardStateManagerProvider implements 
StateManagerProvider {
     private final ConcurrentMap<String, StateManager> stateManagers = new 
ConcurrentHashMap<>();
     private final StateProvider localStateProvider;
     private final StateProvider clusterStateProvider;
+    private final StateProvider previousClusterStateProvider;
 
     public StandardStateManagerProvider(final StateProvider 
localStateProvider, final StateProvider clusterStateProvider) {
         this.localStateProvider = localStateProvider;
         this.clusterStateProvider = clusterStateProvider;
+        this.previousClusterStateProvider = null;
+    }
+
+    private StandardStateManagerProvider(final StateProvider 
localStateProvider, final StateProvider clusterStateProvider, final 
StateProvider previousClusterStateProvider) {
+        this.localStateProvider = localStateProvider;
+        this.clusterStateProvider = clusterStateProvider;
+        this.previousClusterStateProvider = previousClusterStateProvider;
     }
 
     protected StateProvider getLocalStateProvider() {
@@ -104,16 +115,21 @@ public class StandardStateManagerProvider implements 
StateManagerProvider {
             return provider;
         }
 
-        final StateProvider localProvider = 
createLocalStateProvider(properties,variableRegistry, extensionManager, 
parameterLookup);
+        final SSLContext sslContext = createSslContext(properties);
+
+        final StateProvider localProvider = 
createLocalStateProvider(properties, sslContext, variableRegistry, 
extensionManager, parameterLookup);
 
         final StateProvider clusterProvider;
+        final StateProvider previousClusterProvider;
         if (properties.isNode()) {
-            clusterProvider = 
createClusteredStateProvider(properties,variableRegistry, extensionManager, 
parameterLookup);
+            clusterProvider = createClusteredStateProvider(properties, 
sslContext, variableRegistry, extensionManager, parameterLookup);
+            previousClusterProvider = 
getPreviousClusteredStateProvider(properties, sslContext, variableRegistry, 
extensionManager, parameterLookup);
         } else {
             clusterProvider = null;
+            previousClusterProvider = null;
         }
 
-        provider = new StandardStateManagerProvider(localProvider, 
clusterProvider);
+        provider = new StandardStateManagerProvider(localProvider, 
clusterProvider, previousClusterProvider);
         return provider;
     }
 
@@ -121,105 +137,177 @@ public class StandardStateManagerProvider implements 
StateManagerProvider {
         provider = null;
     }
 
-    private static StateProvider createLocalStateProvider(final NiFiProperties 
properties, final VariableRegistry variableRegistry, final ExtensionManager 
extensionManager,
-                                                          final 
ParameterLookup parameterLookup) throws IOException, ConfigParseException {
+    private static SSLContext createSslContext(final NiFiProperties 
properties) {
+        final TlsConfiguration standardTlsConfiguration = 
StandardTlsConfiguration.fromNiFiProperties(properties);
+        try {
+            return 
SslContextFactory.createSslContext(standardTlsConfiguration);
+        } catch (final TlsException e) {
+            throw new IllegalStateException("TLS Security Properties not valid 
for State Manager configuration", e);
+        }
+    }
+
+    private static StateProvider createLocalStateProvider(
+            final NiFiProperties properties,
+            final SSLContext sslContext,
+            final VariableRegistry variableRegistry,
+            final ExtensionManager extensionManager,
+            final ParameterLookup parameterLookup
+    ) throws IOException, ConfigParseException {
         final File configFile = properties.getStateManagementConfigFile();
-        return createStateProvider(configFile, Scope.LOCAL, properties, 
variableRegistry, extensionManager, parameterLookup);
+        final StateProviderConfiguration config = 
getProviderConfiguration(Scope.LOCAL, properties.getLocalStateProviderId(), 
NiFiProperties.STATE_MANAGEMENT_LOCAL_PROVIDER_ID, configFile);
+        return createStateProvider(config, sslContext, variableRegistry, 
extensionManager, parameterLookup);
     }
 
-    private static StateProvider createClusteredStateProvider(final 
NiFiProperties properties, final VariableRegistry variableRegistry, final 
ExtensionManager extensionManager,
-                                                              final 
ParameterLookup parameterLookup) throws IOException, ConfigParseException {
+    private static StateProvider createClusteredStateProvider(
+            final NiFiProperties properties,
+            final SSLContext sslContext,
+            final VariableRegistry variableRegistry,
+            final ExtensionManager extensionManager,
+            final ParameterLookup parameterLookup
+    ) throws IOException, ConfigParseException {
         final File configFile = properties.getStateManagementConfigFile();
-        return createStateProvider(configFile, Scope.CLUSTER, properties, 
variableRegistry, extensionManager, parameterLookup);
+        final StateProviderConfiguration config = 
getProviderConfiguration(Scope.CLUSTER, properties.getClusterStateProviderId(), 
NiFiProperties.STATE_MANAGEMENT_CLUSTER_PROVIDER_ID, configFile);
+        return createStateProvider(config, sslContext, variableRegistry, 
extensionManager, parameterLookup);
     }
 
-    private static StateProvider createStateProvider(final File configFile, 
final Scope scope, final NiFiProperties properties, final VariableRegistry 
variableRegistry,
-                                                     final ExtensionManager 
extensionManager, final ParameterLookup parameterLookup) throws 
ConfigParseException, IOException {
-        final String providerId;
-        final String providerIdPropertyName;
-        final String providerDescription;
-        final String providerXmlElementName;
-        final String oppositeScopeXmlElementName;
-
-        switch (scope) {
-            case CLUSTER:
-                providerId = properties.getClusterStateProviderId();
-                providerIdPropertyName = 
NiFiProperties.STATE_MANAGEMENT_CLUSTER_PROVIDER_ID;
-                providerDescription = "Cluster State Provider";
-                providerXmlElementName = "cluster-provider";
-                oppositeScopeXmlElementName = "local-provider";
-                break;
-            case LOCAL:
-                providerId = properties.getLocalStateProviderId();
-                providerIdPropertyName = 
NiFiProperties.STATE_MANAGEMENT_LOCAL_PROVIDER_ID;
-                providerDescription = "Local State Provider";
-                providerXmlElementName = "local-provider";
-                oppositeScopeXmlElementName = "cluster-provider";
-                break;
-            default:
-                throw new AssertionError("Attempted to create State Provider 
for unknown Scope: " + scope);
+    private static StateProvider getPreviousClusteredStateProvider(
+            final NiFiProperties properties,
+            final SSLContext sslContext,
+            final VariableRegistry variableRegistry,
+            final ExtensionManager extensionManager,
+            final ParameterLookup parameterLookup
+    ) throws IOException {
+        final String clusterProviderPreviousId = 
properties.getProperty(NiFiProperties.STATE_MANAGEMENT_CLUSTER_PROVIDER_PREVIOUS_ID);
+        if (clusterProviderPreviousId == null || 
clusterProviderPreviousId.isEmpty()) {
+            return null;
+        } else {
+            final File configFile = properties.getStateManagementConfigFile();
+            final StateProviderConfiguration config = 
getProviderConfiguration(Scope.CLUSTER, clusterProviderPreviousId, 
NiFiProperties.STATE_MANAGEMENT_CLUSTER_PROVIDER_PREVIOUS_ID, configFile);
+            final StateProvider previousClusterStateProvider = 
createStateProvider(config, sslContext, variableRegistry, extensionManager, 
parameterLookup);
+            if 
(previousClusterStateProvider.isComponentEnumerationSupported()) {
+                return previousClusterStateProvider;
+            } else {
+                throw new IllegalStateException(String.format("Previous 
Cluster State Provider [%s] does not support Component Enumeration", 
clusterProviderPreviousId));
+            }
         }
+    }
+
+    private static void loadPreviousClusterState(final StateProvider 
previousProvider, final StateProvider currentProvider) {
+        final String previousProviderId = previousProvider.getIdentifier();
+        final String currentProviderId = currentProvider.getIdentifier();
+
+        try {
+            final Collection<String> currentStoredComponentIds = 
currentProvider.getStoredComponentIds();
+            if (currentStoredComponentIds.isEmpty()) {
+                final Collection<String> previousStoredComponentIds = 
previousProvider.getStoredComponentIds();
+                if (previousStoredComponentIds.isEmpty()) {
+                    logger.info("Cluster State not found in Previous Provider 
[{}]", previousProviderId);
+                } else {
+                    loadPreviousClusterStateComponents(previousProvider, 
currentProvider, previousStoredComponentIds);
+                }
+            } else {
+                logger.info("Previous Cluster State ignored: State found in 
Provider [{}] for Components [{}]", currentProviderId, 
currentStoredComponentIds.size());
+            }
+        } catch (final IOException e) {
+            final String message = String.format("Cluster State Component 
Enumeration failed from Provider [%s] to Provider [%s]", previousProviderId, 
currentProviderId);
+            throw new UncheckedIOException(message, e);
+        }
+    }
+
+    private static void loadPreviousClusterStateComponents(final StateProvider 
previousProvider, final StateProvider currentProvider, final Collection<String> 
previousStoredComponentIds) {
+        final String previousProviderId = previousProvider.getIdentifier();
+        final String currentProviderId = currentProvider.getIdentifier();
+        final Set<String> loadedComponentIds = new LinkedHashSet<>();
+
+        logger.info("Cluster State found in Previous Provider [{}] for 
Components [{}]", previousProviderId, previousStoredComponentIds.size());
+        try {
+            for (final String componentId : previousStoredComponentIds) {
+                final StateMap previousState = 
previousProvider.getState(componentId);
+                final Map<String, String> state = previousState.toMap();
+                currentProvider.setState(state, componentId);
+                logger.info("Cluster State loaded for Component [{}] to 
Provider [{}]", componentId, currentProviderId);
+                loadedComponentIds.add(componentId);
+            }
 
+            logger.info("Cluster State loaded from Provider [{}] to Provider 
[{}] for Components [{}]", previousProviderId, currentProviderId, 
previousStoredComponentIds.size());
+        } catch (final IOException e) {
+            final Set<String> failedComponentIds = new 
LinkedHashSet<>(previousStoredComponentIds);
+            failedComponentIds.removeAll(loadedComponentIds);
+
+            logger.warn("Cluster State loaded for Components {} but failed for 
Components [{}] from Provider [{}] to Provider [{}]",
+                    loadedComponentIds, failedComponentIds, 
previousProviderId, currentProviderId);
+
+            final String message = String.format("Cluster State load failed 
from Provider [%s] to Provider [%s]", previousProviderId, currentProviderId);
+            throw new UncheckedIOException(message, e);
+        }
+    }
+
+    private static StateProviderConfiguration getProviderConfiguration(
+            final Scope scope,
+            final String providerId,
+            final String providerIdPropertyName,
+            final File configFile
+    ) throws IOException {
         if (!configFile.exists()) {
-            throw new IllegalStateException("Cannot create " + 
providerDescription + " because the State Management Configuration File " + 
configFile + " does not exist");
+            throw new IllegalStateException("Cannot create " + scope + " 
Provider because the State Management Configuration File " + configFile + " 
does not exist");
         }
         if (!configFile.canRead()) {
-            throw new IllegalStateException("Cannot create " + 
providerDescription + " because the State Management Configuration File " + 
configFile + " cannot be read");
+            throw new IllegalStateException("Cannot create " + scope + " 
Provider because the State Management Configuration File " + configFile + " 
cannot be read");
         }
 
         if (providerId == null) {
             if (scope == Scope.CLUSTER) {
                 throw new IllegalStateException("Cannot create Cluster State 
Provider because the '" + providerIdPropertyName
-                    + "' property is missing from the NiFi Properties file. In 
order to run NiFi in a cluster, the " + providerIdPropertyName
-                    + " property must be configured in nifi.properties");
+                        + "' property is missing from the NiFi Properties 
file. In order to run NiFi in a cluster, the " + providerIdPropertyName
+                        + " property must be configured in nifi.properties");
             }
 
-            throw new IllegalStateException("Cannot create " + 
providerDescription + " because the '" + providerIdPropertyName
-                + "' property is missing from the NiFi Properties file");
+            throw new IllegalStateException("Cannot create " + scope + " 
Provider because the '" + providerIdPropertyName
+                    + "' property is missing from the NiFi Properties file");
         }
 
         if (providerId.trim().isEmpty()) {
-            throw new IllegalStateException("Cannot create " + 
providerDescription + " because the '" + providerIdPropertyName
-                + "' property in the NiFi Properties file has no value set. 
This is a required property and must reference the identifier of one of the "
-                + providerXmlElementName + " elements in the State Management 
Configuration File (" + configFile + ")");
+            throw new IllegalStateException("Cannot create " + scope + " 
Provider because the '" + providerIdPropertyName
+                    + "' property in the NiFi Properties file has no value 
set. This is a required property and must reference the identifier of one of 
the "
+                    + scope + " elements in the State Management Configuration 
File (" + configFile + ")");
         }
 
         final StateManagerConfiguration config = 
StateManagerConfiguration.parse(configFile);
         final StateProviderConfiguration providerConfig = 
config.getStateProviderConfiguration(providerId);
         if (providerConfig == null) {
-            throw new IllegalStateException("Cannot create " + 
providerDescription + " because the '" + providerIdPropertyName
-                + "' property in the NiFi Properties file is set to '" + 
providerId + "', but there is no " + providerXmlElementName
-                + " entry in the State Management Configuration File (" + 
configFile + ") with this id");
+            throw new IllegalStateException("Cannot create " + scope + " 
Provider because the '" + providerIdPropertyName
+                    + "' property in the NiFi Properties file is set to '" + 
providerId + "', but there is no " + scope
+                    + " entry in the State Management Configuration File (" + 
configFile + ") with this id");
         }
 
         if (providerConfig.getScope() != scope) {
-            throw new IllegalStateException("Cannot create " + 
providerDescription + " because the '" + providerIdPropertyName
-                + "' property in the NiFi Properties file is set to '" + 
providerId + "', but this id is assigned to a " + oppositeScopeXmlElementName
-                + " entry in the State Management Configuration File (" + 
configFile + "), rather than a " + providerXmlElementName + " entry");
+            throw new IllegalStateException("Cannot create " + scope + " 
Provider because the '" + providerIdPropertyName
+                    + "' property in the NiFi Properties file is set to '" + 
providerId + "', but this ID is assigned to another "
+                    + " entry in the State Management Configuration File (" + 
configFile + "), rather than a " + scope + " entry");
         }
 
+        return providerConfig;
+    }
+
+    private static StateProvider createStateProvider(
+            final StateProviderConfiguration providerConfig,
+            final SSLContext sslContext,
+            final VariableRegistry variableRegistry,
+            final ExtensionManager extensionManager,
+            final ParameterLookup parameterLookup
+    ) throws IOException {
         final String providerClassName = providerConfig.getClassName();
 
         final StateProvider provider;
         try {
             provider = instantiateStateProvider(extensionManager, 
providerClassName);
         } catch (final Exception e) {
-            throw new RuntimeException("Cannot create " + providerDescription 
+ " of type " + providerClassName, e);
+            throw new RuntimeException("Cannot create " + 
providerConfig.getScope() + " Provider of type " + providerClassName, e);
         }
 
-        if (!ArrayUtils.contains(provider.getSupportedScopes(), scope)) {
-            throw new RuntimeException("Cannot use " + providerDescription + " 
("+providerClassName+") as it only supports scope(s) " + 
ArrayUtils.toString(provider.getSupportedScopes()) + " but " +
-                "instance"
-                + " is configured to use scope " + scope);
-        }
-
-        final SSLContext sslContext;
-        TlsConfiguration standardTlsConfiguration = 
StandardTlsConfiguration.fromNiFiProperties(properties);
-        try {
-            sslContext = 
SslContextFactory.createSslContext(standardTlsConfiguration);
-        } catch (TlsException e) {
-            logger.error("Encountered an error configuring TLS for state 
manager: ", e);
-            throw new IllegalStateException("Error configuring TLS for state 
manager", e);
+        if (!ArrayUtils.contains(provider.getSupportedScopes(), 
providerConfig.getScope())) {
+            throw new RuntimeException("Cannot use " + 
providerConfig.getScope() + " (" + providerClassName + ") as it only supports 
scope(s) "
+                    + ArrayUtils.toString(provider.getSupportedScopes()) + " 
but instance is configured to use scope " + providerConfig.getScope());
         }
 
         //create variable registry
@@ -253,8 +341,8 @@ public class StandardStateManagerProvider implements 
StateManagerProvider {
             propertyMap.put(descriptor, new 
StandardPropertyValue(resourceContext, entry.getValue(),null, parameterLookup, 
variableRegistry));
         }
 
-        final ComponentLog logger = new SimpleProcessLogger(providerId, 
provider);
-        final StateProviderInitializationContext initContext = new 
StandardStateProviderInitializationContext(providerId, propertyMap, sslContext, 
logger);
+        final ComponentLog logger = new 
SimpleProcessLogger(providerConfig.getId(), provider);
+        final StateProviderInitializationContext initContext = new 
StandardStateProviderInitializationContext(providerConfig.getId(), propertyMap, 
sslContext, logger);
 
         synchronized (provider) {
             provider.initialize(initContext);
@@ -267,22 +355,22 @@ public class StandardStateManagerProvider implements 
StateManagerProvider {
         int invalidCount = 0;
         for (final ValidationResult result : results) {
             if (!result.isValid()) {
-                validationFailures.append(result.toString()).append("\n");
+                validationFailures.append(result).append("\n");
                 invalidCount++;
             }
         }
 
         if (invalidCount > 0) {
-            throw new IllegalStateException("Could not initialize State 
Providers because the " + providerDescription + " is not valid. The following "
-                + invalidCount + " Validation Errors occurred:\n" + 
validationFailures.toString() + "\nPlease check the configuration of the " + 
providerDescription + " with ID ["
-                + providerId.trim() + "] in the file " + 
configFile.getAbsolutePath());
+            throw new IllegalStateException("Could not initialize State 
Providers because the " + providerConfig.getScope() + " Provider is not valid. 
The following "
+                + invalidCount + " Validation Errors occurred:\n" + 
validationFailures + "\nPlease check the configuration of the " + 
providerConfig.getScope() + " Provider with ID ["
+                + providerConfig.getId() + "]");
         }
 
         return provider;
     }
 
     // Inject NiFi Properties to state providers that use the 
StateProviderContext annotation
-    private static void performMethodInjection(final Object instance, final 
Class stateProviderClass) throws IllegalAccessException, 
IllegalArgumentException, InvocationTargetException {
+    private static void performMethodInjection(final Object instance, final 
Class<?> stateProviderClass) throws IllegalAccessException, 
IllegalArgumentException, InvocationTargetException {
         for (final Method method : stateProviderClass.getMethods()) {
             if (method.isAnnotationPresent(StateProviderContext.class)) {
                 // make the method accessible
@@ -308,13 +396,14 @@ public class StandardStateManagerProvider implements 
StateManagerProvider {
             }
         }
 
-        final Class parentClass = stateProviderClass.getSuperclass();
+        final Class<?> parentClass = stateProviderClass.getSuperclass();
         if (parentClass != null && 
StateProvider.class.isAssignableFrom(parentClass)) {
             performMethodInjection(instance, parentClass);
         }
     }
 
-    private static StateProvider instantiateStateProvider(final 
ExtensionManager extensionManager, final String type) throws 
ClassNotFoundException, InstantiationException, IllegalAccessException {
+    private static StateProvider instantiateStateProvider(final 
ExtensionManager extensionManager, final String type)
+            throws ClassNotFoundException, InstantiationException, 
IllegalAccessException, NoSuchMethodException, InvocationTargetException {
         final ClassLoader ctxClassLoader = 
Thread.currentThread().getContextClassLoader();
         try {
             final List<Bundle> bundles = extensionManager.getBundles(type);
@@ -331,7 +420,7 @@ public class StandardStateManagerProvider implements 
StateManagerProvider {
 
             
Thread.currentThread().setContextClassLoader(detectedClassLoaderForType);
             final Class<? extends StateProvider> mgrClass = 
rawClass.asSubclass(StateProvider.class);
-            StateProvider provider = mgrClass.newInstance();
+            StateProvider provider = 
mgrClass.getDeclaredConstructor().newInstance();
             try {
                 performMethodInjection(provider, mgrClass);
             } catch (InvocationTargetException e) {
@@ -465,6 +554,20 @@ public class StandardStateManagerProvider implements 
StateManagerProvider {
                     return stateProvider.getIdentifier();
                 }
             }
+
+            @Override
+            public boolean isComponentEnumerationSupported() {
+                try (final NarCloseable narCloseable = 
NarCloseable.withNarLoader()) {
+                    return stateProvider.isComponentEnumerationSupported();
+                }
+            }
+
+            @Override
+            public Collection<String> getStoredComponentIds() throws 
IOException {
+                try (final NarCloseable narCloseable = 
NarCloseable.withNarLoader()) {
+                    return stateProvider.getStoredComponentIds();
+                }
+            }
         };
     }
 
@@ -491,11 +594,18 @@ public class StandardStateManagerProvider implements 
StateManagerProvider {
         if (clusterStateProvider != null) {
             clusterStateProvider.shutdown();
         }
+        if (previousClusterStateProvider != null) {
+            previousClusterStateProvider.shutdown();
+        }
     }
 
     @Override
     public void enableClusterProvider() {
         clusterStateProvider.enable();
+        if (previousClusterStateProvider != null) {
+            previousClusterStateProvider.enable();
+            loadPreviousClusterState(previousClusterStateProvider, 
clusterStateProvider);
+        }
     }
 
     @Override
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java
index 7f79649656..682fc5a2d7 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java
@@ -227,6 +227,16 @@ public class WriteAheadLocalStateProvider extends 
AbstractStateProvider {
         return new Scope[]{Scope.LOCAL};
     }
 
+    @Override
+    public boolean isComponentEnumerationSupported() {
+        return true;
+    }
+
+    @Override
+    public Collection<String> getStoredComponentIds() {
+        return Collections.unmodifiableCollection(componentProviders.keySet());
+    }
+
     private static class ComponentProvider {
         private final AtomicLong versionGenerator;
         private final WriteAheadRepository<StateMapUpdate> wal;
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
index a2d7b96798..9f02b6b88a 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
@@ -57,6 +57,7 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -75,6 +76,10 @@ import java.util.stream.Collectors;
 public class ZooKeeperStateProvider extends AbstractStateProvider {
     private static final int EMPTY_VERSION = -1;
 
+    private static final String COMPONENTS_RELATIVE_PATH = "/components";
+
+    private static final String COMPONENTS_PATH_FORMAT = "%s%s/%s";
+
     private static final Logger logger = 
LoggerFactory.getLogger(ZooKeeperStateProvider.class);
     private NiFiProperties nifiProperties;
 
@@ -259,7 +264,7 @@ public class ZooKeeperStateProvider extends 
AbstractStateProvider {
     }
 
     private String getComponentPath(final String componentId) {
-        return rootNode + "/components/" + componentId;
+        return String.format(COMPONENTS_PATH_FORMAT, rootNode, 
COMPONENTS_RELATIVE_PATH, componentId);
     }
 
     private void verifyEnabled() throws IOException {
@@ -518,6 +523,30 @@ public class ZooKeeperStateProvider extends 
AbstractStateProvider {
         setState(Collections.emptyMap(), componentId);
     }
 
+    @Override
+    public boolean isComponentEnumerationSupported() {
+        return true;
+    }
+
+    @Override
+    public Collection<String> getStoredComponentIds() throws IOException {
+        try {
+            final ZooKeeper zooKeeper = getZooKeeper();
+            final String componentsPath = String.format("%s%s", rootNode, 
COMPONENTS_RELATIVE_PATH);
+            final List<String> children = 
zooKeeper.getChildren(componentsPath, false);
+            return Collections.unmodifiableCollection(children);
+        } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException("ZooKeeper communication interrupted", e);
+        } catch (final KeeperException e) {
+            final Code code = e.code();
+            if (Code.NONODE == code) {
+                return Collections.emptyList();
+            }
+            throw new IOException(String.format("ZooKeeper communication 
failed: %s", code), e);
+        }
+    }
+
     private void validateDataSize(final ZKClientConfig clientConfig, final 
byte[] data, final String componentId, final int totalStateValues) throws 
StateTooLargeException {
         final int maximumSize = clientConfig.getInt(ZKConfig.JUTE_MAXBUFFER, 
NiFiProperties.DEFAULT_ZOOKEEPER_JUTE_MAXBUFFER);
         if (data != null && data.length > maximumSize) {
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/AbstractTestStateProvider.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/AbstractTestStateProvider.java
index 43946ec8cf..d07929cff4 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/AbstractTestStateProvider.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/AbstractTestStateProvider.java
@@ -23,8 +23,10 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 
 import org.apache.nifi.components.state.StateMap;
@@ -48,8 +50,16 @@ public abstract class AbstractTestStateProvider {
 
     @Test
     public void testSetAndGet() throws IOException {
+        final Collection<String> initialStoredComponentIds = 
getProvider().getStoredComponentIds();
+        assertTrue(initialStoredComponentIds.isEmpty());
+
         getProvider().setState(Collections.singletonMap("testSetAndGet", 
"value"), componentId);
         assertEquals("value", 
getProvider().getState(componentId).get("testSetAndGet"));
+
+        final Collection<String> storedComponentIds = 
getProvider().getStoredComponentIds();
+        final Iterator<String> componentIds = storedComponentIds.iterator();
+        assertTrue(componentIds.hasNext());
+        assertEquals(componentId, componentIds.next());
     }
 
     @Test
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProvider.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProvider.java
index 9b105598f3..b2b516fe6e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProvider.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/main/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProvider.java
@@ -18,6 +18,7 @@ package org.apache.nifi.kubernetes.state.provider;
 
 import io.fabric8.kubernetes.api.model.ConfigMap;
 import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+import io.fabric8.kubernetes.api.model.ConfigMapList;
 import io.fabric8.kubernetes.api.model.ObjectMeta;
 import io.fabric8.kubernetes.api.model.StatusDetails;
 import io.fabric8.kubernetes.client.KubernetesClient;
@@ -37,12 +38,16 @@ import java.net.HttpURLConnection;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.Base64;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 /**
  * State Provider implementation based on Kubernetes ConfigMaps with Base64 
encoded keys to meet Kubernetes constraints
@@ -54,6 +59,10 @@ public class KubernetesConfigMapStateProvider extends 
AbstractConfigurableCompon
 
     private static final String CONFIG_MAP_NAME_FORMAT = "nifi-component-%s";
 
+    private static final Pattern CONFIG_MAP_NAME_PATTERN = 
Pattern.compile("^nifi-component-(.+)$");
+
+    private static final int COMPONENT_ID_GROUP = 1;
+
     /** Encode ConfigMap keys using URL Encoder without padding characters for 
compliance with Kubernetes naming */
     private static final Base64.Encoder encoder = 
Base64.getUrlEncoder().withoutPadding();
 
@@ -247,6 +256,33 @@ public class KubernetesConfigMapStateProvider extends 
AbstractConfigurableCompon
         return SUPPORTED_SCOPES;
     }
 
+    /**
+     * Kubernetes ConfigMap Provider supports Component Enumeration
+     *
+     * @return Component Enumeration supported
+     */
+    @Override
+    public boolean isComponentEnumerationSupported() {
+        return true;
+    }
+
+    /**
+     * Get Component Identifiers with stored state based on ConfigMap names 
matching standard pattern
+     *
+     * @return Component Identifiers with stored state or empty when none found
+     */
+    @Override
+    public Collection<String> getStoredComponentIds() {
+        final ConfigMapList configMapList = 
kubernetesClient.configMaps().inNamespace(namespace).list();
+        return configMapList.getItems().stream()
+                .map(ConfigMap::getMetadata)
+                .map(ObjectMeta::getName)
+                .map(CONFIG_MAP_NAME_PATTERN::matcher)
+                .filter(Matcher::matches)
+                .map(matcher -> matcher.group(COMPONENT_ID_GROUP))
+                .collect(Collectors.toUnmodifiableList());
+    }
+
     /**
      * Get Kubernetes Client using standard configuration
      *
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/test/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProviderTest.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/test/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProviderTest.java
index 13412a0003..f50834ba67 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/test/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProviderTest.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/test/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProviderTest.java
@@ -35,7 +35,9 @@ import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Optional;
 
@@ -222,6 +224,24 @@ class KubernetesConfigMapStateProviderTest {
         assertEquals(HttpMethod.DELETE.name(), request.getMethod());
     }
 
+    @Test
+    void testSetStateGetStoredComponentIds() throws IOException {
+        setContext();
+        provider.initialize(context);
+
+        final Collection<String> initialStoredComponentIds = 
provider.getStoredComponentIds();
+        assertTrue(initialStoredComponentIds.isEmpty());
+
+        final Map<String, String> state = 
Collections.singletonMap(STATE_PROPERTY, STATE_VALUE);
+        provider.setState(state, COMPONENT_ID);
+
+        final Collection<String> storedComponentIds = 
provider.getStoredComponentIds();
+        final Iterator<String> componentIds = storedComponentIds.iterator();
+
+        assertTrue(componentIds.hasNext());
+        assertEquals(COMPONENT_ID, componentIds.next());
+    }
+
     private void setContext() {
         when(context.getIdentifier()).thenReturn(IDENTIFIER);
         when(context.getLogger()).thenReturn(logger);
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
index e8700a07f8..66cacca6d1 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
@@ -53,6 +53,7 @@
         
<nifi.state.management.embedded.zookeeper.properties>./conf/zookeeper.properties</nifi.state.management.embedded.zookeeper.properties>
         
<nifi.state.management.provider.local>local-provider</nifi.state.management.provider.local>
         
<nifi.state.management.provider.cluster>zk-provider</nifi.state.management.provider.cluster>
+        <nifi.state.management.provider.cluster.previous/>
 
         
<nifi.flowfile.repository.implementation>org.apache.nifi.controller.repository.WriteAheadFlowFileRepository</nifi.flowfile.repository.implementation>
         
<nifi.flowfile.repository.wal.implementation>org.apache.nifi.wali.SequentialAccessWriteAheadLog</nifi.flowfile.repository.wal.implementation>
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
index 2f6214c519..4126d31efc 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
@@ -61,6 +61,8 @@ 
nifi.state.management.configuration.file=${nifi.state.management.configuration.f
 nifi.state.management.provider.local=${nifi.state.management.provider.local}
 # The ID of the cluster-wide state provider. This will be ignored if NiFi is 
not clustered but must be populated if running in a cluster.
 
nifi.state.management.provider.cluster=${nifi.state.management.provider.cluster}
+# The Previous Cluster State Provider from which the framework will load 
Cluster State when the current Cluster Provider has no entries
+nifi.state.management.provider.cluster.previous=${nifi.state.management.provider.cluster.previous}
 # Specifies whether or not this instance of NiFi should run an embedded 
ZooKeeper server
 
nifi.state.management.embedded.zookeeper.start=${nifi.state.management.embedded.zookeeper.start}
 # Properties file that provides the ZooKeeper properties to use if 
<nifi.state.management.embedded.zookeeper.start> is set to true
diff --git 
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java
 
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java
index 72ccac1f7c..3a3c06b512 100644
--- 
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java
+++ 
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java
@@ -42,6 +42,10 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 /**
  * A StateProvider backed by Redis.
@@ -78,6 +82,12 @@ public class RedisStateProvider extends 
AbstractConfigurableComponent implements
         STATE_PROVIDER_PROPERTIES = Collections.unmodifiableList(props);
     }
 
+    private static final String KEY_PATTERN_FORMAT = "%s*";
+
+    private static final String KEY_PREFIX_COMPONENT_ID_PATTERN = "^%s(.+)$";
+
+    private static final int COMPONENT_ID_GROUP = 1;
+
     private String identifier;
     private String keyPrefix;
     private ComponentLog logger;
@@ -289,6 +299,34 @@ public class RedisStateProvider extends 
AbstractConfigurableComponent implements
         return new Scope[] {Scope.CLUSTER};
     }
 
+    @Override
+    public boolean isComponentEnumerationSupported() {
+        return true;
+    }
+
+    /**
+     * Get Component Identifiers with stored state based on Redis Keys 
matching key prefix pattern
+     *
+     * @return Component Identifiers with stored state or empty when none found
+     * @throws IOException Thrown on Redis communication failures
+     */
+    @Override
+    public Collection<String> getStoredComponentIds() throws IOException {
+        final byte[] keyPattern = String.format(KEY_PATTERN_FORMAT, 
keyPrefix).getBytes(StandardCharsets.UTF_8);
+        final Pattern keyPrefixComponentIdPattern = 
Pattern.compile(String.format(KEY_PREFIX_COMPONENT_ID_PATTERN, keyPrefix));
+
+        return withConnection(redisConnection -> {
+            final Set<byte[]> keys = redisConnection.keys(keyPattern);
+            final Set<byte[]> keysFound = keys == null ? 
Collections.emptySet() : keys;
+            return keysFound.stream()
+                    .map(key -> new String(key, StandardCharsets.UTF_8))
+                    .map(keyPrefixComponentIdPattern::matcher)
+                    .filter(Matcher::matches)
+                    .map(matcher -> matcher.group(COMPONENT_ID_GROUP))
+                    .collect(Collectors.toUnmodifiableList());
+        });
+    }
+
     private String getComponentKey(final String componentId) {
         return keyPrefix + componentId;
     }
diff --git 
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/state/ITRedisStateProvider.java
 
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/state/ITRedisStateProvider.java
index c0b2e0e884..599e5ec6ab 100644
--- 
a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/state/ITRedisStateProvider.java
+++ 
b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/state/ITRedisStateProvider.java
@@ -35,8 +35,10 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.StandardSocketOptions;
 import java.nio.channels.SocketChannel;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
@@ -69,7 +71,7 @@ public class ITRedisStateProvider {
     }
 
     @AfterEach
-    public void teardown() throws IOException {
+    public void teardown() {
         if (provider != null) {
             try {
                 provider.clear(componentId);
@@ -94,6 +96,19 @@ public class ITRedisStateProvider {
         assertEquals("value", 
getProvider().getState(componentId).get("testSetAndGet"));
     }
 
+    @Test
+    public void testSetAndGetStoredComponentIds() throws IOException {
+        final Collection<String> initialStoredComponentIds = 
provider.getStoredComponentIds();
+        assertTrue(initialStoredComponentIds.isEmpty());
+
+        provider.setState(Collections.emptyMap(), componentId);
+        final Collection<String> storedComponentIds = 
provider.getStoredComponentIds();
+        final Iterator<String> componentIds = storedComponentIds.iterator();
+
+        assertTrue(componentIds.hasNext());
+        assertEquals(componentId, componentIds.next());
+    }
+
     @Test
     public void testReplaceSuccessful() throws IOException {
         final String key = "testReplaceSuccessful";
@@ -248,11 +263,11 @@ public class ITRedisStateProvider {
         final StateMap stateMapAfterRemoval = provider.getState(componentId);
 
         // version should not be present because the state has been removed 
entirely.
-        assertFalse(stateMap.getStateVersion().isPresent());
+        assertFalse(stateMapAfterRemoval.getStateVersion().isPresent());
     }
 
 
-    private void initializeProvider(final RedisStateProvider provider, final 
Map<PropertyDescriptor, String> properties) throws IOException {
+    private void initializeProvider(final RedisStateProvider provider, final 
Map<PropertyDescriptor, String> properties) {
         provider.initialize(new StateProviderInitializationContext() {
             @Override
             public String getIdentifier() {
@@ -300,7 +315,7 @@ public class ITRedisStateProvider {
         });
     }
 
-    private RedisStateProvider createProvider(final Map<PropertyDescriptor, 
String> properties) throws Exception {
+    private RedisStateProvider createProvider(final Map<PropertyDescriptor, 
String> properties) {
         final RedisStateProvider provider = new RedisStateProvider();
         initializeProvider(provider, properties);
         provider.enable();
diff --git 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/components/state/HashMapStateProvider.java
 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/components/state/HashMapStateProvider.java
index 50533593f0..2c5506e67c 100644
--- 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/components/state/HashMapStateProvider.java
+++ 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/components/state/HashMapStateProvider.java
@@ -153,6 +153,16 @@ public class HashMapStateProvider implements StateProvider 
{
         return "stateless-state-provider";
     }
 
+    @Override
+    public boolean isComponentEnumerationSupported() {
+        return true;
+    }
+
+    @Override
+    public Collection<String> getStoredComponentIds() {
+        return 
Collections.unmodifiableCollection(getAllComponentsState().keySet());
+    }
+
     private String getIncrementedVersion(final String currentVersion) {
         final long versionNumber = Long.parseLong(currentVersion);
         final long version = versionNumber + VERSION_INCREMENT;

Reply via email to