This is an automated email from the ASF dual-hosted git repository. markap14 pushed a commit to branch NIFI-15258 in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 36a0ba24def609ff642bf2eb77c1aac11a37761f Author: Mark Payne <[email protected]> AuthorDate: Fri Dec 12 18:01:16 2025 -0500 NIFI-15312: Implementation of Parameter Provider based Secrets Manager; updated mock framework to support secrets; validation cleanup (#10638) --- nifi-assembly/pom.xml | 12 +- .../java/org/apache/nifi/util/NiFiProperties.java | 3 + .../mock/connector/server/ConnectorTestRunner.java | 12 + .../server/MockConnectorInitializationContext.java | 2 +- .../server/StandardConnectorMockServer.java | 46 ++- .../secrets/ConnectorTestRunnerSecretProvider.java | 86 ++++++ .../secrets/ConnectorTestRunnerSecretsManager.java | 99 +++++++ ...ifi.components.connector.secrets.SecretsManager | 16 ++ .../connector/StandardConnectorTestRunner.java | 24 ++ .../src/main/resources/nifi.properties | 3 + .../nifi/connectors/kafkas3/KafkaToS3IT.java | 78 +++-- .../secrets/ParameterProviderSecretProvider.java | 86 ++++++ .../secrets/ParameterProviderSecretsManager.java | 151 ++++++++++ .../connector/secrets/StandardSecret.java | 127 ++++++++ ...andardSecretsManagerInitializationContext.java} | 21 +- .../parameter/StandardParameterProviderNode.java | 22 +- ...ifi.components.connector.secrets.SecretsManager | 16 ++ .../TestParameterProviderSecretsManager.java | 318 +++++++++++++++++++++ .../connector/ConnectorConfiguration.java | 9 + .../components/connector/ConnectorRepository.java | 3 + .../ConnectorRepositoryInitializationContext.java | 3 + .../FrameworkConnectorInitializationContext.java | 1 + ...eworkConnectorInitializationContextBuilder.java | 1 + .../SecretProvider.java} | 16 +- .../SecretsManager.java} | 24 +- .../SecretsManagerInitializationContext.java} | 10 +- .../nifi/controller/ParameterProviderNode.java | 13 + .../StandardConnectorConfigurationContext.java | 2 + .../StandardConnectorInitializationContext.java | 1 + .../connector/StandardConnectorNode.java | 119 +++++++- ...StandardConnectorRepoInitializationContext.java | 9 + 
.../connector/StandardConnectorRepository.java | 8 + .../apache/nifi/controller/ExtensionBuilder.java | 6 +- .../org/apache/nifi/controller/FlowController.java | 43 ++- .../nifi/controller/flow/StandardFlowManager.java | 4 +- .../TestStandardConnectorConfigurationContext.java | 1 + .../connector/TestStandardConnectorNode.java | 1 + 37 files changed, 1296 insertions(+), 100 deletions(-) diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 16787bda28..1ddf8ea5b5 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -471,12 +471,6 @@ language governing permissions and limitations under the License. --> <version>2.8.0-SNAPSHOT</version> <type>nar</type> </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-kafka-to-s3-nar</artifactId> - <version>2.7.0-SNAPSHOT</version> - <type>nar</type> - </dependency> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-groovyx-nar</artifactId> @@ -921,6 +915,12 @@ language governing permissions and limitations under the License. 
--> <version>2.8.0-SNAPSHOT</version> <type>nar</type> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-kafka-to-s3-nar</artifactId> + <version>2.7.0-SNAPSHOT</version> + <type>nar</type> + </dependency> <!-- AspectJ library needed by the Java Agent used for native library loading (see bootstrap.conf) --> <dependency> <groupId>org.aspectj</groupId> diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java index e70e34036b..2b354a7bc3 100644 --- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java +++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java @@ -139,6 +139,9 @@ public class NiFiProperties extends ApplicationProperties { // Connector Repository properties public static final String CONNECTOR_REPOSITORY_IMPLEMENTATION = "nifi.components.connectors.repository.implementation"; + // Secrets Manager properties + public static final String SECRETS_MANAGER_IMPLEMENTATION = "nifi.secrets.manager.implementation"; + // security properties public static final String SECURITY_KEYSTORE = "nifi.security.keystore"; public static final String SECURITY_KEYSTORE_TYPE = "nifi.security.keystoreType"; diff --git a/nifi-connector-mock-bundle/nifi-connector-mock-api/src/main/java/org/apache/nifi/mock/connector/server/ConnectorTestRunner.java b/nifi-connector-mock-bundle/nifi-connector-mock-api/src/main/java/org/apache/nifi/mock/connector/server/ConnectorTestRunner.java index 8dc3730229..5fe172fa78 100644 --- a/nifi-connector-mock-bundle/nifi-connector-mock-api/src/main/java/org/apache/nifi/mock/connector/server/ConnectorTestRunner.java +++ b/nifi-connector-mock-bundle/nifi-connector-mock-api/src/main/java/org/apache/nifi/mock/connector/server/ConnectorTestRunner.java @@ -18,7 +18,9 @@ package org.apache.nifi.mock.connector.server; import 
org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.connector.ConnectorValueReference; import org.apache.nifi.components.connector.FlowUpdateException; +import org.apache.nifi.components.connector.SecretReference; import org.apache.nifi.components.connector.StepConfiguration; import java.io.Closeable; @@ -27,6 +29,8 @@ import java.util.List; import java.util.Map; public interface ConnectorTestRunner extends Closeable { + String SECRET_PROVIDER_ID = "TestRunnerSecretsManager"; + String SECRET_PROVIDER_NAME = "TestRunnerSecretsManager"; void applyUpdate() throws FlowUpdateException; @@ -34,10 +38,18 @@ public interface ConnectorTestRunner extends Closeable { void configure(String stepName, Map<String, String> propertyValues) throws FlowUpdateException; + void configure(String stepName, Map<String, String> propertyValues, Map<String, ConnectorValueReference> propertyReferences) throws FlowUpdateException; + + SecretReference createSecretReference(String secretName); + ConnectorConfigVerificationResult verifyConfiguration(String stepName, Map<String, String> propertyValueOverrides); + ConnectorConfigVerificationResult verifyConfiguration(String stepName, Map<String, String> propertyValueOverrides, Map<String, ConnectorValueReference> referenceOverrides); + ConnectorConfigVerificationResult verifyConfiguration(String stepName, StepConfiguration configurationOverrides); + void addSecret(String name, String value); + void startConnector(); void stopConnector(); diff --git a/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/MockConnectorInitializationContext.java b/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/MockConnectorInitializationContext.java index 939035bc5e..93cb6fd44a 100644 --- 
a/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/MockConnectorInitializationContext.java +++ b/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/MockConnectorInitializationContext.java @@ -22,7 +22,7 @@ import org.apache.nifi.components.connector.FrameworkConnectorInitializationCont import org.apache.nifi.components.connector.FlowUpdateException; import org.apache.nifi.components.connector.FrameworkConnectorInitializationContext; import org.apache.nifi.components.connector.FrameworkFlowContext; -import org.apache.nifi.components.connector.SecretsManager; +import org.apache.nifi.components.connector.secrets.SecretsManager; import org.apache.nifi.components.connector.components.FlowContext; import org.apache.nifi.flow.VersionedExternalFlow; import org.apache.nifi.flow.VersionedProcessGroup; diff --git a/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/StandardConnectorMockServer.java b/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/StandardConnectorMockServer.java index 590e5a8049..0e5a2d86ab 100644 --- a/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/StandardConnectorMockServer.java +++ b/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/StandardConnectorMockServer.java @@ -30,9 +30,11 @@ import org.apache.nifi.components.connector.ConnectorRepository; import org.apache.nifi.components.connector.ConnectorState; import org.apache.nifi.components.connector.ConnectorValueReference; import org.apache.nifi.components.connector.FlowUpdateException; +import org.apache.nifi.components.connector.SecretReference; import org.apache.nifi.components.connector.StandaloneConnectorRequestReplicator; import 
org.apache.nifi.components.connector.StepConfiguration; import org.apache.nifi.components.connector.StringLiteralValue; +import org.apache.nifi.components.connector.secrets.SecretsManager; import org.apache.nifi.components.state.StateManagerProvider; import org.apache.nifi.components.validation.DisabledServiceValidationResult; import org.apache.nifi.components.validation.ValidationState; @@ -48,6 +50,7 @@ import org.apache.nifi.diagnostics.DiagnosticsFactory; import org.apache.nifi.encrypt.PropertyEncryptor; import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.events.VolatileBulletinRepository; +import org.apache.nifi.mock.connector.server.secrets.ConnectorTestRunnerSecretsManager; import org.apache.nifi.nar.ExtensionMapping; import org.apache.nifi.processor.Processor; import org.apache.nifi.reporting.BulletinRepository; @@ -74,6 +77,7 @@ public class StandardConnectorMockServer implements ConnectorMockServer { private FlowController flowController; private MockExtensionDiscoveringManager extensionManager; private ConnectorNode connectorNode; + private ConnectorRepository connectorRepository; private FlowEngine flowEngine; private MockExtensionMapper mockExtensionMapper; private FlowFileTransferCounts initialFlowFileTransferCounts = new FlowFileTransferCounts(0L, 0L, 0L, 0L); @@ -113,7 +117,7 @@ public class StandardConnectorMockServer implements ConnectorMockServer { throw new RuntimeException("Failed to initialize FlowFile Repository", e); } - final ConnectorRepository connectorRepository = flowController.getConnectorRepository(); + connectorRepository = flowController.getConnectorRepository(); if (!(connectorRepository instanceof MockConnectorRepository)) { throw new IllegalStateException("Connector Repository is not an instance of MockConnectorRepository"); } @@ -210,19 +214,32 @@ public class StandardConnectorMockServer implements ConnectorMockServer { @Override public void configure(final String stepName, final Map<String, String> propertyValues) 
throws FlowUpdateException { - final StepConfiguration configuration = toStringLiteralConfiguration(propertyValues); - configure(stepName, configuration); + configure(stepName, propertyValues, Collections.emptyMap()); + } + + @Override + public void configure(final String stepName, final Map<String, String> propertyValues, final Map<String, ConnectorValueReference> propertyReferences) throws FlowUpdateException { + final StepConfiguration stepConfiguration = createStepConfiguration(propertyValues, propertyReferences); + configure(stepName, stepConfiguration); } @Override public ConnectorConfigVerificationResult verifyConfiguration(final String stepName, final Map<String, String> propertyValueOverrides) { - final StepConfiguration configuration = toStringLiteralConfiguration(propertyValueOverrides); + return verifyConfiguration(stepName, propertyValueOverrides, Collections.emptyMap()); + } + + @Override + public ConnectorConfigVerificationResult verifyConfiguration(final String stepName, final Map<String, String> propertyValueOverrides, + final Map<String, ConnectorValueReference> referenceOverrides) { + + final StepConfiguration configuration = createStepConfiguration(propertyValueOverrides, referenceOverrides); return verifyConfiguration(stepName, configuration); } - private StepConfiguration toStringLiteralConfiguration(final Map<String, String> propertyValues) { - final Map<String, ConnectorValueReference> references = new HashMap<>(propertyValues.size()); - propertyValues.forEach((key, value) -> references.put(key, new StringLiteralValue(value))); + private StepConfiguration createStepConfiguration(final Map<String, String> propertyValues, final Map<String, ConnectorValueReference> propertyReferences) { + final Map<String, ConnectorValueReference> references = new HashMap<>(); + propertyValues.forEach((key, value) -> references.put(key, value == null ? 
null : new StringLiteralValue(value))); + references.putAll(propertyReferences); return new StepConfiguration(references); } @@ -232,6 +249,21 @@ public class StandardConnectorMockServer implements ConnectorMockServer { return new MockServerConfigVerificationResult(results); } + @Override + public void addSecret(final String name, final String value) { + final SecretsManager secretsManager = connectorRepository.getSecretsManager(); + if (!(secretsManager instanceof final ConnectorTestRunnerSecretsManager testRunnerSecretsManager)) { + throw new IllegalStateException("Secrets Manager is not an instance of ConnectorTestRunnerSecretsManager"); + } + + testRunnerSecretsManager.addSecret(name, value); + } + + @Override + public SecretReference createSecretReference(final String secretName) { + return new SecretReference(ConnectorTestRunner.SECRET_PROVIDER_ID, ConnectorTestRunner.SECRET_PROVIDER_NAME, secretName); + } + @Override public void startConnector() { initialFlowFileTransferCounts = connectorNode.getFlowFileTransferCounts(); diff --git a/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/secrets/ConnectorTestRunnerSecretProvider.java b/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/secrets/ConnectorTestRunnerSecretProvider.java new file mode 100644 index 0000000000..16e05ff4f0 --- /dev/null +++ b/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/secrets/ConnectorTestRunnerSecretProvider.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.mock.connector.server.secrets; + +import org.apache.nifi.components.connector.Secret; +import org.apache.nifi.components.connector.secrets.SecretProvider; +import org.apache.nifi.components.connector.secrets.StandardSecret; +import org.apache.nifi.mock.connector.server.ConnectorTestRunner; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ConnectorTestRunnerSecretProvider implements SecretProvider { + public static final String GROUP_NAME = "Default"; + + private final Map<String, String> secrets = new HashMap<>(); + + public void addSecret(final String key, final String value) { + this.secrets.put(key, value); + } + + @Override + public String getProviderId() { + return ConnectorTestRunner.SECRET_PROVIDER_ID; + } + + @Override + public String getProviderName() { + return ConnectorTestRunner.SECRET_PROVIDER_ID; + } + + @Override + public List<Secret> getAllSecrets() { + final List<Secret> secrets = new ArrayList<>(); + for (final Map.Entry<String, String> entry : this.secrets.entrySet()) { + final Secret secret = new StandardSecret.Builder() + .providerName(ConnectorTestRunner.SECRET_PROVIDER_NAME) + .groupName(GROUP_NAME) + .name(entry.getKey()) + .value(entry.getValue()) + .build(); + + secrets.add(secret); + } + + return secrets; + } + + @Override + public List<Secret> getSecrets(final List<String> fullyQualifiedSecretNames) { + final List<Secret> matchingSecrets = new ArrayList<>(); + + for (final String secretName : fullyQualifiedSecretNames) { + 
final String value = secrets.get(secretName); + + if (value != null) { + final Secret secret = new StandardSecret.Builder() + .providerName(ConnectorTestRunner.SECRET_PROVIDER_NAME) + .groupName(GROUP_NAME) + .name(secretName) + .value(value) + .build(); + + matchingSecrets.add(secret); + } + } + return matchingSecrets; + } +} diff --git a/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/secrets/ConnectorTestRunnerSecretsManager.java b/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/secrets/ConnectorTestRunnerSecretsManager.java new file mode 100644 index 0000000000..8af0f6a8b9 --- /dev/null +++ b/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/secrets/ConnectorTestRunnerSecretsManager.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.mock.connector.server.secrets; + +import org.apache.nifi.components.connector.Secret; +import org.apache.nifi.components.connector.SecretReference; +import org.apache.nifi.components.connector.secrets.SecretProvider; +import org.apache.nifi.components.connector.secrets.SecretsManager; +import org.apache.nifi.components.connector.secrets.SecretsManagerInitializationContext; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +public class ConnectorTestRunnerSecretsManager implements SecretsManager { + private final ConnectorTestRunnerSecretProvider secretProvider = new ConnectorTestRunnerSecretProvider(); + + @Override + public void initialize(final SecretsManagerInitializationContext initializationContext) { + } + + public void addSecret(final String name, final String value) { + secretProvider.addSecret(name, value); + } + + @Override + public List<Secret> getAllSecrets() { + return secretProvider.getAllSecrets(); + } + + @Override + public Set<SecretProvider> getSecretProviders() { + return Set.of(secretProvider); + } + + @Override + public Optional<Secret> getSecret(final SecretReference secretReference) { + // Check that appropriate provider given + final SecretProvider provider = getProvider(secretReference); + if (provider == null) { + return Optional.empty(); + } + + final List<Secret> secrets = provider.getSecrets(List.of(secretReference.getSecretName())); + return secrets.isEmpty() ? 
Optional.empty() : Optional.of(secrets.getFirst()); + } + + private SecretProvider getProvider(final SecretReference secretReference) { + final String providerId = secretReference.getProviderId(); + if (providerId != null) { + for (final SecretProvider provider : getSecretProviders()) { + if (provider.getProviderId().equals(providerId)) { + return provider; + } + } + + return null; + } + + final String providerName = secretReference.getProviderName(); + if (providerName != null) { + for (final SecretProvider provider : getSecretProviders()) { + if (provider.getProviderName().equals(providerName)) { + return provider; + } + } + } + + return null; + } + + @Override + public Map<SecretReference, Secret> getSecrets(final Set<SecretReference> secretReferences) { + final Map<SecretReference, Secret> secrets = new HashMap<>(); + for (final SecretReference reference : secretReferences) { + final Secret secret = getSecret(reference).orElse(null); + secrets.put(reference, secret); + } + + return secrets; + } +} diff --git a/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/resources/META-INF/services/org.apache.nifi.components.connector.secrets.SecretsManager b/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/resources/META-INF/services/org.apache.nifi.components.connector.secrets.SecretsManager new file mode 100644 index 0000000000..a1c0de54d1 --- /dev/null +++ b/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/resources/META-INF/services/org.apache.nifi.components.connector.secrets.SecretsManager @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.nifi.mock.connector.server.secrets.ConnectorTestRunnerSecretsManager diff --git a/nifi-connector-mock-bundle/nifi-connector-mock/src/main/java/org/apache/nifi/mock/connector/StandardConnectorTestRunner.java b/nifi-connector-mock-bundle/nifi-connector-mock/src/main/java/org/apache/nifi/mock/connector/StandardConnectorTestRunner.java index 5cf5d5ef70..71636a5af1 100644 --- a/nifi-connector-mock-bundle/nifi-connector-mock/src/main/java/org/apache/nifi/mock/connector/StandardConnectorTestRunner.java +++ b/nifi-connector-mock-bundle/nifi-connector-mock/src/main/java/org/apache/nifi/mock/connector/StandardConnectorTestRunner.java @@ -20,7 +20,9 @@ package org.apache.nifi.mock.connector; import org.apache.nifi.NiFiServer; import org.apache.nifi.bundle.Bundle; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.connector.ConnectorValueReference; import org.apache.nifi.components.connector.FlowUpdateException; +import org.apache.nifi.components.connector.SecretReference; import org.apache.nifi.components.connector.StepConfiguration; import org.apache.nifi.mock.connector.server.ConnectorConfigVerificationResult; import org.apache.nifi.mock.connector.server.ConnectorMockServer; @@ -131,16 +133,38 @@ public class StandardConnectorTestRunner implements ConnectorTestRunner, Closeab mockServer.configure(stepName, propertyValues); } + @Override + public void configure(final String stepName, final Map<String, String> propertyValues, final Map<String, ConnectorValueReference> propertyReferences) throws FlowUpdateException { + 
mockServer.configure(stepName, propertyValues, propertyReferences); + } + + @Override + public SecretReference createSecretReference(final String secretName) { + return mockServer.createSecretReference(secretName); + } + @Override public ConnectorConfigVerificationResult verifyConfiguration(final String stepName, final Map<String, String> propertyValueOverrides) { return mockServer.verifyConfiguration(stepName, propertyValueOverrides); } + @Override + public ConnectorConfigVerificationResult verifyConfiguration(final String stepName, final Map<String, String> propertyValueOverrides, + final Map<String, ConnectorValueReference> referenceOverrides) { + + return mockServer.verifyConfiguration(stepName, propertyValueOverrides, referenceOverrides); + } + @Override public ConnectorConfigVerificationResult verifyConfiguration(final String stepName, final StepConfiguration configurationOverrides) { return mockServer.verifyConfiguration(stepName, configurationOverrides); } + @Override + public void addSecret(final String name, final String value) { + mockServer.addSecret(name, value); + } + @Override public void startConnector() { mockServer.startConnector(); diff --git a/nifi-connector-mock-bundle/nifi-connector-mock/src/main/resources/nifi.properties b/nifi-connector-mock-bundle/nifi-connector-mock/src/main/resources/nifi.properties index 046a2980b2..071f1cca3c 100644 --- a/nifi-connector-mock-bundle/nifi-connector-mock/src/main/resources/nifi.properties +++ b/nifi-connector-mock-bundle/nifi-connector-mock/src/main/resources/nifi.properties @@ -94,6 +94,9 @@ nifi.nar.persistence.provider.properties.directory=target/nifi-storage/nar_repos # Asset Management nifi.asset.manager.properties.directory=target/nifi-storage/assets +# Secrets Manager +nifi.secrets.manager.implementation=org.apache.nifi.mock.connector.server.secrets.ConnectorTestRunnerSecretsManager + # Site to Site properties nifi.remote.input.host=localhost nifi.remote.input.secure=true diff --git 
a/nifi-connectors/nifi-kafka-to-s3-bundle/nifi-kafka-to-s3-integration-tests/src/test/java/org/apache/nifi/connectors/kafkas3/KafkaToS3IT.java b/nifi-connectors/nifi-kafka-to-s3-bundle/nifi-kafka-to-s3-integration-tests/src/test/java/org/apache/nifi/connectors/kafkas3/KafkaToS3IT.java index 6ccdc34fd9..f715ea2e4e 100644 --- a/nifi-connectors/nifi-kafka-to-s3-bundle/nifi-kafka-to-s3-integration-tests/src/test/java/org/apache/nifi/connectors/kafkas3/KafkaToS3IT.java +++ b/nifi-connectors/nifi-kafka-to-s3-bundle/nifi-kafka-to-s3-integration-tests/src/test/java/org/apache/nifi/connectors/kafkas3/KafkaToS3IT.java @@ -32,6 +32,7 @@ import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.connector.ConnectorValueReference; import org.apache.nifi.components.connector.FlowUpdateException; import org.apache.nifi.mock.connector.StandardConnectorTestRunner; import org.apache.nifi.mock.connector.server.ConnectorConfigVerificationResult; @@ -299,18 +300,19 @@ public class KafkaToS3IT { "Kafka Brokers", "localhost:9093", "Security Protocol", "SASL_PLAINTEXT", "SASL Mechanism", "PLAIN", - "Username", SCRAM_USERNAME, - "Password", SCRAM_PASSWORD + "Username", SCRAM_USERNAME + ); + runner.addSecret("kafka-password", SCRAM_PASSWORD); + final Map<String, ConnectorValueReference> kafkaServerSecretRefs = Map.of( + "Password", runner.createSecretReference("kafka-password") ); - - runner.applyUpdate(); // Perform verification to ensure that valid server configuration passes - final ConnectorConfigVerificationResult connectionVerificationResults = runner.verifyConfiguration("Kafka Connection", kafkaServerConfig); + final ConnectorConfigVerificationResult connectionVerificationResults = runner.verifyConfiguration("Kafka Connection", kafkaServerConfig, kafkaServerSecretRefs); 
connectionVerificationResults.assertNoFailures(); // Apply the configuration that we've now validated - runner.configure("Kafka Connection", kafkaServerConfig); + runner.configure("Kafka Connection", kafkaServerConfig, kafkaServerSecretRefs); // Perform verification to ensure that valid topic configuration passes final Map<String, String> topic1Config = Map.of( @@ -360,8 +362,11 @@ public class KafkaToS3IT { "Kafka Brokers", "localhost:9093", "Security Protocol", "SASL_PLAINTEXT", "SASL Mechanism", "PLAIN", - "Username", SCRAM_USERNAME, - "Password", SCRAM_PASSWORD + "Username", SCRAM_USERNAME + ); + runner.addSecret("kafka-password", SCRAM_PASSWORD); + final Map<String, ConnectorValueReference> kafkaSecretRefs = Map.of( + "Password", runner.createSecretReference("kafka-password") ); final Map<String, String> kafkaTopicConfig = Map.of( @@ -378,14 +383,17 @@ public class KafkaToS3IT { Map.entry("S3 Endpoint Override URL", localStackContainer.getEndpoint().toString()), Map.entry("S3 Authentication Strategy", "Access Key ID and Secret Key"), Map.entry("S3 Access Key ID", localStackContainer.getAccessKey()), - Map.entry("S3 Secret Access Key", localStackContainer.getSecretKey()), Map.entry("Target Object Size", "1 MB"), Map.entry("Merge Latency", "1 sec") ); + runner.addSecret("s3-secret-key", localStackContainer.getSecretKey()); + final Map<String, ConnectorValueReference> s3SecretRefs = Map.of( + "S3 Secret Access Key", runner.createSecretReference("s3-secret-key") + ); - runner.configure("Kafka Connection", kafkaServerConfig); + runner.configure("Kafka Connection", kafkaServerConfig, kafkaSecretRefs); runner.configure("Kafka Topics", kafkaTopicConfig); - runner.configure("S3 Configuration", s3Config); + runner.configure("S3 Configuration", s3Config, s3SecretRefs); runner.applyUpdate(); final List<ValidationResult> validationResults = runner.validate(); @@ -449,14 +457,17 @@ public class KafkaToS3IT { "Security Protocol", "SASL_PLAINTEXT", "SASL Mechanism", "PLAIN", 
"Username", SCRAM_USERNAME, - "Password", SCRAM_PASSWORD, "Schema Registry URL", getSchemaRegistryUrl() ); + runner.addSecret("kafka-password", SCRAM_PASSWORD); + final Map<String, ConnectorValueReference> kafkaSecretRefs = Map.of( + "Password", runner.createSecretReference("kafka-password") + ); - final ConnectorConfigVerificationResult connectionVerificationResults = runner.verifyConfiguration("Kafka Connection", kafkaConnectionConfig); + final ConnectorConfigVerificationResult connectionVerificationResults = runner.verifyConfiguration("Kafka Connection", kafkaConnectionConfig, kafkaSecretRefs); connectionVerificationResults.assertNoFailures(); - runner.configure("Kafka Connection", kafkaConnectionConfig); + runner.configure("Kafka Connection", kafkaConnectionConfig, kafkaSecretRefs); final Map<String, String> avroTopicConfig = Map.of( "Topic Names", "avro-topic", @@ -517,9 +528,12 @@ public class KafkaToS3IT { "Security Protocol", "SASL_PLAINTEXT", "SASL Mechanism", "PLAIN", "Username", SCRAM_USERNAME, - "Password", SCRAM_PASSWORD, "Schema Registry URL", getSchemaRegistryUrl() ); + runner.addSecret("kafka-password", SCRAM_PASSWORD); + final Map<String, ConnectorValueReference> kafkaSecretRefs = Map.of( + "Password", runner.createSecretReference("kafka-password") + ); final Map<String, String> kafkaTopicConfig = Map.of( "Topic Names", "user-events", @@ -535,14 +549,17 @@ public class KafkaToS3IT { Map.entry("S3 Endpoint Override URL", localStackContainer.getEndpoint().toString()), Map.entry("S3 Authentication Strategy", "Access Key ID and Secret Key"), Map.entry("S3 Access Key ID", localStackContainer.getAccessKey()), - Map.entry("S3 Secret Access Key", localStackContainer.getSecretKey()), Map.entry("Target Object Size", "1 MB"), Map.entry("Merge Latency", "1 sec") ); + runner.addSecret("s3-secret-key", localStackContainer.getSecretKey()); + final Map<String, ConnectorValueReference> s3SecretRefs = Map.of( + "S3 Secret Access Key", 
runner.createSecretReference("s3-secret-key") + ); - runner.configure("Kafka Connection", kafkaConnectionConfig); + runner.configure("Kafka Connection", kafkaConnectionConfig, kafkaSecretRefs); runner.configure("Kafka Topics", kafkaTopicConfig); - runner.configure("S3 Configuration", s3Config); + runner.configure("S3 Configuration", s3Config, s3SecretRefs); runner.applyUpdate(); final List<ValidationResult> validationResults = runner.validate(); @@ -613,8 +630,11 @@ public class KafkaToS3IT { "Kafka Brokers", "localhost:9093", "Security Protocol", "SASL_PLAINTEXT", "SASL Mechanism", "PLAIN", - "Username", SCRAM_USERNAME, - "Password", SCRAM_PASSWORD + "Username", SCRAM_USERNAME + ); + runner.addSecret("kafka-password", SCRAM_PASSWORD); + final Map<String, ConnectorValueReference> kafkaSecretRefs = Map.of( + "Password", runner.createSecretReference("kafka-password") ); final Map<String, String> jsonTopicConfig = Map.of( @@ -631,14 +651,17 @@ public class KafkaToS3IT { Map.entry("S3 Endpoint Override URL", "http://invalid-s3-endpoint:9999"), Map.entry("S3 Authentication Strategy", "Access Key ID and Secret Key"), Map.entry("S3 Access Key ID", localStackContainer.getAccessKey()), - Map.entry("S3 Secret Access Key", localStackContainer.getSecretKey()), Map.entry("Target Object Size", "1 MB"), Map.entry("Merge Latency", "1 sec") ); + runner.addSecret("s3-secret-key", localStackContainer.getSecretKey()); + final Map<String, ConnectorValueReference> s3SecretRefs = Map.of( + "S3 Secret Access Key", runner.createSecretReference("s3-secret-key") + ); - runner.configure("Kafka Connection", kafkaServerConfig); + runner.configure("Kafka Connection", kafkaServerConfig, kafkaSecretRefs); runner.configure("Kafka Topics", jsonTopicConfig); - runner.configure("S3 Configuration", s3InvalidConfig); + runner.configure("S3 Configuration", s3InvalidConfig, s3SecretRefs); runner.applyUpdate(); // Run the Connector with the invalid S3 endpoint to queue the JSON data. 
Wait for data to be queued up. @@ -658,12 +681,11 @@ public class KafkaToS3IT { Map.entry("S3 Endpoint Override URL", localStackContainer.getEndpoint().toString()), Map.entry("S3 Authentication Strategy", "Access Key ID and Secret Key"), Map.entry("S3 Access Key ID", localStackContainer.getAccessKey()), - Map.entry("S3 Secret Access Key", localStackContainer.getSecretKey()), Map.entry("Target Object Size", "1 MB"), Map.entry("Merge Latency", "1 sec") ); - runner.configure("S3 Configuration", s3ValidJsonConfig); + runner.configure("S3 Configuration", s3ValidJsonConfig, s3SecretRefs); runner.applyUpdate(); // Make sure there is no data in S3 yet. @@ -679,7 +701,6 @@ public class KafkaToS3IT { Map.entry("S3 Endpoint Override URL", localStackContainer.getEndpoint().toString()), Map.entry("S3 Authentication Strategy", "Access Key ID and Secret Key"), Map.entry("S3 Access Key ID", localStackContainer.getAccessKey()), - Map.entry("S3 Secret Access Key", localStackContainer.getSecretKey()), Map.entry("Target Object Size", "1 MB"), Map.entry("Merge Latency", "1 sec") ); @@ -697,13 +718,12 @@ public class KafkaToS3IT { "Security Protocol", "SASL_PLAINTEXT", "SASL Mechanism", "PLAIN", "Username", SCRAM_USERNAME, - "Password", SCRAM_PASSWORD, "Schema Registry URL", getSchemaRegistryUrl() ); - runner.configure("Kafka Connection", kafkaConnectionWithSchemaRegistry); + runner.configure("Kafka Connection", kafkaConnectionWithSchemaRegistry, kafkaSecretRefs); runner.configure("Kafka Topics", avroTopicConfig); - runner.configure("S3 Configuration", s3ValidAvroConfig); + runner.configure("S3 Configuration", s3ValidAvroConfig, s3SecretRefs); runner.applyUpdate(); // After draining, there should be one JSON file in S3. 
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/components/connector/secrets/ParameterProviderSecretProvider.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/components/connector/secrets/ParameterProviderSecretProvider.java
new file mode 100644
index 0000000000..2f058f3c2a
--- /dev/null
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/components/connector/secrets/ParameterProviderSecretProvider.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components.connector.secrets;
+
+import org.apache.nifi.components.connector.Secret;
+import org.apache.nifi.controller.ParameterProviderNode;
+import org.apache.nifi.parameter.Parameter;
+import org.apache.nifi.parameter.ParameterDescriptor;
+import org.apache.nifi.parameter.ParameterGroup;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * {@link SecretProvider} that exposes the Parameters fetched from a single {@link ParameterProviderNode} as Secrets.
+ * The name of each fetched {@link ParameterGroup} becomes the group name of the Secrets created from it.
+ *
+ * <p>Two instances are equal when the Parameter Provider nodes they wrap are equal, so that wrappers around the
+ * same node collapse to one element of a {@code Set} and to one key of a {@code Map}.</p>
+ */
+public class ParameterProviderSecretProvider implements SecretProvider {
+    private final ParameterProviderNode parameterProvider;
+
+    /**
+     * @param parameterProvider the Parameter Provider whose Parameters are exposed as Secrets; must not be null
+     */
+    public ParameterProviderSecretProvider(final ParameterProviderNode parameterProvider) {
+        this.parameterProvider = Objects.requireNonNull(parameterProvider, "Parameter Provider is required");
+    }
+
+    @Override
+    public String getProviderId() {
+        return parameterProvider.getIdentifier();
+    }
+
+    @Override
+    public String getProviderName() {
+        return parameterProvider.getName();
+    }
+
+    /**
+     * Fetches every Parameter from the wrapped Parameter Provider and converts each one to a Secret.
+     *
+     * @return one Secret per fetched Parameter; never null
+     */
+    @Override
+    public List<Secret> getAllSecrets() {
+        return toSecrets(parameterProvider.fetchParameterValues());
+    }
+
+    /**
+     * Fetches only the named Parameters from the wrapped Parameter Provider and converts each one to a Secret.
+     *
+     * @param fullyQualifiedSecretNames the names handed to the Parameter Provider as the Parameters to fetch
+     * @return one Secret per Parameter that the Parameter Provider returned; never null
+     */
+    @Override
+    public List<Secret> getSecrets(final List<String> fullyQualifiedSecretNames) {
+        return toSecrets(parameterProvider.fetchParameterValues(fullyQualifiedSecretNames));
+    }
+
+    // Flattens the fetched groups into Secrets. A null result is treated the same as an empty one.
+    private List<Secret> toSecrets(final List<ParameterGroup> parameterGroups) {
+        final List<Secret> secrets = new ArrayList<>();
+        if (parameterGroups == null) {
+            return secrets;
+        }
+
+        for (final ParameterGroup group : parameterGroups) {
+            for (final Parameter parameter : group.getParameters()) {
+                secrets.add(createSecret(group.getGroupName(), parameter));
+            }
+        }
+
+        return secrets;
+    }
+
+    private Secret createSecret(final String groupName, final Parameter parameter) {
+        final ParameterDescriptor descriptor = parameter.getDescriptor();
+
+        return new StandardSecret.Builder()
+            .providerName(getProviderName())
+            .groupName(groupName)
+            .name(descriptor.getName())
+            .description(descriptor.getDescription())
+            .value(parameter.getValue())
+            .build();
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+
+        final ParameterProviderSecretProvider other = (ParameterProviderSecretProvider) obj;
+        return parameterProvider.equals(other.parameterProvider);
+    }
+
+    @Override
+    public int hashCode() {
+        return parameterProvider.hashCode();
+    }
+}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/components/connector/secrets/ParameterProviderSecretsManager.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/components/connector/secrets/ParameterProviderSecretsManager.java
new file mode 100644
index 0000000000..50919c8b88
--- /dev/null
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/components/connector/secrets/ParameterProviderSecretsManager.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components.connector.secrets;
+
+import org.apache.nifi.components.connector.Secret;
+import org.apache.nifi.components.connector.SecretReference;
+import org.apache.nifi.controller.ParameterProviderNode;
+import org.apache.nifi.controller.flow.FlowManager;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * {@link SecretsManager} that treats every Parameter Provider known to the {@link FlowManager} as a
+ * {@link SecretProvider}. The FlowManager is taken from the initialization context, so
+ * {@link #initialize(SecretsManagerInitializationContext)} must be called before any lookup.
+ */
+public class ParameterProviderSecretsManager implements SecretsManager {
+    private FlowManager flowManager;
+
+    @Override
+    public void initialize(final SecretsManagerInitializationContext initializationContext) {
+        this.flowManager = initializationContext.getFlowManager();
+    }
+
+    /**
+     * @return the Secrets of every provider, sorted by Provider Name, then Group Name, then Secret Name
+     */
+    @Override
+    public List<Secret> getAllSecrets() {
+        final List<Secret> secrets = new ArrayList<>();
+        for (final SecretProvider provider : getSecretProviders()) {
+            secrets.addAll(provider.getAllSecrets());
+        }
+
+        // Sort secrets by Provider Name, then Group Name, then Secret Name.
+        // A null name sorts first rather than failing the whole listing with a NullPointerException.
+        final Comparator<String> nullSafeOrder = Comparator.nullsFirst(Comparator.naturalOrder());
+        secrets.sort(Comparator.comparing(Secret::getProviderName, nullSafeOrder)
+            .thenComparing(Secret::getGroupName, nullSafeOrder)
+            .thenComparing(Secret::getName, nullSafeOrder));
+
+        return secrets;
+    }
+
+    /**
+     * @return a newly created {@link ParameterProviderSecretProvider} for each Parameter Provider of the FlowManager
+     */
+    @Override
+    public Set<SecretProvider> getSecretProviders() {
+        final Set<SecretProvider> providers = new HashSet<>();
+        for (final ParameterProviderNode parameterProviderNode : flowManager.getAllParameterProviders()) {
+            providers.add(new ParameterProviderSecretProvider(parameterProviderNode));
+        }
+
+        return providers;
+    }
+
+    /**
+     * @param secretReference identifies the provider (by ID, or by name when no ID is given) and the secret name
+     * @return the first Secret the provider returns for the referenced name, or empty if the provider is unknown
+     *         or returns nothing
+     */
+    @Override
+    public Optional<Secret> getSecret(final SecretReference secretReference) {
+        final SecretProvider provider = findProvider(secretReference);
+        if (provider == null) {
+            return Optional.empty();
+        }
+
+        final List<Secret> secrets = provider.getSecrets(List.of(secretReference.getSecretName()));
+        if (secrets.isEmpty()) {
+            return Optional.empty();
+        }
+
+        return Optional.of(secrets.getFirst());
+    }
+
+    /**
+     * Resolves many references at once, asking each provider a single time for all of the names that refer to it.
+     *
+     * @param secretReferences the references to resolve
+     * @return a map holding every given reference; the value is null when the provider or the secret was not found
+     */
+    @Override
+    public Map<SecretReference, Secret> getSecrets(final Set<SecretReference> secretReferences) {
+        // Resolve the providers once. Looking them up per reference would create a new provider object each time,
+        // and the partitioning below relies on all references to one provider sharing the same map key.
+        final Set<SecretProvider> providers = getSecretProviders();
+
+        final Map<SecretReference, Secret> secrets = new HashMap<>();
+
+        // Partition secret references by Provider
+        final Map<SecretProvider, Set<SecretReference>> referencesByProvider = new HashMap<>();
+        for (final SecretReference secretReference : secretReferences) {
+            final SecretProvider provider = findProvider(secretReference, providers);
+            if (provider == null) {
+                // If no provider found, be sure to map to a null Secret rather than skipping
+                secrets.put(secretReference, null);
+                continue;
+            }
+
+            referencesByProvider.computeIfAbsent(provider, k -> new HashSet<>()).add(secretReference);
+        }
+
+        for (final Map.Entry<SecretProvider, Set<SecretReference>> entry : referencesByProvider.entrySet()) {
+            final SecretProvider provider = entry.getKey();
+            final Set<SecretReference> references = entry.getValue();
+
+            final List<String> secretNames = new ArrayList<>();
+            references.forEach(ref -> secretNames.add(ref.getSecretName()));
+            final List<Secret> retrievedSecrets = provider.getSecrets(secretNames);
+
+            // Keep the first Secret returned for a name. Without a merge function, toMap throws
+            // IllegalStateException as soon as the provider returns the same name twice.
+            final Map<String, Secret> secretsByName = retrievedSecrets.stream()
+                .collect(Collectors.toMap(Secret::getName, Function.identity(), (first, duplicate) -> first));
+
+            for (final SecretReference secretReference : references) {
+                final Secret secret = secretsByName.get(secretReference.getSecretName());
+                secrets.put(secretReference, secret);
+            }
+        }
+
+        return secrets;
+    }
+
+    private SecretProvider findProvider(final SecretReference secretReference) {
+        return findProvider(secretReference, getSecretProviders());
+    }
+
+    // Finds the referenced provider among the given candidates, or returns null if none matches.
+    private SecretProvider findProvider(final SecretReference secretReference, final Set<SecretProvider> providers) {
+        // Search first by Provider ID, if it's provided.
+        final String providerId = secretReference.getProviderId();
+        if (providerId != null) {
+            for (final SecretProvider provider : providers) {
+                if (providerId.equals(provider.getProviderId())) {
+                    return provider;
+                }
+            }
+
+            // If ID is provided but doesn't match, do not consider name.
+            return null;
+        }
+
+        // No Provider ID was given, so search by Provider Name
+        final String providerName = secretReference.getProviderName();
+        if (providerName != null) {
+            for (final SecretProvider provider : providers) {
+                if (providerName.equals(provider.getProviderName())) {
+                    return provider;
+                }
+            }
+        }
+
+        // No Provider found
+        return null;
+    }
+}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/components/connector/secrets/StandardSecret.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/components/connector/secrets/StandardSecret.java
new file mode 100644
index 0000000000..443b19cd06
--- /dev/null
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/components/connector/secrets/StandardSecret.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components.connector.secrets;
+
+import org.apache.nifi.components.connector.Secret;
+
+import java.util.Objects;
+
+/**
+ * Immutable {@link Secret} that is created through its {@link Builder}.
+ * The secret value takes no part in {@link #toString()}, {@link #equals(Object)} or {@link #hashCode()}.
+ */
+public class StandardSecret implements Secret {
+    private final String providerName;
+    private final String groupName;
+    private final String name;
+    private final String description;
+    private final String value;
+
+    // Copies the state collected by the Builder; instances are only created through Builder#build
+    private StandardSecret(final Builder source) {
+        providerName = source.providerName;
+        groupName = source.groupName;
+        name = source.name;
+        description = source.description;
+        value = source.value;
+    }
+
+    @Override
+    public String getProviderName() {
+        return providerName;
+    }
+
+    @Override
+    public String getGroupName() {
+        return groupName;
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+
+    @Override
+    public String getValue() {
+        return value;
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (obj.getClass() != getClass()) {
+            return false;
+        }
+
+        final StandardSecret that = (StandardSecret) obj;
+        final boolean sameLocation = Objects.equals(providerName, that.providerName) && Objects.equals(groupName, that.groupName);
+        final boolean sameIdentity = Objects.equals(name, that.name) && Objects.equals(description, that.description);
+        return sameLocation && sameIdentity;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(providerName, groupName, name, description);
+    }
+
+    @Override
+    public String toString() {
+        // The value is deliberately left out of the text
+        return String.format("StandardSecret[providerName=%s, groupName=%s, name=%s, description=%s]",
+            providerName, groupName, name, description);
+    }
+
+    /**
+     * Collects the fields of a {@link StandardSecret}. Every field is optional and stays null unless set.
+     */
+    public static class Builder {
+        private String providerName;
+        private String groupName;
+        private String name;
+        private String description;
+        private String value;
+
+        public Builder providerName(final String providerName) {
+            this.providerName = providerName;
+            return this;
+        }
+
+        public Builder groupName(final String groupName) {
+            this.groupName = groupName;
+            return this;
+        }
+
+        public Builder name(final String name) {
+            this.name = name;
+            return this;
+        }
+
+        public Builder description(final String description) {
+            this.description = description;
+            return this;
+        }
+
+        public Builder value(final String value) {
+            this.value = value;
+            return this;
+        }
+
+        public StandardSecret build() {
+            return new StandardSecret(this);
+        }
+    }
+}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepositoryInitializationContext.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/components/connector/secrets/StandardSecretsManagerInitializationContext.java
similarity index 66%
copy from nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepositoryInitializationContext.java
copy to nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/components/connector/secrets/StandardSecretsManagerInitializationContext.java
index 5b0b923ac7..ae5013768a 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepositoryInitializationContext.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/components/connector/secrets/StandardSecretsManagerInitializationContext.java
@@ -15,20 +15,19 @@
  * limitations under the License.
*/ -package org.apache.nifi.components.connector; +package org.apache.nifi.components.connector.secrets; -import org.apache.nifi.controller.NodeTypeProvider; import org.apache.nifi.controller.flow.FlowManager; -import org.apache.nifi.nar.ExtensionManager; -public interface ConnectorRepositoryInitializationContext { +public class StandardSecretsManagerInitializationContext implements SecretsManagerInitializationContext { + private final FlowManager flowManager; - FlowManager getFlowManager(); - - ExtensionManager getExtensionManager(); - - NodeTypeProvider getNodeTypeProvider(); - - ConnectorRequestReplicator getRequestReplicator(); + public StandardSecretsManagerInitializationContext(final FlowManager flowManager) { + this.flowManager = flowManager; + } + @Override + public FlowManager getFlowManager() { + return flowManager; + } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/parameter/StandardParameterProviderNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/parameter/StandardParameterProviderNode.java index ee2a47a6c7..b3b14565b1 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/parameter/StandardParameterProviderNode.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/parameter/StandardParameterProviderNode.java @@ -276,16 +276,32 @@ public class StandardParameterProviderNode extends AbstractComponentNode impleme } @Override - public void fetchParameters() { + public List<ParameterGroup> fetchParameterValues() { final ParameterProvider parameterProvider = parameterProviderRef.get().getParameterProvider(); final ConfigurationContext configurationContext = getConfigurationContext(); - List<ParameterGroup> fetchedParameterGroups; + try (final NarCloseable ignored = 
NarCloseable.withComponentNarLoader(getExtensionManager(), parameterProvider.getClass(), parameterProvider.getIdentifier())) { - fetchedParameterGroups = parameterProvider.fetchParameters(configurationContext); + return parameterProvider.fetchParameters(configurationContext); } catch (final IOException | RuntimeException e) { throw new IllegalStateException(String.format("Error fetching parameters for %s: %s", this, e.getMessage()), e); } + } + + @Override + public List<ParameterGroup> fetchParameterValues(final List<String> fullyQualifiedParameterNames) { + final ParameterProvider parameterProvider = parameterProviderRef.get().getParameterProvider(); + final ConfigurationContext configurationContext = getConfigurationContext(); + try (final NarCloseable ignored = NarCloseable.withComponentNarLoader(getExtensionManager(), parameterProvider.getClass(), parameterProvider.getIdentifier())) { + return parameterProvider.fetchParameters(configurationContext, fullyQualifiedParameterNames); + } catch (final IOException | RuntimeException e) { + throw new IllegalStateException(String.format("Error fetching parameters for %s: %s", this, e.getMessage()), e); + } + } + + @Override + public void fetchParameters() { + final List<ParameterGroup> fetchedParameterGroups = fetchParameterValues(); if (fetchedParameterGroups == null || fetchedParameterGroups.isEmpty()) { return; } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/resources/META-INF/services/org.apache.nifi.components.connector.secrets.SecretsManager b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/resources/META-INF/services/org.apache.nifi.components.connector.secrets.SecretsManager new file mode 100644 index 0000000000..082fffbd8f --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/resources/META-INF/services/org.apache.nifi.components.connector.secrets.SecretsManager @@ -0,0 +1,16 @@ +# Licensed to the Apache 
Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.nifi.components.connector.secrets.ParameterProviderSecretsManager diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/components/connector/secrets/TestParameterProviderSecretsManager.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/components/connector/secrets/TestParameterProviderSecretsManager.java new file mode 100644 index 0000000000..51a50973ef --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/components/connector/secrets/TestParameterProviderSecretsManager.java @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.components.connector.secrets;
+
+import org.apache.nifi.components.connector.Secret;
+import org.apache.nifi.components.connector.SecretReference;
+import org.apache.nifi.controller.ParameterProviderNode;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.parameter.Parameter;
+import org.apache.nifi.parameter.ParameterDescriptor;
+import org.apache.nifi.parameter.ParameterGroup;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link ParameterProviderSecretsManager}.
+ * The fixture is a mocked FlowManager with two mocked Parameter Providers: Provider One holds secret-one and
+ * secret-two in Group One, and Provider Two holds secret-three in Group Two.
+ */
+public class TestParameterProviderSecretsManager {
+
+    private static final String PROVIDER_1_ID = "provider-1-id";
+    private static final String PROVIDER_1_NAME = "Provider One";
+    private static final String PROVIDER_2_ID = "provider-2-id";
+    private static final String PROVIDER_2_NAME = "Provider Two";
+
+    private static final String GROUP_1_NAME = "Group One";
+    private static final String GROUP_2_NAME = "Group Two";
+
+    private static final String SECRET_1_NAME = "secret-one";
+    private static final String SECRET_1_DESCRIPTION = "First secret";
+    private static final String SECRET_1_VALUE = "secret-value-one";
+
+    private static final String SECRET_2_NAME = "secret-two";
+    private static final String SECRET_2_DESCRIPTION = "Second secret";
+    private static final String SECRET_2_VALUE = "secret-value-two";
+
+    private static final String SECRET_3_NAME = "secret-three";
+    private static final String SECRET_3_DESCRIPTION = "Third secret";
+    private static final String SECRET_3_VALUE = "secret-value-three";
+
+    private ParameterProviderSecretsManager secretsManager;
+
+    /**
+     * Builds the two mocked Parameter Providers, registers them with a mocked FlowManager and initializes a fresh
+     * ParameterProviderSecretsManager against that FlowManager.
+     */
+    @BeforeEach
+    public void setup() {
+        final FlowManager flowManager = mock(FlowManager.class);
+        final ParameterProviderNode providerNode1 = createMockedParameterProviderNode(PROVIDER_1_ID, PROVIDER_1_NAME, GROUP_1_NAME,
+            createParameter(SECRET_1_NAME, SECRET_1_DESCRIPTION, SECRET_1_VALUE),
+            createParameter(SECRET_2_NAME, SECRET_2_DESCRIPTION, SECRET_2_VALUE));
+        final ParameterProviderNode providerNode2 = createMockedParameterProviderNode(PROVIDER_2_ID, PROVIDER_2_NAME, GROUP_2_NAME,
+            createParameter(SECRET_3_NAME, SECRET_3_DESCRIPTION, SECRET_3_VALUE));
+
+        final Set<ParameterProviderNode> providers = new HashSet<>();
+        providers.add(providerNode1);
+        providers.add(providerNode2);
+        when(flowManager.getAllParameterProviders()).thenReturn(providers);
+
+        secretsManager = new ParameterProviderSecretsManager();
+        final SecretsManagerInitializationContext initContext = new StandardSecretsManagerInitializationContext(flowManager);
+        secretsManager.initialize(initContext);
+    }
+
+    /**
+     * Creates a mocked ParameterProviderNode that reports the given id and name and serves the given parameters
+     * as one ParameterGroup with the given group name.
+     */
+    private ParameterProviderNode createMockedParameterProviderNode(final String id, final String name, final String groupName, final Parameter... parameters) {
+        final ParameterProviderNode node = mock(ParameterProviderNode.class);
+        when(node.getIdentifier()).thenReturn(id);
+        when(node.getName()).thenReturn(name);
+
+        final List<Parameter> parameterList = List.of(parameters);
+        final ParameterGroup group = new ParameterGroup(groupName, parameterList);
+        final List<ParameterGroup> groups = List.of(group);
+
+        when(node.fetchParameterValues()).thenReturn(groups);
+
+        // The filtered fetch returns only the parameters whose descriptor name was requested,
+        // and no group at all when none of the requested names match.
+        when(node.fetchParameterValues(anyList())).thenAnswer(invocation -> {
+            final List<String> requestedNames = invocation.getArgument(0);
+            final List<Parameter> matchingParameters = parameterList.stream()
+                .filter(p -> requestedNames.contains(p.getDescriptor().getName()))
+                .toList();
+            if (matchingParameters.isEmpty()) {
+                return List.of();
+            }
+            return List.of(new ParameterGroup(groupName, matchingParameters));
+        });
+
+        return node;
+    }
+
+    // Builds a Parameter with the given descriptor name, description and value
+    private Parameter createParameter(final String name, final String description, final String value) {
+        final ParameterDescriptor descriptor = new ParameterDescriptor.Builder()
+            .name(name)
+            .description(description)
+            .build();
+        return new Parameter.Builder()
+            .descriptor(descriptor)
+            .value(value)
+            .build();
+    }
+
+    // Builds a mocked SecretReference; pass null for providerId or providerName to leave that part unspecified
+    private SecretReference createSecretReference(final String providerId, final String providerName, final String secretName) {
+        final SecretReference reference = mock(SecretReference.class);
+        when(reference.getProviderId()).thenReturn(providerId);
+        when(reference.getProviderName()).thenReturn(providerName);
+        when(reference.getSecretName()).thenReturn(secretName);
+        return reference;
+    }
+
+    @Test
+    public void testGetSecretProvidersReturnsOneProviderPerMockedParameterProvider() {
+        final Set<SecretProvider> providers = secretsManager.getSecretProviders();
+
+        assertEquals(2, providers.size());
+
+        boolean foundProvider1 = false;
+        boolean foundProvider2 = false;
+        for (final SecretProvider provider : providers) {
+            if (PROVIDER_1_ID.equals(provider.getProviderId())) {
+                assertEquals(PROVIDER_1_NAME, provider.getProviderName());
+                foundProvider1 = true;
+            } else if (PROVIDER_2_ID.equals(provider.getProviderId())) {
+                assertEquals(PROVIDER_2_NAME, provider.getProviderName());
+                foundProvider2 = true;
+            }
+        }
+
+        assertTrue(foundProvider1);
+        assertTrue(foundProvider2);
+    }
+
+    @Test
+    public void testGetAllSecretsRetrievesSecretsFromAllProviders() {
+        final List<Secret> allSecrets = secretsManager.getAllSecrets();
+
+        assertEquals(3, allSecrets.size());
+
+        boolean foundSecret1 = false;
+        boolean foundSecret2 = false;
+        boolean foundSecret3 = false;
+
+        for (final Secret secret : allSecrets) {
+            if (SECRET_1_NAME.equals(secret.getName())) {
+                assertEquals(PROVIDER_1_NAME, secret.getProviderName());
+                assertEquals(GROUP_1_NAME, secret.getGroupName());
+                assertEquals(SECRET_1_VALUE, secret.getValue());
+                foundSecret1 = true;
+            } else if (SECRET_2_NAME.equals(secret.getName())) {
+                assertEquals(PROVIDER_1_NAME, secret.getProviderName());
+                assertEquals(GROUP_1_NAME, secret.getGroupName());
+                assertEquals(SECRET_2_VALUE, secret.getValue());
+                foundSecret2 = true;
+            } else if (SECRET_3_NAME.equals(secret.getName())) {
+                assertEquals(PROVIDER_2_NAME, secret.getProviderName());
+                assertEquals(GROUP_2_NAME, secret.getGroupName());
+                assertEquals(SECRET_3_VALUE, secret.getValue());
+                foundSecret3 = true;
+            }
+        }
+
+        assertTrue(foundSecret1);
+        assertTrue(foundSecret2);
+        assertTrue(foundSecret3);
+    }
+
+    @Test
+    public void testGetSecretReturnsPopulatedOptionalWhenSecretIsFoundById() {
+        final SecretReference reference = createSecretReference(PROVIDER_1_ID, null, SECRET_1_NAME);
+
+        final Optional<Secret> result = secretsManager.getSecret(reference);
+
+        assertTrue(result.isPresent());
+        final Secret secret = result.get();
+        assertEquals(SECRET_1_NAME, secret.getName());
+        assertEquals(SECRET_1_VALUE, secret.getValue());
+        assertEquals(PROVIDER_1_NAME, secret.getProviderName());
+    }
+
+    @Test
+    public void testGetSecretReturnsPopulatedOptionalWhenSecretIsFoundByName() {
+        final SecretReference reference = createSecretReference(null, PROVIDER_2_NAME, SECRET_3_NAME);
+
+        final Optional<Secret> result = secretsManager.getSecret(reference);
+
+        assertTrue(result.isPresent());
+        final Secret secret = result.get();
+        assertEquals(SECRET_3_NAME, secret.getName());
+        assertEquals(SECRET_3_VALUE, secret.getValue());
+        assertEquals(PROVIDER_2_NAME, secret.getProviderName());
+    }
+
+    @Test
+    public void testGetSecretReturnsEmptyOptionalWhenInvalidSecretNameProvided() {
+        final SecretReference reference = createSecretReference(PROVIDER_1_ID, PROVIDER_1_NAME, "non-existent-secret");
+        final Optional<Secret> result = secretsManager.getSecret(reference);
+        assertFalse(result.isPresent());
+    }
+
+    @Test
+    public void testGetSecretReturnsEmptyOptionalWhenProviderNotFound() {
+        final SecretReference reference = createSecretReference("invalid-provider-id", "Invalid Provider", SECRET_1_NAME);
+        final Optional<Secret> result = secretsManager.getSecret(reference);
+        assertFalse(result.isPresent());
+    }
+
+    @Test
+    public void testGetSecretsReturnsAllSecretsProperlyMappedWhenAllAreFound() {
+        final SecretReference reference1 = createSecretReference(PROVIDER_1_ID, PROVIDER_1_NAME, SECRET_1_NAME);
+        final SecretReference reference2 = createSecretReference(PROVIDER_1_ID, PROVIDER_1_NAME, SECRET_2_NAME);
+        final SecretReference reference3 = createSecretReference(PROVIDER_2_ID, PROVIDER_2_NAME, SECRET_3_NAME);
+        final Set<SecretReference> references = Set.of(reference1, reference2, reference3);
+
+        final Map<SecretReference, Secret> results = secretsManager.getSecrets(references);
+        assertEquals(3, results.size());
+
+        final Secret secret1 = results.get(reference1);
+        assertNotNull(secret1);
+        assertEquals(SECRET_1_NAME, secret1.getName());
+        assertEquals(SECRET_1_VALUE, secret1.getValue());
+
+        final Secret secret2 = results.get(reference2);
+        assertNotNull(secret2);
+        assertEquals(SECRET_2_NAME, secret2.getName());
+        assertEquals(SECRET_2_VALUE, secret2.getValue());
+
+        final Secret secret3 = results.get(reference3);
+        assertNotNull(secret3);
+        assertEquals(SECRET_3_NAME, secret3.getName());
+        assertEquals(SECRET_3_VALUE, secret3.getValue());
+    }
+
+    @Test
+    public void testGetSecretsReturnsNullValueForSecretWithInvalidName() {
+        final SecretReference validReference = createSecretReference(PROVIDER_1_ID, PROVIDER_1_NAME, SECRET_1_NAME);
+        final SecretReference invalidReference = createSecretReference(PROVIDER_1_ID, PROVIDER_1_NAME, "non-existent-secret");
+        final Set<SecretReference> references = Set.of(validReference, invalidReference);
+
+        final Map<SecretReference, Secret> results = secretsManager.getSecrets(references);
+        assertEquals(2, results.size());
+
+        final Secret validSecret = results.get(validReference);
+        assertNotNull(validSecret);
+        assertEquals(SECRET_1_NAME, validSecret.getName());
+
+        // The unresolved reference must still be a key of the result, mapped to null
+        assertTrue(results.containsKey(invalidReference));
+        assertNull(results.get(invalidReference));
+    }
+
+    @Test
+    public void testGetSecretsReturnsNullValueForSecretWithInvalidProvider() {
+        final SecretReference validReference = createSecretReference(PROVIDER_1_ID, PROVIDER_1_NAME, SECRET_1_NAME);
+        final SecretReference invalidProviderReference = createSecretReference("invalid-id", "Invalid Provider", SECRET_1_NAME);
+        final Set<SecretReference> references = Set.of(validReference, invalidProviderReference);
+
+        final Map<SecretReference, Secret> results = secretsManager.getSecrets(references);
+        assertEquals(2, results.size());
+
+        final Secret validSecret = results.get(validReference);
+        assertNotNull(validSecret);
+
+        assertTrue(results.containsKey(invalidProviderReference));
+        assertNull(results.get(invalidProviderReference));
+    }
+
+    @Test
+    public void testGetSecretsFromMultipleProvidersWithMixedResults() {
+        final SecretReference provider1Secret1 = createSecretReference(PROVIDER_1_ID, null, SECRET_1_NAME);
+        final SecretReference provider1Invalid = createSecretReference(PROVIDER_1_ID, null, "invalid-secret");
+        final SecretReference provider2Secret3 = createSecretReference(PROVIDER_2_ID, null, SECRET_3_NAME);
+        final SecretReference invalidProvider = createSecretReference("invalid-id", null, SECRET_1_NAME);
+
+        final Set<SecretReference> references = Set.of(provider1Secret1, provider1Invalid, provider2Secret3, invalidProvider);
+        final Map<SecretReference, Secret> results = secretsManager.getSecrets(references);
+        assertEquals(4, results.size());
+
+        assertNotNull(results.get(provider1Secret1));
+        assertEquals(SECRET_1_NAME, results.get(provider1Secret1).getName());
+
+        assertTrue(results.containsKey(provider1Invalid));
+        assertNull(results.get(provider1Invalid));
+
+        assertNotNull(results.get(provider2Secret3));
+        assertEquals(SECRET_3_NAME, results.get(provider2Secret3).getName());
+
+        assertTrue(results.containsKey(invalidProvider));
+        assertNull(results.get(invalidProvider));
+    }
+
+    @Test
+    public void testGetSecretProviderSearchesByIdFirst() {
+        // The ID points at Provider One while the name points at Provider Two; the ID is expected to win
+        final SecretReference referenceWithBothIdAndName = createSecretReference(PROVIDER_1_ID, PROVIDER_2_NAME, SECRET_1_NAME);
+        final Optional<Secret> result = secretsManager.getSecret(referenceWithBothIdAndName);
+        assertTrue(result.isPresent());
+        assertEquals(PROVIDER_1_NAME, result.get().getProviderName());
+    }
+}
+
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorConfiguration.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorConfiguration.java
index 88c54d2998..4261e2af3d 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorConfiguration.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorConfiguration.java
@@ -32,6 +32,15 @@ public class ConnectorConfiguration { return stepConfigurations; } + public NamedStepConfiguration getNamedStepConfiguration(final String name) { + for (final NamedStepConfiguration stepConfiguration : stepConfigurations) { + if (stepConfiguration.stepName().equals(name)) { + return stepConfiguration; + } + } + return null; + } + @Override public boolean equals(final Object other) { if (other == null || getClass() != other.getClass()) { diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java index 3ea48b996a..c951a29f59 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java @@ -17,6 +17,7 @@ package org.apache.nifi.components.connector; +import org.apache.nifi.components.connector.secrets.SecretsManager; import org.apache.nifi.flow.Bundle; import org.apache.nifi.flow.VersionedConfigurationStep; @@ -81,6 +82,8 @@ public interface ConnectorRepository { void inheritConfiguration(ConnectorNode connector, List<VersionedConfigurationStep> flowConfiguration, Bundle flowContextBundle) throws FlowUpdateException; + SecretsManager getSecretsManager(); + /** * Creates a new ConnectorStateTransition instance for managing the lifecycle state of a connector. 
* diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepositoryInitializationContext.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepositoryInitializationContext.java index 5b0b923ac7..0f0c50c394 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepositoryInitializationContext.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepositoryInitializationContext.java @@ -17,6 +17,7 @@ package org.apache.nifi.components.connector; +import org.apache.nifi.components.connector.secrets.SecretsManager; import org.apache.nifi.controller.NodeTypeProvider; import org.apache.nifi.controller.flow.FlowManager; import org.apache.nifi.nar.ExtensionManager; @@ -27,6 +28,8 @@ public interface ConnectorRepositoryInitializationContext { ExtensionManager getExtensionManager(); + SecretsManager getSecretsManager(); + NodeTypeProvider getNodeTypeProvider(); ConnectorRequestReplicator getRequestReplicator(); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/FrameworkConnectorInitializationContext.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/FrameworkConnectorInitializationContext.java index ed977c4b1e..8749e48192 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/FrameworkConnectorInitializationContext.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/FrameworkConnectorInitializationContext.java @@ -18,6 +18,7 @@ package org.apache.nifi.components.connector; import 
org.apache.nifi.asset.AssetManager; +import org.apache.nifi.components.connector.secrets.SecretsManager; public interface FrameworkConnectorInitializationContext extends ConnectorInitializationContext { diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/FrameworkConnectorInitializationContextBuilder.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/FrameworkConnectorInitializationContextBuilder.java index b19262e27c..030e333a84 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/FrameworkConnectorInitializationContextBuilder.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/FrameworkConnectorInitializationContextBuilder.java @@ -18,6 +18,7 @@ package org.apache.nifi.components.connector; import org.apache.nifi.asset.AssetManager; +import org.apache.nifi.components.connector.secrets.SecretsManager; import org.apache.nifi.logging.ComponentLog; public interface FrameworkConnectorInitializationContextBuilder { diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/FrameworkConnectorInitializationContext.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/secrets/SecretProvider.java similarity index 71% copy from nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/FrameworkConnectorInitializationContext.java copy to nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/secrets/SecretProvider.java index ed977c4b1e..3c0a2f4e5f 100644 --- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/FrameworkConnectorInitializationContext.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/secrets/SecretProvider.java @@ -15,14 +15,20 @@ * limitations under the License. */ -package org.apache.nifi.components.connector; +package org.apache.nifi.components.connector.secrets; -import org.apache.nifi.asset.AssetManager; +import org.apache.nifi.components.connector.Secret; -public interface FrameworkConnectorInitializationContext extends ConnectorInitializationContext { +import java.util.List; - SecretsManager getSecretsManager(); +public interface SecretProvider { - AssetManager getAssetManager(); + String getProviderId(); + + String getProviderName(); + + List<Secret> getAllSecrets(); + + List<Secret> getSecrets(List<String> fullyQualifiedSecretNames); } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepositoryInitializationContext.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/secrets/SecretsManager.java similarity index 57% copy from nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepositoryInitializationContext.java copy to nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/secrets/SecretsManager.java index 5b0b923ac7..62365ed940 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepositoryInitializationContext.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/secrets/SecretsManager.java @@ -15,20 +15,26 @@ * limitations under the License. 
*/ -package org.apache.nifi.components.connector; +package org.apache.nifi.components.connector.secrets; -import org.apache.nifi.controller.NodeTypeProvider; -import org.apache.nifi.controller.flow.FlowManager; -import org.apache.nifi.nar.ExtensionManager; +import org.apache.nifi.components.connector.Secret; +import org.apache.nifi.components.connector.SecretReference; -public interface ConnectorRepositoryInitializationContext { +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; - FlowManager getFlowManager(); +public interface SecretsManager { - ExtensionManager getExtensionManager(); + void initialize(SecretsManagerInitializationContext initializationContext); - NodeTypeProvider getNodeTypeProvider(); + List<Secret> getAllSecrets(); - ConnectorRequestReplicator getRequestReplicator(); + Set<SecretProvider> getSecretProviders(); + + Optional<Secret> getSecret(SecretReference secretReference); + + Map<SecretReference, Secret> getSecrets(Set<SecretReference> secretReferences); } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/FrameworkConnectorInitializationContext.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/secrets/SecretsManagerInitializationContext.java similarity index 75% copy from nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/FrameworkConnectorInitializationContext.java copy to nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/secrets/SecretsManagerInitializationContext.java index ed977c4b1e..4f4cf8e9a8 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/FrameworkConnectorInitializationContext.java +++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/secrets/SecretsManagerInitializationContext.java @@ -15,14 +15,12 @@ * limitations under the License. */ -package org.apache.nifi.components.connector; +package org.apache.nifi.components.connector.secrets; -import org.apache.nifi.asset.AssetManager; +import org.apache.nifi.controller.flow.FlowManager; -public interface FrameworkConnectorInitializationContext extends ConnectorInitializationContext { +public interface SecretsManagerInitializationContext { - SecretsManager getSecretsManager(); - - AssetManager getAssetManager(); + FlowManager getFlowManager(); } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ParameterProviderNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ParameterProviderNode.java index 272d00f9ba..e697f78243 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ParameterProviderNode.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ParameterProviderNode.java @@ -49,6 +49,19 @@ public interface ParameterProviderNode extends ComponentNode { */ void fetchParameters(); + /** + * Fetches all parameter values from the Parameter Provider and returns them. This method does not cache the results for later retrieval. + * @return all Parameter Groups with Parameter Names and Values + */ + List<ParameterGroup> fetchParameterValues(); + + /** + * Fetches parameters that match the provided fully qualified parameter names. This method does not cache the results for later retrieval. 
+ * @param fullyQualifiedParameterNames fully qualified names of parameters to fetch + * @return Parameter Groups with Parameter Names and Values that match the provided names + */ + List<ParameterGroup> fetchParameterValues(List<String> fullyQualifiedParameterNames); + /** * Find a named Parameter Group cached from previous request to fetch Parameters from the configured Parameter Provider * diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorConfigurationContext.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorConfigurationContext.java index 6940112ad9..975bc2aafa 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorConfigurationContext.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorConfigurationContext.java @@ -18,6 +18,8 @@ package org.apache.nifi.components.connector; import org.apache.nifi.asset.AssetManager; +import org.apache.nifi.components.connector.secrets.SecretProvider; +import org.apache.nifi.components.connector.secrets.SecretsManager; import java.io.IOException; import java.io.UncheckedIOException; diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorInitializationContext.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorInitializationContext.java index 89730a5bac..c31cad237f 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorInitializationContext.java +++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorInitializationContext.java @@ -19,6 +19,7 @@ package org.apache.nifi.components.connector; import org.apache.nifi.asset.AssetManager; import org.apache.nifi.components.connector.components.FlowContext; +import org.apache.nifi.components.connector.secrets.SecretsManager; import org.apache.nifi.flow.VersionedExternalFlow; import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.logging.ComponentLog; diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java index c1772de292..7e8839dedf 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java @@ -577,17 +577,18 @@ public class StandardConnectorNode implements ConnectorNode { } final ConfigurationStep configurationStep = optionalStep.get(); - final List<ValidationResult> validationResults = getConnector().validateConfigurationStep(configurationStep, configContext, validationContext); + + final List<ValidationResult> validationResults = new ArrayList<>(); + validatePropertyReferences(configurationStep, configurationOverrides, validationResults); + + if (validationResults.isEmpty()) { + final List<ValidationResult> implValidationResults = getConnector().validateConfigurationStep(configurationStep, configContext, validationContext); + validationResults.addAll(implValidationResults); + } final List<ConfigVerificationResult> invalidConfigResults = validationResults.stream() .filter(result -> !result.isValid()) - .map(validationResult 
-> new ConfigVerificationResult.Builder() - .verificationStepName("Property Validation - " + validationResult.getSubject()) - .outcome(Outcome.FAILED) - .subject(validationResult.getSubject()) - .explanation(validationResult.getExplanation()) - .build() - ) + .map(this::createConfigVerificationResult) .toList(); if (invalidConfigResults.isEmpty()) { @@ -595,15 +596,25 @@ public class StandardConnectorNode implements ConnectorNode { .verificationStepName("Property Validation") .outcome(Outcome.SUCCESSFUL) .build()); + + results.addAll(getConnector().verifyConfigurationStep(stepName, resolvedPropertyOverrides, workingFlowContext)); } else { results.addAll(invalidConfigResults); } - results.addAll(getConnector().verifyConfigurationStep(stepName, resolvedPropertyOverrides, workingFlowContext)); return results; } } + private ConfigVerificationResult createConfigVerificationResult(final ValidationResult validationResult) { + return new ConfigVerificationResult.Builder() + .verificationStepName("Property Validation - " + validationResult.getSubject()) + .outcome(validationResult.isValid() ? 
Outcome.SUCCESSFUL : Outcome.FAILED) + .subject(validationResult.getSubject()) + .explanation(validationResult.getExplanation()) + .build(); + } + private Map<String, String> resolvePropertyReferences(final StepConfiguration configurationOverrides) { final Map<String, String> resolvedProperties = new HashMap<>(); @@ -756,18 +767,23 @@ public class StandardConnectorNode implements ConnectorNode { final ConnectorValidationContext validationContext = createValidationContext(activeFlowContext); - List<ValidationResult> allResults; - try { - allResults = getConnector().validate(activeFlowContext, validationContext); - } catch (final Exception e) { - allResults = List.of(new ValidationResult.Builder() + final List<ValidationResult> allResults = new ArrayList<>(); + validatePropertyReferences(allResults); + + if (allResults.isEmpty()) { + try { + final List<ValidationResult> implValidationResults = getConnector().validate(activeFlowContext, validationContext); + allResults.addAll(implValidationResults); + } catch (final Exception e) { + allResults.add(new ValidationResult.Builder() .subject("Validation Failure") .valid(false) .explanation("Encountered a failure while attempting to perform validation: " + e.getMessage()) .build()); + } } - if (allResults == null) { + if (allResults.isEmpty()) { return new ValidationState(ValidationStatus.VALID, Collections.emptyList()); } @@ -786,6 +802,79 @@ public class StandardConnectorNode implements ConnectorNode { } } + + private void validatePropertyReferences(final List<ValidationResult> allResults) { + final List<ConfigurationStep> configurationSteps = getConnector().getConfigurationSteps(activeFlowContext); + final ConnectorConfiguration connectorConfiguration = activeFlowContext.getConfigurationContext().toConnectorConfiguration(); + + for (final ConfigurationStep step : configurationSteps) { + final NamedStepConfiguration namedStepConfig = connectorConfiguration.getNamedStepConfiguration(step.getName()); + if (namedStepConfig 
== null) { + continue; + } + + validatePropertyReferences(step, namedStepConfig.configuration(), allResults); + } + } + + private void validatePropertyReferences(final ConfigurationStep step, final StepConfiguration stepConfig, final List<ValidationResult> allResults) { + for (final ConnectorPropertyGroup propertyGroup : step.getPropertyGroups()) { + for (final ConnectorPropertyDescriptor descriptor : propertyGroup.getProperties()) { + final PropertyType propertyType = descriptor.getType(); + final ConnectorValueReference reference = stepConfig.getPropertyValue(descriptor.getName()); + + final String subject = step.getName() + " / " + descriptor.getName(); + + if (!isReferenceAllowed(reference, propertyType)) { + final String providedReferenceType = switch (reference.getValueType()) { + case ASSET_REFERENCE -> "<Asset reference>"; + case SECRET_REFERENCE -> "<Secret reference>"; + case STRING_LITERAL -> "<Explicit value>"; + }; + + final String expectedReferenceType = propertyType == PropertyType.SECRET ? 
"a Secret reference" : "an Explicit value"; + + allResults.add(new ValidationResult.Builder() + .subject(subject) + .input(providedReferenceType) + .explanation("This property must be configured with " + expectedReferenceType) + .build()); + } + } + } + } + + private boolean isReferenceAllowed(final ConnectorValueReference reference, final PropertyType propertyType) { + // If the reference is null or its value is unset, then it is allowed + if (reference == null) { + return true; + } + + switch (reference) { + case StringLiteralValue stringLiteralValue -> { + if (stringLiteralValue.getValue() == null) { + return true; + } + } + case AssetReference assetReference -> { + if (assetReference.getAssetIdentifier() == null) { + return true; + } + } + case SecretReference secretReference -> { + if (secretReference.getSecretName() == null) { + return true; + } + } + } + + if (propertyType == PropertyType.SECRET) { + return reference.getValueType() == ConnectorValueType.SECRET_REFERENCE; + } + + return reference.getValueType() != ConnectorValueType.SECRET_REFERENCE; + } + private ConnectorValidationContext createValidationContext(final FrameworkFlowContext context) { final DescribedValueProvider allowableValueProvider = (stepName, propertyName) -> fetchAllowableValues(stepName, propertyName, context); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepoInitializationContext.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepoInitializationContext.java index d899d13681..4a1b8ed8b3 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepoInitializationContext.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepoInitializationContext.java 
@@ -17,6 +17,7 @@ package org.apache.nifi.components.connector; +import org.apache.nifi.components.connector.secrets.SecretsManager; import org.apache.nifi.controller.NodeTypeProvider; import org.apache.nifi.controller.flow.FlowManager; import org.apache.nifi.nar.ExtensionManager; @@ -24,15 +25,18 @@ import org.apache.nifi.nar.ExtensionManager; public class StandardConnectorRepoInitializationContext implements ConnectorRepositoryInitializationContext { private final FlowManager flowManager; private final ExtensionManager extensionManager; + private final SecretsManager secretsManager; private final NodeTypeProvider nodeTypeProvider; private final ConnectorRequestReplicator requestReplicator; public StandardConnectorRepoInitializationContext(final FlowManager flowManager, final ExtensionManager extensionManager, + final SecretsManager secretsManager, final NodeTypeProvider nodeTypeProvider, final ConnectorRequestReplicator requestReplicator) { this.flowManager = flowManager; this.extensionManager = extensionManager; + this.secretsManager = secretsManager; this.nodeTypeProvider = nodeTypeProvider; this.requestReplicator = requestReplicator; } @@ -47,6 +51,11 @@ public class StandardConnectorRepoInitializationContext implements ConnectorRepo return extensionManager; } + @Override + public SecretsManager getSecretsManager() { + return secretsManager; + } + @Override public NodeTypeProvider getNodeTypeProvider() { return nodeTypeProvider; diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java index 615595d42f..4cb87da4cf 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java +++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java @@ -18,6 +18,7 @@ package org.apache.nifi.components.connector; import org.apache.nifi.annotation.lifecycle.OnRemoved; +import org.apache.nifi.components.connector.secrets.SecretsManager; import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.flow.Bundle; import org.apache.nifi.flow.VersionedConfigurationStep; @@ -43,11 +44,13 @@ public class StandardConnectorRepository implements ConnectorRepository { private volatile ExtensionManager extensionManager; private volatile ConnectorRequestReplicator requestReplicator; + private volatile SecretsManager secretsManager; @Override public void initialize(final ConnectorRepositoryInitializationContext context) { this.extensionManager = context.getExtensionManager(); this.requestReplicator = context.getRequestReplicator(); + this.secretsManager = context.getSecretsManager(); } @Override @@ -173,6 +176,11 @@ public class StandardConnectorRepository implements ConnectorRepository { } } + @Override + public SecretsManager getSecretsManager() { + return secretsManager; + } + @Override public ConnectorStateTransition createStateTransition(final String type, final String id) { final String componentDescription = "StandardConnectorNode[id=" + id + ", type=" + type + "]"; diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java index f5da41580e..707bdb5266 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java @@ -42,7 +42,6 @@ import 
org.apache.nifi.components.connector.FrameworkConnectorInitializationCont import org.apache.nifi.components.connector.FrameworkFlowContext; import org.apache.nifi.components.connector.GhostConnector; import org.apache.nifi.components.connector.MutableConnectorConfigurationContext; -import org.apache.nifi.components.connector.SecretsManager; import org.apache.nifi.components.connector.StandardConnectorNode; import org.apache.nifi.components.connector.StepConfiguration; import org.apache.nifi.components.connector.StringLiteralValue; @@ -543,7 +542,7 @@ public class ExtensionBuilder { for (final ConnectorPropertyDescriptor descriptor : propertyGroup.getProperties()) { final String name = descriptor.getName(); final String defaultValue = descriptor.getDefaultValue(); - defaultValues.put(name, new StringLiteralValue(defaultValue)); + defaultValues.put(name, defaultValue == null ? null : new StringLiteralValue(defaultValue)); } } @@ -554,13 +553,12 @@ public class ExtensionBuilder { private FrameworkConnectorInitializationContext createConnectorInitializationContext(final ProcessGroup managedProcessGroup, final ComponentLog componentLog) { final String name = type.contains(".") ? 
StringUtils.substringAfterLast(type, ".") : type; - final SecretsManager secretsManager = null; return connectorInitializationContextBuilder .identifier(identifier) .name(name) .componentLog(componentLog) - .secretsManager(secretsManager) + .secretsManager(flowController.getConnectorRepository().getSecretsManager()) .assetManager(flowController.getAssetManager()) .build(); } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index d119cd23ac..876ec6a70c 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -53,6 +53,10 @@ import org.apache.nifi.components.connector.ConnectorRepositoryInitializationCon import org.apache.nifi.components.connector.ConnectorRequestReplicator; import org.apache.nifi.components.connector.StandardConnectorRepoInitializationContext; import org.apache.nifi.components.connector.StandardConnectorRepository; +import org.apache.nifi.components.connector.secrets.ParameterProviderSecretsManager; +import org.apache.nifi.components.connector.secrets.SecretsManager; +import org.apache.nifi.components.connector.secrets.SecretsManagerInitializationContext; +import org.apache.nifi.components.connector.secrets.StandardSecretsManagerInitializationContext; import org.apache.nifi.components.monitor.LongRunningTaskMonitor; import org.apache.nifi.components.state.StateManagerProvider; import org.apache.nifi.components.state.StateProvider; @@ -269,6 +273,7 @@ public class FlowController implements ReportingTaskProvider, FlowAnalysisRulePr public static final String DEFAULT_SWAP_MANAGER_IMPLEMENTATION = "org.apache.nifi.controller.FileSystemSwapManager"; public 
static final String DEFAULT_ASSET_MANAGER_IMPLEMENTATION = StandardAssetManager.class.getName(); public static final String DEFAULT_CONNECTOR_REPOSITORY_IMPLEMENTATION = StandardConnectorRepository.class.getName(); + public static final String DEFAULT_SECRETS_MANAGER_IMPLEMENTATION = ParameterProviderSecretsManager.class.getName(); public static final String GRACEFUL_SHUTDOWN_PERIOD = "nifi.flowcontroller.graceful.shutdown.seconds"; public static final long DEFAULT_GRACEFUL_SHUTDOWN_SECONDS = 10; @@ -603,7 +608,8 @@ public class FlowController implements ReportingTaskProvider, FlowAnalysisRulePr controllerServiceResolver = new StandardControllerServiceResolver(authorizer, flowManager, new VersionedComponentFlowMapper(extensionManager), controllerServiceProvider, new StandardControllerServiceApiLookup(extensionManager)); - connectorRepository = createConnectorRepository(nifiProperties, extensionManager, flowManager, this, connectorRequestReplicator); + final SecretsManager secretsManager = createSecretsManager(nifiProperties, extensionManager, flowManager); + connectorRepository = createConnectorRepository(nifiProperties, extensionManager, flowManager, secretsManager, this, connectorRequestReplicator); final PythonBridge rawPythonBridge = createPythonBridge(nifiProperties, controllerServiceProvider); final ClassLoader pythonBridgeClassLoader = rawPythonBridge.getClass().getClassLoader(); @@ -876,7 +882,7 @@ public class FlowController implements ReportingTaskProvider, FlowAnalysisRulePr } private static ConnectorRepository createConnectorRepository(final NiFiProperties properties, final ExtensionDiscoveringManager extensionManager, final FlowManager flowManager, - final NodeTypeProvider nodeTypeProvider, final ConnectorRequestReplicator requestReplicator) { + final SecretsManager secretsManager, final NodeTypeProvider nodeTypeProvider, final ConnectorRequestReplicator requestReplicator) { final String implementationClassName = 
properties.getProperty(NiFiProperties.CONNECTOR_REPOSITORY_IMPLEMENTATION, DEFAULT_CONNECTOR_REPOSITORY_IMPLEMENTATION); @@ -884,12 +890,14 @@ public class FlowController implements ReportingTaskProvider, FlowAnalysisRulePr // Discover implementations of Connector Repository. This is not done at startup because the ConnectorRepository class is not // provided in the list of standard extension points. This is due to the fact that ConnectorRepository lives in the nifi-framework-core-api, and // does not make sense to refactor it into some other module due to its dependencies, simply to allow it to be discovered at startup. - extensionManager.discoverExtensions(extensionManager.getAllBundles(), Set.of(ConnectorRepository.class), true); + final Set<Class<?>> additionalExtensionTypes = Set.of(ConnectorRepository.class, SecretsManager.class); + extensionManager.discoverExtensions(extensionManager.getAllBundles(), additionalExtensionTypes, true); final ConnectorRepository created = NarThreadContextClassLoader.createInstance(extensionManager, implementationClassName, ConnectorRepository.class, properties); final ConnectorRepositoryInitializationContext initializationContext = new StandardConnectorRepoInitializationContext( flowManager, extensionManager, + secretsManager, nodeTypeProvider, requestReplicator ); @@ -901,6 +909,35 @@ public class FlowController implements ReportingTaskProvider, FlowAnalysisRulePr } } + LOG.info("Created Connector Repository of type {}", created.getClass().getSimpleName()); + + return created; + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + + private static SecretsManager createSecretsManager(final NiFiProperties properties, final ExtensionDiscoveringManager extensionManager, final FlowManager flowManager) { + final String implementationClassName = properties.getProperty(NiFiProperties.SECRETS_MANAGER_IMPLEMENTATION, DEFAULT_SECRETS_MANAGER_IMPLEMENTATION); + + try { + // Discover implementations of Secrets Manager. 
This is not done at startup because the SecretsManager class is not + // provided in the list of standard extension points. This is due to the fact that SecretsManager lives in the nifi-framework-core-api, and + // does not make sense to refactor it into some other module due to its dependencies, simply to allow it to be discovered at startup. + extensionManager.discoverExtensions(extensionManager.getAllBundles(), Set.of(SecretsManager.class), true); + final SecretsManager created = NarThreadContextClassLoader.createInstance(extensionManager, implementationClassName, SecretsManager.class, properties); + + final SecretsManagerInitializationContext initializationContext = new StandardSecretsManagerInitializationContext(flowManager); + + synchronized (created) { + // Ensure that any NAR dependencies are available when we initialize the SecretsManager + try (final NarCloseable ignored = NarCloseable.withComponentNarLoader(extensionManager, created.getClass(), "secrets-manager")) { + created.initialize(initializationContext); + } + } + + LOG.info("Created Secrets Manager of type {}", created.getClass().getSimpleName()); + return created; } catch (final Exception e) { throw new RuntimeException(e); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java index d4e5fd6e87..804d0f732e 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java @@ -764,7 +764,9 @@ public class StandardFlowManager extends AbstractFlowManager implements FlowMana final ConnectorRepository connectorRepository = flowController.getConnectorRepository(); final 
ConnectorStateTransition stateTransition = connectorRepository.createStateTransition(type, id); - final StandardConnectorConfigurationContext activeConfigurationContext = new StandardConnectorConfigurationContext(flowController.getAssetManager(), null); + final StandardConnectorConfigurationContext activeConfigurationContext = new StandardConnectorConfigurationContext( + flowController.getAssetManager(), flowController.getConnectorRepository().getSecretsManager()); + final ProcessGroupFactory processGroupFactory = groupId -> createProcessGroup(groupId, false); final FlowContextFactory flowContextFactory = new FlowControllerFlowContextFactory(flowController, managedRootGroup, activeConfigurationContext, processGroupFactory); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorConfigurationContext.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorConfigurationContext.java index f3da3a8e50..9c8b6b6e08 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorConfigurationContext.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorConfigurationContext.java @@ -18,6 +18,7 @@ package org.apache.nifi.components.connector; import org.apache.nifi.asset.AssetManager; +import org.apache.nifi.components.connector.secrets.SecretsManager; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java index 40f2323b9b..5147fb0d04 100644 
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java @@ -23,6 +23,7 @@ import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.connector.components.FlowContext; import org.apache.nifi.components.connector.components.FlowContextType; +import org.apache.nifi.components.connector.secrets.SecretsManager; import org.apache.nifi.controller.flow.FlowManager; import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.flow.Bundle;
