This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new a671bc26538 NIFI-15791 Added Secret Caching to
ParameterProviderSecretsManager (#11100)
a671bc26538 is described below
commit a671bc2653863893bda0ae84bcd3fd4ca8c37511
Author: Pierre Villard <[email protected]>
AuthorDate: Mon Apr 6 21:42:53 2026 +0200
NIFI-15791 Added Secret Caching to ParameterProviderSecretsManager (#11100)
Signed-off-by: David Handermann <[email protected]>
---
.../java/org/apache/nifi/util/NiFiProperties.java | 1 +
.../secrets/ConnectorTestRunnerSecretsManager.java | 4 +
.../src/main/asciidoc/administration-guide.adoc | 12 ++
.../secrets/ParameterProviderSecretsManager.java | 123 ++++++++++++-
...tandardSecretsManagerInitializationContext.java | 14 ++
.../TestParameterProviderSecretsManager.java | 205 +++++++++++++++++++++
.../connector/secrets/SecretsManager.java | 6 +
.../SecretsManagerInitializationContext.java | 8 +
.../org/apache/nifi/controller/FlowController.java | 7 +-
9 files changed, 373 insertions(+), 7 deletions(-)
diff --git
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index d2291de4a9e..94f79197d98 100644
---
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -150,6 +150,7 @@ public class NiFiProperties extends ApplicationProperties {
// Secrets Manager properties
public static final String SECRETS_MANAGER_IMPLEMENTATION =
"nifi.secrets.manager.implementation";
+ public static final String SECRETS_MANAGER_CACHE_DURATION =
"nifi.secrets.manager.cache.duration";
// security properties
public static final String SECURITY_KEYSTORE = "nifi.security.keystore";
diff --git
a/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/secrets/ConnectorTestRunnerSecretsManager.java
b/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/secrets/ConnectorTestRunnerSecretsManager.java
index e06f228454a..873d987d443 100644
---
a/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/secrets/ConnectorTestRunnerSecretsManager.java
+++
b/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/secrets/ConnectorTestRunnerSecretsManager.java
@@ -96,4 +96,8 @@ public class ConnectorTestRunnerSecretsManager implements
SecretsManager {
return secrets;
}
+
+ @Override
+ public void invalidateCache() {
+ }
}
diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc
b/nifi-docs/src/main/asciidoc/administration-guide.adoc
index 4c4e89b0e3b..a5eff343abd 100644
--- a/nifi-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc
@@ -3808,6 +3808,18 @@ The NiFi Registry NAR provider retrieves NARs from a
NiFi Registry instance. In
|`nifi.nar.library.provider.nifi-registry.url`| The URL of the NiFi Registry
instance, such as `http://localhost:18080`. If the URL begins with `https`,
then the NiFi keystore and truststore will be used to make the TLS connection.
|====
+[[secrets_manager_properties]]
+=== Secrets Manager Properties
+
+These properties configure the Secrets Manager, which is responsible for
resolving secrets used by Connectors.
+The Secrets Manager delegates to Parameter Providers to retrieve secret values.
+
+|====
+|*Property*|*Description*
+|`nifi.secrets.manager.implementation`|The fully qualified class name of the
Secrets Manager implementation. Defaults to
`org.apache.nifi.components.connector.secrets.ParameterProviderSecretsManager`.
+|`nifi.secrets.manager.cache.duration`|The duration for which resolved secret
values are cached before being refreshed from the underlying Parameter
Providers. Accepts any NiFi time duration value such as `5 mins`, `30 secs`,
etc. A value of `0 sec` disables caching entirely. Defaults to `5 mins`.
+|====
+
[[upgrading_nifi]]
== Upgrading NiFi
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/components/connector/secrets/ParameterProviderSecretsManager.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/components/connector/secrets/ParameterProviderSecretsManager.java
index 524351a4137..0f2d87b80bc 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/components/connector/secrets/ParameterProviderSecretsManager.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/components/connector/secrets/ParameterProviderSecretsManager.java
@@ -22,9 +22,12 @@ import org.apache.nifi.components.connector.SecretReference;
import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.controller.ParameterProviderNode;
import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
@@ -33,16 +36,29 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
public class ParameterProviderSecretsManager implements SecretsManager {
private static final Logger logger =
LoggerFactory.getLogger(ParameterProviderSecretsManager.class);
+ private static final String DEFAULT_CACHE_DURATION = "5 mins";
+
private FlowManager flowManager;
+ private Duration cacheDuration;
+ private final Map<String, CachedSecret> secretCache = new
ConcurrentHashMap<>();
+
+ private record CachedSecret(Secret secret, long timestampNanos) {
+ }
@Override
public void initialize(final SecretsManagerInitializationContext
initializationContext) {
this.flowManager = initializationContext.getFlowManager();
+
+ final String cacheDurationValue =
initializationContext.getApplicationProperty(NiFiProperties.SECRETS_MANAGER_CACHE_DURATION);
+ final String effectiveDuration = cacheDurationValue == null ?
DEFAULT_CACHE_DURATION : cacheDurationValue;
+ this.cacheDuration =
Duration.ofNanos(FormatUtils.getTimeDuration(effectiveDuration.trim(),
TimeUnit.NANOSECONDS));
}
@Override
@@ -81,17 +97,33 @@ public class ParameterProviderSecretsManager implements
SecretsManager {
@Override
public Optional<Secret> getSecret(final SecretReference secretReference) {
- final SecretProvider provider = findProvider(secretReference);
+ final String fqn = secretReference.getFullyQualifiedName();
+ if (fqn == null) {
+ return Optional.empty();
+ }
+
+ final Set<SecretProvider> providers = getSecretProviders();
+ final SecretProvider provider = findProvider(secretReference,
providers);
if (provider == null) {
return Optional.empty();
}
- final List<Secret> secrets =
provider.getSecrets(List.of(secretReference.getFullyQualifiedName()));
+ if (!cacheDuration.isZero()) {
+ final CachedSecret cached = secretCache.get(fqn);
+ if (cached != null && !isExpired(cached)) {
+ logger.debug("Cache hit for secret [{}]", fqn);
+ return Optional.ofNullable(cached.secret());
+ }
+ }
+
+ final List<Secret> secrets = provider.getSecrets(List.of(fqn));
if (secrets.isEmpty()) {
return Optional.empty();
}
- return Optional.of(secrets.getFirst());
+ final Secret secret = secrets.getFirst();
+ cacheSecret(fqn, secret);
+ return Optional.of(secret);
}
@Override
@@ -100,10 +132,20 @@ public class ParameterProviderSecretsManager implements
SecretsManager {
return Map.of();
}
+ if (cacheDuration.isZero()) {
+ return fetchSecretsWithoutCache(secretReferences);
+ }
+
+ return fetchSecretsWithCache(secretReferences);
+ }
+
+ private Map<SecretReference, Secret> fetchSecretsWithoutCache(final
Set<SecretReference> secretReferences) {
+ final Set<SecretProvider> providers = getSecretProviders();
+
// Partition secret references by Provider
final Map<SecretProvider, Set<SecretReference>> referencesByProvider =
new HashMap<>();
for (final SecretReference secretReference : secretReferences) {
- final SecretProvider provider = findProvider(secretReference);
+ final SecretProvider provider = findProvider(secretReference,
providers);
referencesByProvider.computeIfAbsent(provider, k -> new
HashSet<>()).add(secretReference);
}
@@ -117,7 +159,6 @@ public class ParameterProviderSecretsManager implements
SecretsManager {
for (final SecretReference secretReference : references) {
secrets.put(secretReference, null);
}
-
continue;
}
@@ -136,9 +177,79 @@ public class ParameterProviderSecretsManager implements
SecretsManager {
return secrets;
}
- private SecretProvider findProvider(final SecretReference secretReference)
{
+ private Map<SecretReference, Secret> fetchSecretsWithCache(final
Set<SecretReference> secretReferences) {
final Set<SecretProvider> providers = getSecretProviders();
+ final Map<SecretReference, Secret> results = new HashMap<>();
+
+ // Partition references into cache hits vs. misses that need fetching
+ final Map<SecretProvider, Set<SecretReference>> uncachedByProvider =
new HashMap<>();
+ for (final SecretReference secretReference : secretReferences) {
+ final String fqn = secretReference.getFullyQualifiedName();
+
+ if (fqn != null) {
+ final CachedSecret cached = secretCache.get(fqn);
+ if (cached != null && !isExpired(cached)) {
+ logger.debug("Cache hit for secret [{}]", fqn);
+ results.put(secretReference, cached.secret());
+ continue;
+ }
+ }
+
+ final SecretProvider provider = findProvider(secretReference,
providers);
+ uncachedByProvider.computeIfAbsent(provider, k -> new
HashSet<>()).add(secretReference);
+ }
+
+ // Batch fetch uncached secrets grouped by provider
+ for (final Map.Entry<SecretProvider, Set<SecretReference>> entry :
uncachedByProvider.entrySet()) {
+ final SecretProvider provider = entry.getKey();
+ final Set<SecretReference> references = entry.getValue();
+
+ if (provider == null) {
+ for (final SecretReference secretReference : references) {
+ results.put(secretReference, null);
+ }
+ continue;
+ }
+
+ final List<String> secretNames = new ArrayList<>();
+ references.forEach(ref ->
secretNames.add(ref.getFullyQualifiedName()));
+ final List<Secret> retrievedSecrets =
provider.getSecrets(secretNames);
+ final Map<String, Secret> secretsByName = retrievedSecrets.stream()
+ .collect(Collectors.toMap(Secret::getFullyQualifiedName,
Function.identity()));
+
+ for (final SecretReference secretReference : references) {
+ final String fqn = secretReference.getFullyQualifiedName();
+ final Secret secret = secretsByName.get(fqn);
+ results.put(secretReference, secret);
+
+ if (secret != null && fqn != null) {
+ cacheSecret(fqn, secret);
+ }
+ }
+ }
+
+ return results;
+ }
+
+ @Override
+ public void invalidateCache() {
+ secretCache.clear();
+ logger.debug("Secret cache invalidated");
+ }
+
+ private boolean isExpired(final CachedSecret cached) {
+ final long elapsedNanos = System.nanoTime() - cached.timestampNanos();
+ final Duration elapsed = Duration.ofNanos(elapsedNanos);
+ return elapsed.compareTo(cacheDuration) >= 0;
+ }
+
+ private void cacheSecret(final String fqn, final Secret secret) {
+ if (!cacheDuration.isZero() && fqn != null && secret != null) {
+ secretCache.put(fqn, new CachedSecret(secret, System.nanoTime()));
+ }
+ }
+ private SecretProvider findProvider(final SecretReference secretReference,
final Set<SecretProvider> providers) {
// Search first by Provider ID, if it's provided.
final String providerId = secretReference.getProviderId();
if (providerId != null) {
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/components/connector/secrets/StandardSecretsManagerInitializationContext.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/components/connector/secrets/StandardSecretsManagerInitializationContext.java
index ae5013768a8..da2c1b635b9 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/components/connector/secrets/StandardSecretsManagerInitializationContext.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/components/connector/secrets/StandardSecretsManagerInitializationContext.java
@@ -19,15 +19,29 @@ package org.apache.nifi.components.connector.secrets;
import org.apache.nifi.controller.flow.FlowManager;
+import java.util.Collections;
+import java.util.Map;
+
public class StandardSecretsManagerInitializationContext implements
SecretsManagerInitializationContext {
private final FlowManager flowManager;
+ private final Map<String, String> properties;
public StandardSecretsManagerInitializationContext(final FlowManager
flowManager) {
+ this(flowManager, Collections.emptyMap());
+ }
+
+ public StandardSecretsManagerInitializationContext(final FlowManager
flowManager, final Map<String, String> properties) {
this.flowManager = flowManager;
+ this.properties = properties == null ? Collections.emptyMap() :
Map.copyOf(properties);
}
@Override
public FlowManager getFlowManager() {
return flowManager;
}
+
+ @Override
+ public String getApplicationProperty(final String key) {
+ return properties.get(key);
+ }
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/components/connector/secrets/TestParameterProviderSecretsManager.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/components/connector/secrets/TestParameterProviderSecretsManager.java
index 870576010ef..6d5545c9fca 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/components/connector/secrets/TestParameterProviderSecretsManager.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/components/connector/secrets/TestParameterProviderSecretsManager.java
@@ -24,14 +24,21 @@ import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.parameter.Parameter;
import org.apache.nifi.parameter.ParameterDescriptor;
import org.apache.nifi.parameter.ParameterGroup;
+import org.apache.nifi.util.NiFiProperties;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -40,6 +47,8 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestParameterProviderSecretsManager {
@@ -64,6 +73,8 @@ public class TestParameterProviderSecretsManager {
private static final String SECRET_3_DESCRIPTION = "Third secret";
private static final String SECRET_3_VALUE = "secret-value-three";
+ private static final String DEFAULT_CACHE_DURATION = "5 mins";
+
private ParameterProviderSecretsManager secretsManager;
@BeforeEach
@@ -370,5 +381,199 @@ public class TestParameterProviderSecretsManager {
assertEquals(1, secretProviders.size());
assertEquals("valid-id",
secretProviders.iterator().next().getProviderId());
}
+
+ @Test
+ public void testZeroDurationDisablesCaching() {
+ final ParameterProviderNode providerNode =
createMockedParameterProviderNode(PROVIDER_1_ID, PROVIDER_1_NAME, GROUP_1_NAME,
+ createParameter(SECRET_1_NAME, SECRET_1_DESCRIPTION,
SECRET_1_VALUE));
+ final ParameterProviderSecretsManager manager =
createManagerWithCacheDuration("0 sec", providerNode);
+
+ final SecretReference reference = createSecretReference(PROVIDER_1_ID,
null, SECRET_1_NAME);
+
+ manager.getSecret(reference);
+ manager.getSecret(reference);
+
+ verify(providerNode, times(2)).fetchParameterValues(anyList());
+ }
+
+ @Test
+ public void testSecretCacheReturnsFromCacheWithinDuration() {
+ final ParameterProviderNode providerNode =
createMockedParameterProviderNode(PROVIDER_1_ID, PROVIDER_1_NAME, GROUP_1_NAME,
+ createParameter(SECRET_1_NAME, SECRET_1_DESCRIPTION,
SECRET_1_VALUE));
+ final ParameterProviderSecretsManager manager =
createManagerWithCacheDuration(DEFAULT_CACHE_DURATION, providerNode);
+
+ final SecretReference reference = createSecretReference(PROVIDER_1_ID,
null, SECRET_1_NAME);
+
+ final Optional<Secret> first = manager.getSecret(reference);
+ final Optional<Secret> second = manager.getSecret(reference);
+
+ assertTrue(first.isPresent());
+ assertTrue(second.isPresent());
+ assertEquals(SECRET_1_VALUE, first.get().getValue());
+ assertEquals(SECRET_1_VALUE, second.get().getValue());
+
+ verify(providerNode, times(1)).fetchParameterValues(anyList());
+ }
+
+ @Test
+ public void testSecretCacheRefreshesAfterDurationExpiry() throws
InterruptedException {
+ final ParameterProviderNode providerNode =
createMockedParameterProviderNode(PROVIDER_1_ID, PROVIDER_1_NAME, GROUP_1_NAME,
+ createParameter(SECRET_1_NAME, SECRET_1_DESCRIPTION,
SECRET_1_VALUE));
+ final ParameterProviderSecretsManager manager =
createManagerWithCacheDuration("100 ms", providerNode);
+
+ final SecretReference reference = createSecretReference(PROVIDER_1_ID,
null, SECRET_1_NAME);
+
+ manager.getSecret(reference);
+ verify(providerNode, times(1)).fetchParameterValues(anyList());
+
+ Thread.sleep(200);
+
+ manager.getSecret(reference);
+ verify(providerNode, times(2)).fetchParameterValues(anyList());
+ }
+
+ @Test
+ public void testInvalidateCacheForcesSecretRefresh() {
+ final ParameterProviderNode providerNode =
createMockedParameterProviderNode(PROVIDER_1_ID, PROVIDER_1_NAME, GROUP_1_NAME,
+ createParameter(SECRET_1_NAME, SECRET_1_DESCRIPTION,
SECRET_1_VALUE));
+ final ParameterProviderSecretsManager manager =
createManagerWithCacheDuration(DEFAULT_CACHE_DURATION, providerNode);
+
+ final SecretReference reference = createSecretReference(PROVIDER_1_ID,
null, SECRET_1_NAME);
+
+ manager.getSecret(reference);
+ verify(providerNode, times(1)).fetchParameterValues(anyList());
+
+ manager.invalidateCache();
+
+ manager.getSecret(reference);
+ verify(providerNode, times(2)).fetchParameterValues(anyList());
+ }
+
+ @Test
+ public void testGetSecretsBatchWithMixedCacheHitsAndMisses() {
+ final ParameterProviderNode providerNode =
createMockedParameterProviderNode(PROVIDER_1_ID, PROVIDER_1_NAME, GROUP_1_NAME,
+ createParameter(SECRET_1_NAME, SECRET_1_DESCRIPTION,
SECRET_1_VALUE),
+ createParameter(SECRET_2_NAME, SECRET_2_DESCRIPTION,
SECRET_2_VALUE));
+ final ParameterProviderSecretsManager manager =
createManagerWithCacheDuration(DEFAULT_CACHE_DURATION, providerNode);
+
+ final SecretReference reference1 =
createSecretReference(PROVIDER_1_ID, null, SECRET_1_NAME);
+ final SecretReference reference2 =
createSecretReference(PROVIDER_1_ID, null, SECRET_2_NAME);
+
+ manager.getSecret(reference1);
+ verify(providerNode, times(1)).fetchParameterValues(anyList());
+
+ final Map<SecretReference, Secret> batch =
manager.getSecrets(Set.of(reference1, reference2));
+ assertEquals(2, batch.size());
+ assertNotNull(batch.get(reference1));
+ assertNotNull(batch.get(reference2));
+ assertEquals(SECRET_1_VALUE, batch.get(reference1).getValue());
+ assertEquals(SECRET_2_VALUE, batch.get(reference2).getValue());
+
+ // reference1 was cached, so only one additional fetch for reference2
+ verify(providerNode, times(2)).fetchParameterValues(anyList());
+ }
+
+ @Test
+ public void testGetSecretsBatchCachesAllFetchedValues() {
+ final ParameterProviderNode providerNode =
createMockedParameterProviderNode(PROVIDER_1_ID, PROVIDER_1_NAME, GROUP_1_NAME,
+ createParameter(SECRET_1_NAME, SECRET_1_DESCRIPTION,
SECRET_1_VALUE),
+ createParameter(SECRET_2_NAME, SECRET_2_DESCRIPTION,
SECRET_2_VALUE));
+ final ParameterProviderSecretsManager manager =
createManagerWithCacheDuration(DEFAULT_CACHE_DURATION, providerNode);
+
+ final SecretReference reference1 =
createSecretReference(PROVIDER_1_ID, null, SECRET_1_NAME);
+ final SecretReference reference2 =
createSecretReference(PROVIDER_1_ID, null, SECRET_2_NAME);
+
+ manager.getSecrets(Set.of(reference1, reference2));
+ verify(providerNode, times(1)).fetchParameterValues(anyList());
+
+ final Optional<Secret> cached = manager.getSecret(reference2);
+ assertTrue(cached.isPresent());
+ assertEquals(SECRET_2_VALUE, cached.get().getValue());
+
+ // No additional fetch -- served from cache populated by the batch call
+ verify(providerNode, times(1)).fetchParameterValues(anyList());
+ }
+
+ @Test
+ public void testNullFqnReturnsEmpty() {
+ final ParameterProviderNode providerNode =
createMockedParameterProviderNode(PROVIDER_1_ID, PROVIDER_1_NAME, GROUP_1_NAME,
+ createParameter(SECRET_1_NAME, SECRET_1_DESCRIPTION,
SECRET_1_VALUE));
+ final ParameterProviderSecretsManager manager =
createManagerWithCacheDuration(DEFAULT_CACHE_DURATION, providerNode);
+
+ final SecretReference referenceWithNullFqn = new
SecretReference(PROVIDER_1_ID, PROVIDER_1_NAME, SECRET_1_NAME, null);
+
+ final Optional<Secret> result =
manager.getSecret(referenceWithNullFqn);
+ assertFalse(result.isPresent());
+
+ verify(providerNode, times(0)).fetchParameterValues(anyList());
+ }
+
+ @Test
+ public void testNegativeResultNotCached() {
+ final ParameterProviderNode providerNode =
createMockedParameterProviderNode(PROVIDER_1_ID, PROVIDER_1_NAME, GROUP_1_NAME,
+ createParameter(SECRET_1_NAME, SECRET_1_DESCRIPTION,
SECRET_1_VALUE));
+ final ParameterProviderSecretsManager manager =
createManagerWithCacheDuration(DEFAULT_CACHE_DURATION, providerNode);
+
+ final SecretReference nonExistent =
createSecretReference(PROVIDER_1_ID, null, "does-not-exist");
+
+ final Optional<Secret> first = manager.getSecret(nonExistent);
+ final Optional<Secret> second = manager.getSecret(nonExistent);
+
+ assertFalse(first.isPresent());
+ assertFalse(second.isPresent());
+
+ verify(providerNode, times(2)).fetchParameterValues(anyList());
+ }
+
+ @Test
+ public void testConcurrentAccessDoesNotCorruptCache() throws Exception {
+ final ParameterProviderNode providerNode =
createMockedParameterProviderNode(PROVIDER_1_ID, PROVIDER_1_NAME, GROUP_1_NAME,
+ createParameter(SECRET_1_NAME, SECRET_1_DESCRIPTION,
SECRET_1_VALUE));
+ final ParameterProviderSecretsManager manager =
createManagerWithCacheDuration(DEFAULT_CACHE_DURATION, providerNode);
+
+ final SecretReference reference = createSecretReference(PROVIDER_1_ID,
null, SECRET_1_NAME);
+
+ final int threadCount = 10;
+ final CyclicBarrier barrier = new CyclicBarrier(threadCount);
+ final ExecutorService executor =
Executors.newFixedThreadPool(threadCount);
+ final List<Future<?>> futures = new ArrayList<>();
+
+ for (int i = 0; i < threadCount; i++) {
+ final int index = i;
+ futures.add(executor.submit(() -> {
+ try {
+ barrier.await(5, TimeUnit.SECONDS);
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ if (index % 3 == 0) {
+ manager.invalidateCache();
+ }
+ final Optional<Secret> result = manager.getSecret(reference);
+ assertTrue(result.isPresent());
+ assertEquals(SECRET_1_VALUE, result.get().getValue());
+ }));
+ }
+
+ for (final Future<?> future : futures) {
+ future.get(10, TimeUnit.SECONDS);
+ }
+
+ executor.shutdown();
+ assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS));
+ }
+
+ private ParameterProviderSecretsManager
createManagerWithCacheDuration(final String cacheDuration, final
ParameterProviderNode... providerNodes) {
+ final FlowManager flowManager = mock(FlowManager.class);
+ final Set<ParameterProviderNode> providers = new
HashSet<>(Set.of(providerNodes));
+ when(flowManager.getAllParameterProviders()).thenReturn(providers);
+
+ final ParameterProviderSecretsManager manager = new
ParameterProviderSecretsManager();
+ final SecretsManagerInitializationContext initContext = new
StandardSecretsManagerInitializationContext(
+ flowManager, Map.of(NiFiProperties.SECRETS_MANAGER_CACHE_DURATION,
cacheDuration));
+ manager.initialize(initContext);
+ return manager;
+ }
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/secrets/SecretsManager.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/secrets/SecretsManager.java
index 62365ed940e..e3fbe297976 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/secrets/SecretsManager.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/secrets/SecretsManager.java
@@ -37,4 +37,10 @@ public interface SecretsManager {
Map<SecretReference, Secret> getSecrets(Set<SecretReference>
secretReferences);
+ /**
+ * Invalidates any cached secret data, forcing the next access to fetch
fresh data
+ * from the underlying secret providers.
+ */
+ void invalidateCache();
+
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/secrets/SecretsManagerInitializationContext.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/secrets/SecretsManagerInitializationContext.java
index 4f4cf8e9a86..9bad4cf2a6c 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/secrets/SecretsManagerInitializationContext.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/secrets/SecretsManagerInitializationContext.java
@@ -23,4 +23,12 @@ public interface SecretsManagerInitializationContext {
FlowManager getFlowManager();
+ /**
+ * Returns the value of the given application property, or {@code null} if
the property is not set.
+ *
+ * @param key the property key
+ * @return the property value, or {@code null}
+ */
+ String getApplicationProperty(String key);
+
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 7328edec917..8b92b5e411e 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -974,7 +974,12 @@ public class FlowController implements
ReportingTaskProvider, FlowAnalysisRulePr
extensionManager.discoverExtensions(extensionManager.getAllBundles(),
Set.of(SecretsManager.class), false);
final SecretsManager created =
NarThreadContextClassLoader.createInstance(extensionManager,
implementationClassName, SecretsManager.class, properties);
- final SecretsManagerInitializationContext initializationContext =
new StandardSecretsManagerInitializationContext(flowManager);
+ final Map<String, String> secretsManagerProperties = new
HashMap<>();
+ final String cacheDuration =
properties.getProperty(NiFiProperties.SECRETS_MANAGER_CACHE_DURATION);
+ if (cacheDuration != null) {
+
secretsManagerProperties.put(NiFiProperties.SECRETS_MANAGER_CACHE_DURATION,
cacheDuration);
+ }
+ final SecretsManagerInitializationContext initializationContext =
new StandardSecretsManagerInitializationContext(flowManager,
secretsManagerProperties);
synchronized (created) {
// Ensure that any NAR dependencies are available when we
initialize the ConnectorRepository