This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch NIFI-15258
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/NIFI-15258 by this push:
new c949b0ddd9 NIFI-15355 Connector Configuration Repository (#10876)
c949b0ddd9 is described below
commit c949b0ddd9d51bb7ae4263bbcd99861f40a0137c
Author: Kevin Doran <[email protected]>
AuthorDate: Tue Feb 10 13:51:41 2026 -0500
NIFI-15355 Connector Configuration Repository (#10876)
* NIFI-15255 Introduce ConnectorConfigurationRepository Extension Interface
* Add new property key to default nifi.properties file
* Address peer review comments
---
.../java/org/apache/nifi/util/NiFiProperties.java | 4 +
.../connector/ConnectorConfigurationProvider.java | 93 +++++
.../ConnectorConfigurationProviderException.java | 33 +-
...ConfigurationProviderInitializationContext.java | 38 +-
.../connector/ConnectorWorkingConfiguration.java | 51 +++
.../components/connector/ConnectorRepository.java | 21 ++
.../ConnectorRepositoryInitializationContext.java | 10 +
...onfigurationProviderInitializationContext.java} | 27 +-
...StandardConnectorRepoInitializationContext.java | 10 +-
.../connector/StandardConnectorRepository.java | 204 ++++++++++-
.../org/apache/nifi/controller/FlowController.java | 51 ++-
.../serialization/VersionedFlowSynchronizer.java | 2 +-
.../connector/TestStandardConnectorRepository.java | 403 +++++++++++++++++++++
.../src/main/resources/conf/nifi.properties | 3 +
.../apache/nifi/web/StandardNiFiServiceFacade.java | 6 +-
.../java/org/apache/nifi/web/dao/ConnectorDAO.java | 2 +
.../nifi/web/dao/impl/StandardConnectorDAO.java | 13 +-
.../web/dao/impl/StandardConnectorDAOTest.java | 24 +-
18 files changed, 912 insertions(+), 83 deletions(-)
diff --git
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index 572d4d3582..7948bf6c1d 100644
---
a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++
b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -142,6 +142,10 @@ public class NiFiProperties extends ApplicationProperties {
// Connector Repository properties
public static final String CONNECTOR_REPOSITORY_IMPLEMENTATION =
"nifi.components.connectors.repository.implementation";
+ // Connector Configuration Provider properties
+ public static final String CONNECTOR_CONFIGURATION_PROVIDER_IMPLEMENTATION
= "nifi.components.connectors.configuration.provider.implementation";
+ public static final String
CONNECTOR_CONFIGURATION_PROVIDER_PROPERTIES_PREFIX =
"nifi.components.connectors.configuration.provider.";
+
// Secrets Manager properties
public static final String SECRETS_MANAGER_IMPLEMENTATION =
"nifi.secrets.manager.implementation";
diff --git
a/nifi-framework-api/src/main/java/org/apache/nifi/components/connector/ConnectorConfigurationProvider.java
b/nifi-framework-api/src/main/java/org/apache/nifi/components/connector/ConnectorConfigurationProvider.java
new file mode 100644
index 0000000000..fba4912e16
--- /dev/null
+++
b/nifi-framework-api/src/main/java/org/apache/nifi/components/connector/ConnectorConfigurationProvider.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components.connector;
+
+import java.util.Optional;
+
+/**
+ * Extension point interface for external management of Connector working
configuration.
+ * Implementations of this interface allow a Connector's name and working flow
configuration
+ * to be persisted in an external store (such as a database) and to be
externally modified.
+ *
+ * <p>When a ConnectorConfigurationProvider is configured, the framework will:
+ * <ul>
+ * <li>On reads (e.g., getConnector): load configuration from the provider
and override the in-memory working configuration</li>
+ * <li>On writes (e.g., configureConnector): save configuration to the
provider before modifying the in-memory state</li>
+ * <li>On discard: notify the provider that the working configuration has
been discarded</li>
+ * <li>On delete: notify the provider that the connector has been
removed</li>
+ * </ul>
+ */
+public interface ConnectorConfigurationProvider {
+
+ /**
+ * Initializes the ConnectorConfigurationProvider with the given context.
+ *
+ * @param context the initialization context providing configuration
properties
+ */
+ void initialize(ConnectorConfigurationProviderInitializationContext
context);
+
+ /**
+ * Loads the externally managed working configuration for the connector
with the given identifier.
+ *
+ * @param connectorId the identifier of the connector
+ * @return an Optional containing the working configuration if one exists
in the external store,
+ * or an empty Optional if no external configuration exists (in
which case the in-memory configuration is used)
+ */
+ Optional<ConnectorWorkingConfiguration> load(String connectorId);
+
+ /**
+ * Saves the working configuration for the connector with the given
identifier to the external store.
+ * This is called when working configuration properties are modified, such
as during configureConnector
+ * or when connector metadata (e.g., name) is updated.
+ *
+ * @param connectorId the identifier of the connector
+ * @param configuration the working configuration to save
+ */
+ void save(String connectorId, ConnectorWorkingConfiguration configuration);
+
+ /**
+ * Notifies the provider that the working configuration for the given
connector has been discarded
+ * (i.e., reset to match the active configuration). This is semantically
distinct from {@link #save}
+ * because the external store may need to handle this differently, such as
deleting the working copy
+ * rather than overwriting it.
+ *
+ * @param connectorId the identifier of the connector whose working
configuration was discarded
+ */
+ void discard(String connectorId);
+
+ /**
+ * Notifies the provider that the connector with the given identifier has
been removed entirely.
+ * The provider should clean up any stored configuration for this
connector.
+ *
+ * @param connectorId the identifier of the connector that was removed
+ */
+ void delete(String connectorId);
+
+ /**
+ * Verifies that the provider can support creating a connector with the
given identifier.
+ * This is called before the connector is actually created, giving the
provider an opportunity
+ * to reject the operation (for example, if it has reached a capacity
limit or the connector
+ * already exists in the external store in an incompatible state).
+ *
+ * <p>If the provider cannot support the create operation, it should throw
a
+ * {@link ConnectorConfigurationProviderException}.</p>
+ *
+ * @param connectorId the identifier of the connector to be created
+ */
+ void verifyCreate(String connectorId);
+}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepositoryInitializationContext.java
b/nifi-framework-api/src/main/java/org/apache/nifi/components/connector/ConnectorConfigurationProviderException.java
similarity index 51%
copy from
nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepositoryInitializationContext.java
copy to
nifi-framework-api/src/main/java/org/apache/nifi/components/connector/ConnectorConfigurationProviderException.java
index f5c4887dad..de772d0a17 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepositoryInitializationContext.java
+++
b/nifi-framework-api/src/main/java/org/apache/nifi/components/connector/ConnectorConfigurationProviderException.java
@@ -17,24 +17,21 @@
package org.apache.nifi.components.connector;
-import org.apache.nifi.asset.AssetManager;
-import org.apache.nifi.components.connector.secrets.SecretsManager;
-import org.apache.nifi.controller.NodeTypeProvider;
-import org.apache.nifi.controller.flow.FlowManager;
-import org.apache.nifi.nar.ExtensionManager;
-
-public interface ConnectorRepositoryInitializationContext {
-
- FlowManager getFlowManager();
-
- ExtensionManager getExtensionManager();
-
- SecretsManager getSecretsManager();
-
- AssetManager getAssetManager();
-
- NodeTypeProvider getNodeTypeProvider();
+/**
+ * Runtime exception thrown by {@link ConnectorConfigurationProvider}
implementations
+ * to indicate a failure in an external configuration operation such as load,
save, discard, or delete.
+ *
+ * <p>This exception type allows provider implementations to signal failures
using a well-defined
+ * exception rather than generic exceptions like IOException or
database-specific exceptions.
+ * The framework will propagate these exceptions to callers rather than
handling them silently.</p>
+ */
+public class ConnectorConfigurationProviderException extends RuntimeException {
- ConnectorRequestReplicator getRequestReplicator();
+ public ConnectorConfigurationProviderException(final String message) {
+ super(message);
+ }
+ public ConnectorConfigurationProviderException(final String message, final
Throwable cause) {
+ super(message, cause);
+ }
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepositoryInitializationContext.java
b/nifi-framework-api/src/main/java/org/apache/nifi/components/connector/ConnectorConfigurationProviderInitializationContext.java
similarity index 50%
copy from
nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepositoryInitializationContext.java
copy to
nifi-framework-api/src/main/java/org/apache/nifi/components/connector/ConnectorConfigurationProviderInitializationContext.java
index f5c4887dad..d8eba45bf1 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepositoryInitializationContext.java
+++
b/nifi-framework-api/src/main/java/org/apache/nifi/components/connector/ConnectorConfigurationProviderInitializationContext.java
@@ -17,24 +17,24 @@
package org.apache.nifi.components.connector;
-import org.apache.nifi.asset.AssetManager;
-import org.apache.nifi.components.connector.secrets.SecretsManager;
-import org.apache.nifi.controller.NodeTypeProvider;
-import org.apache.nifi.controller.flow.FlowManager;
-import org.apache.nifi.nar.ExtensionManager;
-
-public interface ConnectorRepositoryInitializationContext {
-
- FlowManager getFlowManager();
-
- ExtensionManager getExtensionManager();
-
- SecretsManager getSecretsManager();
-
- AssetManager getAssetManager();
-
- NodeTypeProvider getNodeTypeProvider();
-
- ConnectorRequestReplicator getRequestReplicator();
+import java.util.Map;
+/**
+ * Initialization context for a {@link ConnectorConfigurationProvider}.
+ * Provides the configuration properties necessary for the provider to
initialize itself,
+ * such as database connection strings or other external store configuration.
+ *
+ * <p>Properties are extracted from NiFi properties with the prefix
+ * {@code nifi.components.connectors.configuration.provider.} stripped. For
example,
+ * a NiFi property {@code
nifi.components.connectors.configuration.provider.db.url=jdbc:...}
+ * becomes {@code db.url=jdbc:...} in the returned map.</p>
+ */
+public interface ConnectorConfigurationProviderInitializationContext {
+
+ /**
+ * Returns the configuration properties for this provider.
+ *
+ * @return a map of property names to values
+ */
+ Map<String, String> getProperties();
}
diff --git
a/nifi-framework-api/src/main/java/org/apache/nifi/components/connector/ConnectorWorkingConfiguration.java
b/nifi-framework-api/src/main/java/org/apache/nifi/components/connector/ConnectorWorkingConfiguration.java
new file mode 100644
index 0000000000..fbe3a3af21
--- /dev/null
+++
b/nifi-framework-api/src/main/java/org/apache/nifi/components/connector/ConnectorWorkingConfiguration.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components.connector;
+
+import org.apache.nifi.flow.VersionedConfigurationStep;
+
+import java.util.List;
+
+/**
+ * Represents the externally managed working configuration of a Connector,
+ * including its name and working flow configuration steps.
+ *
+ * <p>This is a mutable POJO used as the input/output for {@link
ConnectorConfigurationProvider}
+ * operations. The style follows the same pattern as {@link
VersionedConfigurationStep} and other
+ * versioned types in the NiFi API.</p>
+ */
+public class ConnectorWorkingConfiguration {
+ private String name;
+ private List<VersionedConfigurationStep> workingFlowConfiguration;
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(final String name) {
+ this.name = name;
+ }
+
+ public List<VersionedConfigurationStep> getWorkingFlowConfiguration() {
+ return workingFlowConfiguration;
+ }
+
+ public void setWorkingFlowConfiguration(final
List<VersionedConfigurationStep> workingFlowConfiguration) {
+ this.workingFlowConfiguration = workingFlowConfiguration;
+ }
+}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java
index 59b0bf03af..ea4c65579c 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java
@@ -28,6 +28,17 @@ public interface ConnectorRepository {
void initialize(ConnectorRepositoryInitializationContext context);
+ /**
+ * Verifies that a connector with the given identifier can be created.
This checks that the connector
+ * does not already exist in the repository, and if a
ConnectorConfigurationProvider is configured,
+ * delegates to the provider's verifyCreate method to ensure the external
store can support the operation.
+ *
+ * @param connectorId the identifier of the connector to be created
+ * @throws IllegalStateException if a connector with the given identifier
already exists
+ * @throws ConnectorConfigurationProviderException if the provider rejects
the create operation
+ */
+ void verifyCreate(String connectorId);
+
/**
* Adds the given Connector to the Repository
* @param connector the Connector to add
@@ -84,6 +95,16 @@ public interface ConnectorRepository {
*/
Future<Void> restartConnector(ConnectorNode connector);
+ /**
+ * Updates the metadata of a Connector, such as its name. This method
should be used instead of calling
+ * {@link ConnectorNode#setName(String)} directly, so that the
ConnectorRepository can synchronize
+ * changes with the ConnectorConfigurationProvider if one is configured.
+ *
+ * @param connector the Connector to update
+ * @param name the new name for the Connector
+ */
+ void updateConnector(ConnectorNode connector, String name);
+
void configureConnector(ConnectorNode connector, String stepName,
StepConfiguration configuration) throws FlowUpdateException;
void applyUpdate(ConnectorNode connector, ConnectorUpdateContext context)
throws FlowUpdateException;
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepositoryInitializationContext.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepositoryInitializationContext.java
index f5c4887dad..91bca1fcc9 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepositoryInitializationContext.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepositoryInitializationContext.java
@@ -37,4 +37,14 @@ public interface ConnectorRepositoryInitializationContext {
ConnectorRequestReplicator getRequestReplicator();
+ /**
+ * Returns the ConnectorConfigurationProvider to use for external
management of connector working configuration,
+ * or null if no provider is configured.
+ *
+ * @return the ConnectorConfigurationProvider, or null if not configured
+ */
+ default ConnectorConfigurationProvider getConnectorConfigurationProvider()
{
+ return null;
+ }
+
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepositoryInitializationContext.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorConfigurationProviderInitializationContext.java
similarity index 59%
copy from
nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepositoryInitializationContext.java
copy to
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorConfigurationProviderInitializationContext.java
index f5c4887dad..e7d2f2179e 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepositoryInitializationContext.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorConfigurationProviderInitializationContext.java
@@ -17,24 +17,19 @@
package org.apache.nifi.components.connector;
-import org.apache.nifi.asset.AssetManager;
-import org.apache.nifi.components.connector.secrets.SecretsManager;
-import org.apache.nifi.controller.NodeTypeProvider;
-import org.apache.nifi.controller.flow.FlowManager;
-import org.apache.nifi.nar.ExtensionManager;
+import java.util.Map;
+import java.util.Objects;
-public interface ConnectorRepositoryInitializationContext {
+public class StandardConnectorConfigurationProviderInitializationContext
implements ConnectorConfigurationProviderInitializationContext {
- FlowManager getFlowManager();
+ private final Map<String, String> properties;
- ExtensionManager getExtensionManager();
-
- SecretsManager getSecretsManager();
-
- AssetManager getAssetManager();
-
- NodeTypeProvider getNodeTypeProvider();
-
- ConnectorRequestReplicator getRequestReplicator();
+ public StandardConnectorConfigurationProviderInitializationContext(final
Map<String, String> properties) {
+ this.properties = Map.copyOf(Objects.requireNonNull(properties,
"Properties is required"));
+ }
+ @Override
+ public Map<String, String> getProperties() {
+ return properties;
+ }
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepoInitializationContext.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepoInitializationContext.java
index 5c3d71f045..a0661fa58f 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepoInitializationContext.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepoInitializationContext.java
@@ -30,19 +30,22 @@ public class StandardConnectorRepoInitializationContext
implements ConnectorRepo
private final AssetManager assetManager;
private final NodeTypeProvider nodeTypeProvider;
private final ConnectorRequestReplicator requestReplicator;
+ private final ConnectorConfigurationProvider
connectorConfigurationProvider;
public StandardConnectorRepoInitializationContext(final FlowManager
flowManager,
final ExtensionManager
extensionManager,
final SecretsManager
secretsManager,
final AssetManager
assetManager,
final NodeTypeProvider
nodeTypeProvider,
- final
ConnectorRequestReplicator requestReplicator) {
+ final
ConnectorRequestReplicator requestReplicator,
+ final
ConnectorConfigurationProvider connectorConfigurationProvider) {
this.flowManager = flowManager;
this.extensionManager = extensionManager;
this.secretsManager = secretsManager;
this.assetManager = assetManager;
this.nodeTypeProvider = nodeTypeProvider;
this.requestReplicator = requestReplicator;
+ this.connectorConfigurationProvider = connectorConfigurationProvider;
}
@Override
@@ -74,4 +77,9 @@ public class StandardConnectorRepoInitializationContext
implements ConnectorRepo
public ConnectorRequestReplicator getRequestReplicator() {
return requestReplicator;
}
+
+ @Override
+ public ConnectorConfigurationProvider getConnectorConfigurationProvider() {
+ return connectorConfigurationProvider;
+ }
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
index 702bb5777d..cf7cb877c4 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
@@ -23,6 +23,7 @@ import
org.apache.nifi.components.connector.secrets.SecretsManager;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.flow.Bundle;
import org.apache.nifi.flow.VersionedConfigurationStep;
+import org.apache.nifi.flow.VersionedConnectorValueReference;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.util.ReflectionUtils;
@@ -31,9 +32,12 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -50,6 +54,7 @@ public class StandardConnectorRepository implements
ConnectorRepository {
private volatile ConnectorRequestReplicator requestReplicator;
private volatile SecretsManager secretsManager;
private volatile ConnectorAssetRepository assetRepository;
+ private volatile ConnectorConfigurationProvider configurationProvider;
@Override
public void initialize(final ConnectorRepositoryInitializationContext
context) {
@@ -58,17 +63,29 @@ public class StandardConnectorRepository implements
ConnectorRepository {
this.requestReplicator = context.getRequestReplicator();
this.secretsManager = context.getSecretsManager();
this.assetRepository = new
StandardConnectorAssetRepository(context.getAssetManager());
- logger.debug("Successfully initialized ConnectorRepository");
+ this.configurationProvider =
context.getConnectorConfigurationProvider();
+ logger.debug("Successfully initialized ConnectorRepository with
configurationProvider={}", configurationProvider != null ?
configurationProvider.getClass().getSimpleName() : "null");
+ }
+
+ @Override
+ public void verifyCreate(final String connectorId) {
+ if (connectors.containsKey(connectorId)) {
+ throw new IllegalStateException("A Connector already exists with
ID %s".formatted(connectorId));
+ }
+ if (configurationProvider != null) {
+ configurationProvider.verifyCreate(connectorId);
+ }
}
@Override
public void addConnector(final ConnectorNode connector) {
+ syncFromProvider(connector);
connectors.put(connector.getIdentifier(), connector);
}
@Override
public void restoreConnector(final ConnectorNode connector) {
- addConnector(connector);
+ connectors.put(connector.getIdentifier(), connector);
logger.debug("Successfully restored {}", connector);
}
@@ -81,6 +98,9 @@ public class StandardConnectorRepository implements
ConnectorRepository {
}
connectorNode.verifyCanDelete();
+ if (configurationProvider != null) {
+ configurationProvider.delete(connectorId);
+ }
connectors.remove(connectorId);
final Class<?> taskClass = connectorNode.getConnector().getClass();
@@ -93,12 +113,20 @@ public class StandardConnectorRepository implements
ConnectorRepository {
@Override
public ConnectorNode getConnector(final String identifier) {
- return connectors.get(identifier);
+ final ConnectorNode connector = connectors.get(identifier);
+ if (connector != null) {
+ syncFromProvider(connector);
+ }
+ return connector;
}
@Override
public List<ConnectorNode> getConnectors() {
- return List.copyOf(connectors.values());
+ final List<ConnectorNode> connectorList =
List.copyOf(connectors.values());
+ for (final ConnectorNode connector : connectorList) {
+ syncFromProvider(connector);
+ }
+ return connectorList;
}
@Override
@@ -144,6 +172,10 @@ public class StandardConnectorRepository implements
ConnectorRepository {
final ConnectorState initialDesiredState = connector.getDesiredState();
logger.info("Applying update to Connector {}", connector);
+ // Sync the working configuration from the external provider before
applying the update,
+ // so that the update is applied using the latest externally managed
configuration.
+ syncFromProvider(connector);
+
// Transition the connector's state to PREPARING_FOR_UPDATE before
starting the background process.
// This allows us to ensure that if we poll and see the state in the
same state it was in before that
// we know the update has already completed (successfully or
otherwise).
@@ -271,8 +303,23 @@ public class StandardConnectorRepository implements
ConnectorRepository {
}
}
+ @Override
+ public void updateConnector(final ConnectorNode connector, final String
name) {
+ if (configurationProvider != null) {
+ final ConnectorWorkingConfiguration workingConfiguration =
buildWorkingConfiguration(connector);
+ workingConfiguration.setName(name);
+ configurationProvider.save(connector.getIdentifier(),
workingConfiguration);
+ }
+ connector.setName(name);
+ }
+
@Override
public void configureConnector(final ConnectorNode connector, final String
stepName, final StepConfiguration configuration) throws FlowUpdateException {
+ if (configurationProvider != null) {
+ final ConnectorWorkingConfiguration mergedConfiguration =
buildMergedWorkingConfiguration(connector, stepName, configuration);
+ configurationProvider.save(connector.getIdentifier(),
mergedConfiguration);
+ }
+
connector.setConfiguration(stepName, configuration);
logger.info("Successfully configured {} for step {}", connector,
stepName);
}
@@ -297,6 +344,9 @@ public class StandardConnectorRepository implements
ConnectorRepository {
@Override
public void discardWorkingConfiguration(final ConnectorNode connector) {
+ if (configurationProvider != null) {
+ configurationProvider.discard(connector.getIdentifier());
+ }
connector.discardWorkingConfiguration();
cleanUpAssets(connector);
}
@@ -321,4 +371,150 @@ public class StandardConnectorRepository implements
ConnectorRepository {
public ConnectorAssetRepository getAssetRepository() {
return assetRepository;
}
+
+ private void syncFromProvider(final ConnectorNode connector) {
+ if (configurationProvider == null) {
+ return;
+ }
+
+ final Optional<ConnectorWorkingConfiguration> externalConfig =
configurationProvider.load(connector.getIdentifier());
+ if (externalConfig.isEmpty()) {
+ return;
+ }
+
+ final ConnectorWorkingConfiguration config = externalConfig.get();
+ if (config.getName() != null) {
+ connector.setName(config.getName());
+ }
+
+ final List<VersionedConfigurationStep> workingFlowConfiguration =
config.getWorkingFlowConfiguration();
+ if (workingFlowConfiguration != null) {
+ final MutableConnectorConfigurationContext workingConfigContext =
connector.getWorkingFlowContext().getConfigurationContext();
+ for (final VersionedConfigurationStep step :
workingFlowConfiguration) {
+ final StepConfiguration stepConfiguration =
toStepConfiguration(step);
+ workingConfigContext.replaceProperties(step.getName(),
stepConfiguration);
+ }
+ }
+ }
+
+ private ConnectorWorkingConfiguration buildWorkingConfiguration(final
ConnectorNode connector) {
+ final ConnectorWorkingConfiguration config = new
ConnectorWorkingConfiguration();
+ config.setName(connector.getName());
+
config.setWorkingFlowConfiguration(buildVersionedConfigurationSteps(connector.getWorkingFlowContext()));
+ return config;
+ }
+
+ private List<VersionedConfigurationStep>
buildVersionedConfigurationSteps(final FrameworkFlowContext flowContext) {
+ if (flowContext == null) {
+ return List.of();
+ }
+
+ final ConnectorConfiguration configuration =
flowContext.getConfigurationContext().toConnectorConfiguration();
+ final List<VersionedConfigurationStep> configurationSteps = new
ArrayList<>();
+
+ for (final NamedStepConfiguration namedStepConfiguration :
configuration.getNamedStepConfigurations()) {
+ final VersionedConfigurationStep versionedConfigurationStep = new
VersionedConfigurationStep();
+
versionedConfigurationStep.setName(namedStepConfiguration.stepName());
+
versionedConfigurationStep.setProperties(toVersionedProperties(namedStepConfiguration.configuration()));
+ configurationSteps.add(versionedConfigurationStep);
+ }
+
+ return configurationSteps;
+ }
+
+ private ConnectorWorkingConfiguration
buildMergedWorkingConfiguration(final ConnectorNode connector, final String
stepName, final StepConfiguration incomingConfiguration) {
+ final ConnectorWorkingConfiguration existingConfig;
+ if (configurationProvider != null) {
+ final Optional<ConnectorWorkingConfiguration> externalConfig =
configurationProvider.load(connector.getIdentifier());
+ existingConfig = externalConfig.orElseGet(() ->
buildWorkingConfiguration(connector));
+ } else {
+ existingConfig = buildWorkingConfiguration(connector);
+ }
+
+ final List<VersionedConfigurationStep> existingSteps =
existingConfig.getWorkingFlowConfiguration() != null
+ ? new ArrayList<>(existingConfig.getWorkingFlowConfiguration())
+ : new ArrayList<>();
+
+ VersionedConfigurationStep targetStep = null;
+ for (final VersionedConfigurationStep step : existingSteps) {
+ if (stepName.equals(step.getName())) {
+ targetStep = step;
+ break;
+ }
+ }
+
+ if (targetStep == null) {
+ targetStep = new VersionedConfigurationStep();
+ targetStep.setName(stepName);
+ targetStep.setProperties(new HashMap<>());
+ existingSteps.add(targetStep);
+ }
+
+ final Map<String, VersionedConnectorValueReference> mergedProperties =
targetStep.getProperties() != null
+ ? new HashMap<>(targetStep.getProperties())
+ : new HashMap<>();
+
+ for (final Map.Entry<String, ConnectorValueReference> entry :
incomingConfiguration.getPropertyValues().entrySet()) {
+ if (entry.getValue() != null) {
+ mergedProperties.put(entry.getKey(),
toVersionedValueReference(entry.getValue()));
+ }
+ }
+ targetStep.setProperties(mergedProperties);
+
+ existingConfig.setWorkingFlowConfiguration(existingSteps);
+ return existingConfig;
+ }
+
+ private Map<String, VersionedConnectorValueReference>
toVersionedProperties(final StepConfiguration configuration) {
+ final Map<String, VersionedConnectorValueReference>
versionedProperties = new HashMap<>();
+ for (final Map.Entry<String, ConnectorValueReference> entry :
configuration.getPropertyValues().entrySet()) {
+ final ConnectorValueReference valueReference = entry.getValue();
+ if (valueReference != null) {
+ versionedProperties.put(entry.getKey(),
toVersionedValueReference(valueReference));
+ }
+ }
+ return versionedProperties;
+ }
+
+ private VersionedConnectorValueReference toVersionedValueReference(final
ConnectorValueReference valueReference) {
+ final VersionedConnectorValueReference versionedReference = new
VersionedConnectorValueReference();
+ versionedReference.setValueType(valueReference.getValueType().name());
+
+ switch (valueReference) {
+ case StringLiteralValue stringLiteral ->
versionedReference.setValue(stringLiteral.getValue());
+ case AssetReference assetRef ->
versionedReference.setAssetIds(assetRef.getAssetIdentifiers());
+ case SecretReference secretRef -> {
+ versionedReference.setProviderId(secretRef.getProviderId());
+
versionedReference.setProviderName(secretRef.getProviderName());
+ versionedReference.setSecretName(secretRef.getSecretName());
+
versionedReference.setFullyQualifiedSecretName(secretRef.getFullyQualifiedName());
+ }
+ }
+
+ return versionedReference;
+ }
+
+ private StepConfiguration toStepConfiguration(final
VersionedConfigurationStep step) {
+ final Map<String, ConnectorValueReference> propertyValues = new
HashMap<>();
+ final Map<String, VersionedConnectorValueReference>
versionedProperties = step.getProperties();
+ if (versionedProperties != null) {
+ for (final Map.Entry<String, VersionedConnectorValueReference>
entry : versionedProperties.entrySet()) {
+ final VersionedConnectorValueReference versionedRef =
entry.getValue();
+ if (versionedRef != null) {
+ propertyValues.put(entry.getKey(),
toConnectorValueReference(versionedRef));
+ }
+ }
+ }
+ return new StepConfiguration(propertyValues);
+ }
+
+ private ConnectorValueReference toConnectorValueReference(final
VersionedConnectorValueReference versionedReference) {
+ final ConnectorValueType valueType =
ConnectorValueType.valueOf(versionedReference.getValueType());
+ return switch (valueType) {
+ case STRING_LITERAL -> new
StringLiteralValue(versionedReference.getValue());
+ case ASSET_REFERENCE -> new
AssetReference(versionedReference.getAssetIds());
+ case SECRET_REFERENCE -> new
SecretReference(versionedReference.getProviderId(),
versionedReference.getProviderName(),
+ versionedReference.getSecretName(),
versionedReference.getFullyQualifiedSecretName());
+ };
+ }
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 1bea151791..d28b01d1e5 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -49,10 +49,13 @@ import org.apache.nifi.cluster.protocol.NodeProtocolSender;
import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.components.ClassLoaderAwarePythonBridge;
+import org.apache.nifi.components.connector.ConnectorConfigurationProvider;
+import
org.apache.nifi.components.connector.ConnectorConfigurationProviderInitializationContext;
import org.apache.nifi.components.connector.ConnectorRepository;
import
org.apache.nifi.components.connector.ConnectorRepositoryInitializationContext;
import org.apache.nifi.components.connector.ConnectorRequestReplicator;
import org.apache.nifi.components.connector.ConnectorValidationTrigger;
+import
org.apache.nifi.components.connector.StandardConnectorConfigurationProviderInitializationContext;
import
org.apache.nifi.components.connector.StandardConnectorRepoInitializationContext;
import org.apache.nifi.components.connector.StandardConnectorRepository;
import org.apache.nifi.components.connector.StandardConnectorValidationTrigger;
@@ -627,7 +630,9 @@ public class FlowController implements
ReportingTaskProvider, FlowAnalysisRulePr
controllerServiceProvider, new
StandardControllerServiceApiLookup(extensionManager));
final SecretsManager secretsManager =
createSecretsManager(nifiProperties, extensionManager, flowManager);
- connectorRepository = createConnectorRepository(nifiProperties,
extensionManager, flowManager, connectorAssetManager, secretsManager, this,
connectorRequestReplicator);
+ final ConnectorConfigurationProvider connectorConfigurationProvider =
createConnectorConfigurationProvider(nifiProperties, extensionManager);
+ connectorRepository = createConnectorRepository(nifiProperties,
extensionManager, flowManager, connectorAssetManager, secretsManager, this,
connectorRequestReplicator,
+ connectorConfigurationProvider);
final PythonBridge rawPythonBridge =
createPythonBridge(nifiProperties, controllerServiceProvider);
final ClassLoader pythonBridgeClassLoader =
rawPythonBridge.getClass().getClassLoader();
@@ -901,7 +906,8 @@ public class FlowController implements
ReportingTaskProvider, FlowAnalysisRulePr
}
private static ConnectorRepository createConnectorRepository(final
NiFiProperties properties, final ExtensionDiscoveringManager extensionManager,
final FlowManager flowManager,
- final AssetManager assetManager, final SecretsManager
secretsManager, final NodeTypeProvider nodeTypeProvider, final
ConnectorRequestReplicator requestReplicator) {
+ final AssetManager assetManager, final SecretsManager
secretsManager, final NodeTypeProvider nodeTypeProvider, final
ConnectorRequestReplicator requestReplicator,
+ final ConnectorConfigurationProvider
connectorConfigurationProvider) {
final String implementationClassName =
properties.getProperty(NiFiProperties.CONNECTOR_REPOSITORY_IMPLEMENTATION,
DEFAULT_CONNECTOR_REPOSITORY_IMPLEMENTATION);
@@ -909,7 +915,7 @@ public class FlowController implements
ReportingTaskProvider, FlowAnalysisRulePr
// Discover implementations of Connector Repository. This is not
done at startup because the ConnectorRepository class is not
// provided in the list of standard extension points. This is due
to the fact that ConnectorRepository lives in the nifi-framework-core-api, and
// does not make sense to refactor it into some other module due
to its dependencies, simply to allow it to be discovered at startup.
- final Set<Class<?>> additionalExtensionTypes =
Set.of(ConnectorRepository.class, SecretsManager.class);
+ final Set<Class<?>> additionalExtensionTypes =
Set.of(ConnectorRepository.class, SecretsManager.class,
ConnectorConfigurationProvider.class);
extensionManager.discoverExtensions(extensionManager.getAllBundles(),
additionalExtensionTypes, true);
final ConnectorRepository created =
NarThreadContextClassLoader.createInstance(extensionManager,
implementationClassName, ConnectorRepository.class, properties);
@@ -919,7 +925,8 @@ public class FlowController implements
ReportingTaskProvider, FlowAnalysisRulePr
secretsManager,
assetManager,
nodeTypeProvider,
- requestReplicator
+ requestReplicator,
+ connectorConfigurationProvider
);
synchronized (created) {
@@ -964,6 +971,42 @@ public class FlowController implements
ReportingTaskProvider, FlowAnalysisRulePr
}
}
+ private static ConnectorConfigurationProvider
createConnectorConfigurationProvider(final NiFiProperties properties, final
ExtensionDiscoveringManager extensionManager) {
+ final String implementationClassName =
properties.getProperty(NiFiProperties.CONNECTOR_CONFIGURATION_PROVIDER_IMPLEMENTATION);
+ if (implementationClassName == null ||
implementationClassName.isBlank()) {
+ LOG.info("No Connector Configuration Provider implementation
configured; external connector configuration management is disabled");
+ return null;
+ }
+
+ try {
+
extensionManager.discoverExtensions(extensionManager.getAllBundles(),
Set.of(ConnectorConfigurationProvider.class), true);
+ final ConnectorConfigurationProvider created =
NarThreadContextClassLoader.createInstance(
+ extensionManager, implementationClassName,
ConnectorConfigurationProvider.class, properties);
+
+ final Map<String, String> initializationProperties =
properties.getPropertiesWithPrefix(NiFiProperties.CONNECTOR_CONFIGURATION_PROVIDER_PROPERTIES_PREFIX)
+ .entrySet().stream()
+ .collect(Collectors.toMap(
+ entry ->
entry.getKey().substring(NiFiProperties.CONNECTOR_CONFIGURATION_PROVIDER_PROPERTIES_PREFIX.length()),
+ Map.Entry::getValue
+ ));
+
+ final ConnectorConfigurationProviderInitializationContext
initializationContext =
+ new
StandardConnectorConfigurationProviderInitializationContext(initializationProperties);
+
+ synchronized (created) {
+ try (final NarCloseable ignored =
NarCloseable.withComponentNarLoader(extensionManager, created.getClass(),
"connector-configuration-provider")) {
+ created.initialize(initializationContext);
+ }
+ }
+
+ LOG.info("Created Connector Configuration Provider of type {}",
created.getClass().getSimpleName());
+
+ return created;
+ } catch (final Exception e) {
+ throw new RuntimeException("Failed to create Connector
Configuration Provider", e);
+ }
+ }
+
public ConnectorRepository getConnectorRepository() {
return connectorRepository;
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
index 1b8825971c..46269467b1 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
@@ -1161,7 +1161,7 @@ public class VersionedFlowSynchronizer implements
FlowSynchronizer {
private void updateConnector(final VersionedConnector versionedConnector,
final ConnectorRepository connectorRepository) {
final ConnectorNode connectorNode =
connectorRepository.getConnector(versionedConnector.getInstanceIdentifier());
- connectorNode.setName(versionedConnector.getName());
+ connectorRepository.updateConnector(connectorNode,
versionedConnector.getName());
// TODO: We don't want to throw an Exception here. Consider handling
Connectors first so that we can get all Connectors in a state of
// prepareForUpdate. If any fails, we can restore them and throw an
Exception. We don't want to be throwing an Exception in the middle
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java
index b552447eae..2e209a722c 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java
@@ -19,20 +19,34 @@ package org.apache.nifi.components.connector;
import org.apache.nifi.asset.Asset;
import org.apache.nifi.asset.AssetManager;
+import org.apache.nifi.flow.VersionedConfigurationStep;
+import org.apache.nifi.flow.VersionedConnectorValueReference;
import org.apache.nifi.nar.ExtensionManager;
import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
public class TestStandardConnectorRepository {
@@ -191,4 +205,393 @@ public class TestStandardConnectorRepository {
verify(assetManager, never()).deleteAsset(workingAssetId);
verify(assetManager).deleteAsset(unreferencedAssetId);
}
+
+ @Test
+ public void testGetConnectorWithProviderOverridesWorkingConfig() {
+ final ConnectorConfigurationProvider provider =
mock(ConnectorConfigurationProvider.class);
+ final StandardConnectorRepository repository =
createRepositoryWithProvider(provider);
+
+ final MutableConnectorConfigurationContext workingConfigContext =
mock(MutableConnectorConfigurationContext.class);
+ final ConnectorNode connector =
createConnectorNodeWithWorkingConfig("connector-1", "Original Name",
workingConfigContext);
+ repository.addConnector(connector);
+
+ final ConnectorWorkingConfiguration externalConfig = new
ConnectorWorkingConfiguration();
+ externalConfig.setName("External Name");
+ final VersionedConfigurationStep externalStep =
createVersionedStep("step1", Map.of("prop1",
createStringLiteralRef("external-value")));
+ externalConfig.setWorkingFlowConfiguration(List.of(externalStep));
+
when(provider.load("connector-1")).thenReturn(Optional.of(externalConfig));
+
+ final ConnectorNode result = repository.getConnector("connector-1");
+
+ assertNotNull(result);
+ verify(connector).setName("External Name");
+ verify(workingConfigContext).replaceProperties(eq("step1"),
any(StepConfiguration.class));
+ }
+
+ @Test
+ public void testGetConnectorWithProviderReturnsEmpty() {
+ final ConnectorConfigurationProvider provider =
mock(ConnectorConfigurationProvider.class);
+ final StandardConnectorRepository repository =
createRepositoryWithProvider(provider);
+
+ final ConnectorNode connector =
createSimpleConnectorNode("connector-1", "Original Name");
+ repository.addConnector(connector);
+
+ when(provider.load("connector-1")).thenReturn(Optional.empty());
+
+ final ConnectorNode result = repository.getConnector("connector-1");
+
+ assertNotNull(result);
+ verify(connector, never()).setName(anyString());
+ }
+
+ @Test
+ public void testGetConnectorWithProviderThrowsException() {
+ final ConnectorConfigurationProvider provider =
mock(ConnectorConfigurationProvider.class);
+ final StandardConnectorRepository repository =
createRepositoryWithProvider(provider);
+
+ final ConnectorNode connector =
createSimpleConnectorNode("connector-1", "Original Name");
+ repository.addConnector(connector);
+
+ when(provider.load("connector-1")).thenThrow(new
ConnectorConfigurationProviderException("Provider failure"));
+
+ assertThrows(ConnectorConfigurationProviderException.class, () ->
repository.getConnector("connector-1"));
+ verify(connector, never()).setName(anyString());
+ }
+
+ @Test
+ public void testGetConnectorWithNullProvider() {
+ final StandardConnectorRepository repository =
createRepositoryWithProvider(null);
+
+ final ConnectorNode connector =
createSimpleConnectorNode("connector-1", "Original Name");
+ repository.addConnector(connector);
+
+ final ConnectorNode result = repository.getConnector("connector-1");
+
+ assertNotNull(result);
+ verify(connector, never()).setName(anyString());
+ }
+
+ @Test
+ public void testGetConnectorsWithProviderOverrides() {
+ final ConnectorConfigurationProvider provider =
mock(ConnectorConfigurationProvider.class);
+ final StandardConnectorRepository repository =
createRepositoryWithProvider(provider);
+
+ final MutableConnectorConfigurationContext workingConfig1 =
mock(MutableConnectorConfigurationContext.class);
+ final MutableConnectorConfigurationContext workingConfig2 =
mock(MutableConnectorConfigurationContext.class);
+ final ConnectorNode connector1 =
createConnectorNodeWithWorkingConfig("connector-1", "Name 1", workingConfig1);
+ final ConnectorNode connector2 =
createConnectorNodeWithWorkingConfig("connector-2", "Name 2", workingConfig2);
+ repository.addConnector(connector1);
+ repository.addConnector(connector2);
+
+ final ConnectorWorkingConfiguration externalConfig1 = new
ConnectorWorkingConfiguration();
+ externalConfig1.setName("External Name 1");
+ externalConfig1.setWorkingFlowConfiguration(List.of());
+
when(provider.load("connector-1")).thenReturn(Optional.of(externalConfig1));
+ when(provider.load("connector-2")).thenReturn(Optional.empty());
+
+ final List<ConnectorNode> results = repository.getConnectors();
+
+ assertEquals(2, results.size());
+ verify(connector1).setName("External Name 1");
+ verify(connector2, never()).setName(anyString());
+ }
+
+ @Test
+ public void testConfigureConnectorSavesToProviderBeforeModifyingNode()
throws FlowUpdateException {
+ final ConnectorConfigurationProvider provider =
mock(ConnectorConfigurationProvider.class);
+ final StandardConnectorRepository repository =
createRepositoryWithProvider(provider);
+
+ final ConnectorNode connector =
createConnectorNodeWithEmptyWorkingConfig("connector-1", "Test Connector");
+ repository.addConnector(connector);
+
+ when(provider.load("connector-1")).thenReturn(Optional.empty());
+
+ final StepConfiguration incomingConfig = new
StepConfiguration(Map.of("prop1", new StringLiteralValue("new-value")));
+ repository.configureConnector(connector, "step1", incomingConfig);
+
+ final ArgumentCaptor<ConnectorWorkingConfiguration> configCaptor =
ArgumentCaptor.forClass(ConnectorWorkingConfiguration.class);
+ verify(provider).save(eq("connector-1"), configCaptor.capture());
+
+ final ConnectorWorkingConfiguration savedConfig =
configCaptor.getValue();
+ assertNotNull(savedConfig);
+ assertEquals("Test Connector", savedConfig.getName());
+
+ final List<VersionedConfigurationStep> savedSteps =
savedConfig.getWorkingFlowConfiguration();
+ assertNotNull(savedSteps);
+ final VersionedConfigurationStep savedStep = savedSteps.stream()
+ .filter(s -> "step1".equals(s.getName())).findFirst().orElse(null);
+ assertNotNull(savedStep);
+ assertEquals("STRING_LITERAL",
savedStep.getProperties().get("prop1").getValueType());
+ assertEquals("new-value",
savedStep.getProperties().get("prop1").getValue());
+
+ verify(connector).setConfiguration("step1", incomingConfig);
+ }
+
+ @Test
+ public void testConfigureConnectorProviderSaveFailsDoesNotModifyNode()
throws FlowUpdateException {
+ final ConnectorConfigurationProvider provider =
mock(ConnectorConfigurationProvider.class);
+ final StandardConnectorRepository repository =
createRepositoryWithProvider(provider);
+
+ final ConnectorNode connector =
createConnectorNodeWithEmptyWorkingConfig("connector-1", "Test Connector");
+ repository.addConnector(connector);
+
+ when(provider.load("connector-1")).thenReturn(Optional.empty());
+ doThrow(new RuntimeException("Save
failed")).when(provider).save(anyString(),
any(ConnectorWorkingConfiguration.class));
+
+ final StepConfiguration incomingConfig = new
StepConfiguration(Map.of("prop1", new StringLiteralValue("new-value")));
+
+ assertThrows(RuntimeException.class, () ->
repository.configureConnector(connector, "step1", incomingConfig));
+
+ verify(connector, never()).setConfiguration(anyString(),
any(StepConfiguration.class));
+ }
+
+ @Test
+ public void testConfigureConnectorMergesPartialStepConfig() throws
FlowUpdateException {
+ final ConnectorConfigurationProvider provider =
mock(ConnectorConfigurationProvider.class);
+ final StandardConnectorRepository repository =
createRepositoryWithProvider(provider);
+
+ final ConnectorNode connector =
createConnectorNodeWithEmptyWorkingConfig("connector-1", "Test Connector");
+ repository.addConnector(connector);
+
+ final VersionedConfigurationStep existingStep =
createVersionedStep("step1",
+ Map.of("propA", createStringLiteralRef("old-A"), "propB",
createStringLiteralRef("old-B")));
+ final ConnectorWorkingConfiguration existingConfig = new
ConnectorWorkingConfiguration();
+ existingConfig.setName("Test Connector");
+ existingConfig.setWorkingFlowConfiguration(new
ArrayList<>(List.of(existingStep)));
+
when(provider.load("connector-1")).thenReturn(Optional.of(existingConfig));
+
+ final Map<String, ConnectorValueReference> incomingProps = new
HashMap<>();
+ incomingProps.put("propA", new StringLiteralValue("new-A"));
+ incomingProps.put("propC", new StringLiteralValue("new-C"));
+ final StepConfiguration incomingConfig = new
StepConfiguration(incomingProps);
+
+ repository.configureConnector(connector, "step1", incomingConfig);
+
+ final ArgumentCaptor<ConnectorWorkingConfiguration> configCaptor =
ArgumentCaptor.forClass(ConnectorWorkingConfiguration.class);
+ verify(provider).save(eq("connector-1"), configCaptor.capture());
+
+ final ConnectorWorkingConfiguration savedConfig =
configCaptor.getValue();
+ final VersionedConfigurationStep savedStep =
savedConfig.getWorkingFlowConfiguration().stream()
+ .filter(s -> "step1".equals(s.getName())).findFirst().orElse(null);
+ assertNotNull(savedStep);
+
+ final Map<String, VersionedConnectorValueReference> savedProps =
savedStep.getProperties();
+ assertEquals("new-A", savedProps.get("propA").getValue());
+ assertEquals("old-B", savedProps.get("propB").getValue());
+ assertEquals("new-C", savedProps.get("propC").getValue());
+
+ verify(connector).setConfiguration("step1", incomingConfig);
+ }
+
+ @Test
+ public void testDiscardWorkingConfigurationCallsProviderDiscard() {
+ final ConnectorConfigurationProvider provider =
mock(ConnectorConfigurationProvider.class);
+ final StandardConnectorRepository repository =
createRepositoryWithProvider(provider);
+
+ final ConnectorNode connector =
createConnectorNodeWithEmptyWorkingConfig("connector-1", "Test Connector");
+ repository.addConnector(connector);
+
+ repository.discardWorkingConfiguration(connector);
+
+ verify(provider).discard("connector-1");
+ verify(provider, never()).save(anyString(),
any(ConnectorWorkingConfiguration.class));
+ verify(connector).discardWorkingConfiguration();
+ }
+
+ @Test
+ public void testDiscardWorkingConfigurationProviderThrowsException() {
+ final ConnectorConfigurationProvider provider =
mock(ConnectorConfigurationProvider.class);
+ final StandardConnectorRepository repository =
createRepositoryWithProvider(provider);
+
+ final ConnectorNode connector =
createConnectorNodeWithEmptyWorkingConfig("connector-1", "Test Connector");
+ repository.addConnector(connector);
+
+ doThrow(new ConnectorConfigurationProviderException("Discard
failed")).when(provider).discard("connector-1");
+
+ assertThrows(ConnectorConfigurationProviderException.class, () ->
repository.discardWorkingConfiguration(connector));
+ }
+
+ @Test
+ public void testRemoveConnectorCallsProviderDelete() {
+ final ConnectorConfigurationProvider provider =
mock(ConnectorConfigurationProvider.class);
+ final StandardConnectorRepository repository =
createRepositoryWithProvider(provider);
+
+ final Connector mockConnector = mock(Connector.class);
+ final ConnectorNode connector = mock(ConnectorNode.class);
+ when(connector.getIdentifier()).thenReturn("connector-1");
+ when(connector.getConnector()).thenReturn(mockConnector);
+ repository.addConnector(connector);
+
+ repository.removeConnector("connector-1");
+
+ verify(provider).delete("connector-1");
+ }
+
+ @Test
+ public void testRemoveConnectorProviderThrowsException() {
+ final ConnectorConfigurationProvider provider =
mock(ConnectorConfigurationProvider.class);
+ final StandardConnectorRepository repository =
createRepositoryWithProvider(provider);
+
+ final Connector mockConnector = mock(Connector.class);
+ final ConnectorNode connector = mock(ConnectorNode.class);
+ when(connector.getIdentifier()).thenReturn("connector-1");
+ when(connector.getConnector()).thenReturn(mockConnector);
+ repository.addConnector(connector);
+
+ doThrow(new ConnectorConfigurationProviderException("Delete
failed")).when(provider).delete("connector-1");
+
+ assertThrows(ConnectorConfigurationProviderException.class, () ->
repository.removeConnector("connector-1"));
+ }
+
+ @Test
+ public void testUpdateConnectorSavesNameToProvider() {
+ final ConnectorConfigurationProvider provider =
mock(ConnectorConfigurationProvider.class);
+ final StandardConnectorRepository repository =
createRepositoryWithProvider(provider);
+
+ final ConnectorNode connector =
createConnectorNodeWithEmptyWorkingConfig("connector-1", "Old Name");
+ repository.addConnector(connector);
+
+ repository.updateConnector(connector, "New Name");
+
+ final ArgumentCaptor<ConnectorWorkingConfiguration> configCaptor =
ArgumentCaptor.forClass(ConnectorWorkingConfiguration.class);
+ verify(provider).save(eq("connector-1"), configCaptor.capture());
+ assertEquals("New Name", configCaptor.getValue().getName());
+
+ verify(connector).setName("New Name");
+ }
+
+ @Test
+ public void testInheritConfigurationDoesNotCallProvider() throws
FlowUpdateException {
+ final ConnectorConfigurationProvider provider =
mock(ConnectorConfigurationProvider.class);
+ final StandardConnectorRepository repository =
createRepositoryWithProvider(provider);
+
+ when(provider.load("connector-1")).thenReturn(Optional.empty());
+
+ final ConnectorNode connector = mock(ConnectorNode.class);
+ when(connector.getIdentifier()).thenReturn("connector-1");
+ repository.addConnector(connector);
+
+ // Reset interactions so we can verify that inheritConfiguration
itself does not call the provider
+ reset(provider);
+
+ repository.inheritConfiguration(connector, List.of(), List.of(), null);
+
+ verifyNoInteractions(provider);
+ }
+
+ @Test
+ public void testVerifyCreateWithExistingConnectorThrows() {
+ final StandardConnectorRepository repository =
createRepositoryWithProvider(null);
+
+ final ConnectorNode connector =
createSimpleConnectorNode("connector-1", "Test Connector");
+ repository.addConnector(connector);
+
+ assertThrows(IllegalStateException.class, () ->
repository.verifyCreate("connector-1"));
+ }
+
+ @Test
+ public void testVerifyCreateWithNewConnectorSucceeds() {
+ final StandardConnectorRepository repository =
createRepositoryWithProvider(null);
+
+ repository.verifyCreate("connector-1");
+ }
+
+ @Test
+ public void testVerifyCreateDelegatesToProvider() {
+ final ConnectorConfigurationProvider provider =
mock(ConnectorConfigurationProvider.class);
+ final StandardConnectorRepository repository =
createRepositoryWithProvider(provider);
+
+ repository.verifyCreate("connector-1");
+
+ verify(provider).verifyCreate("connector-1");
+ }
+
+ @Test
+ public void testVerifyCreateProviderRejectsThrows() {
+ final ConnectorConfigurationProvider provider =
mock(ConnectorConfigurationProvider.class);
+ final StandardConnectorRepository repository =
createRepositoryWithProvider(provider);
+
+ doThrow(new ConnectorConfigurationProviderException("Create not
supported")).when(provider).verifyCreate("connector-1");
+
+ assertThrows(ConnectorConfigurationProviderException.class, () ->
repository.verifyCreate("connector-1"));
+ }
+
+ @Test
+ public void testVerifyCreateExistingConnectorDoesNotCallProvider() {
+ final ConnectorConfigurationProvider provider =
mock(ConnectorConfigurationProvider.class);
+ final StandardConnectorRepository repository =
createRepositoryWithProvider(provider);
+
+ when(provider.load("connector-1")).thenReturn(Optional.empty());
+
+ final ConnectorNode connector =
createSimpleConnectorNode("connector-1", "Test Connector");
+ repository.addConnector(connector);
+
+ assertThrows(IllegalStateException.class, () ->
repository.verifyCreate("connector-1"));
+ verify(provider, never()).verifyCreate(anyString());
+ }
+
+ private StandardConnectorRepository createRepositoryWithProvider(final
ConnectorConfigurationProvider provider) {
+ final StandardConnectorRepository repository = new
StandardConnectorRepository();
+ final ConnectorRepositoryInitializationContext initContext =
mock(ConnectorRepositoryInitializationContext.class);
+
when(initContext.getExtensionManager()).thenReturn(mock(ExtensionManager.class));
+
when(initContext.getAssetManager()).thenReturn(mock(AssetManager.class));
+
when(initContext.getConnectorConfigurationProvider()).thenReturn(provider);
+ repository.initialize(initContext);
+ return repository;
+ }
+
+ private ConnectorNode createSimpleConnectorNode(final String id, final
String name) {
+ final ConnectorNode connector = mock(ConnectorNode.class);
+ when(connector.getIdentifier()).thenReturn(id);
+ when(connector.getName()).thenReturn(name);
+ return connector;
+ }
+
+ private ConnectorNode createConnectorNodeWithWorkingConfig(final String
id, final String name, final MutableConnectorConfigurationContext
workingConfigContext) {
+ final ConnectorNode connector = mock(ConnectorNode.class);
+ when(connector.getIdentifier()).thenReturn(id);
+ when(connector.getName()).thenReturn(name);
+
+ final FrameworkFlowContext workingFlowContext =
mock(FrameworkFlowContext.class);
+
when(workingFlowContext.getConfigurationContext()).thenReturn(workingConfigContext);
+ when(connector.getWorkingFlowContext()).thenReturn(workingFlowContext);
+
+ return connector;
+ }
+
+ private ConnectorNode createConnectorNodeWithEmptyWorkingConfig(final
String id, final String name) {
+ final ConnectorNode connector = mock(ConnectorNode.class);
+ when(connector.getIdentifier()).thenReturn(id);
+ when(connector.getName()).thenReturn(name);
+
+ final MutableConnectorConfigurationContext workingConfigContext =
mock(MutableConnectorConfigurationContext.class);
+ final ConnectorConfiguration emptyConfig = new
ConnectorConfiguration(Set.of());
+
when(workingConfigContext.toConnectorConfiguration()).thenReturn(emptyConfig);
+
+ final FrameworkFlowContext workingFlowContext =
mock(FrameworkFlowContext.class);
+
when(workingFlowContext.getConfigurationContext()).thenReturn(workingConfigContext);
+ when(connector.getWorkingFlowContext()).thenReturn(workingFlowContext);
+
+ final FrameworkFlowContext activeFlowContext =
mock(FrameworkFlowContext.class);
+ final MutableConnectorConfigurationContext activeConfigContext =
mock(MutableConnectorConfigurationContext.class);
+
when(activeConfigContext.toConnectorConfiguration()).thenReturn(emptyConfig);
+
when(activeFlowContext.getConfigurationContext()).thenReturn(activeConfigContext);
+ when(connector.getActiveFlowContext()).thenReturn(activeFlowContext);
+
+ return connector;
+ }
+
+ private VersionedConfigurationStep createVersionedStep(final String name,
final Map<String, VersionedConnectorValueReference> properties) {
+ final VersionedConfigurationStep step = new
VersionedConfigurationStep();
+ step.setName(name);
+ step.setProperties(new HashMap<>(properties));
+ return step;
+ }
+
+ private VersionedConnectorValueReference createStringLiteralRef(final
String value) {
+ final VersionedConnectorValueReference ref = new
VersionedConnectorValueReference();
+ ref.setValueType("STRING_LITERAL");
+ ref.setValue(value);
+ return ref;
+ }
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
b/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
index bc713549f1..09b763852a 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
+++
b/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
@@ -140,6 +140,9 @@
nifi.asset.manager.properties.directory=${nifi.asset.manager.properties.director
nifi.connector.asset.manager.implementation=${nifi.connector.asset.manager.implementation}
nifi.connector.asset.manager.properties.directory=${nifi.connector.asset.manager.properties.directory}
+# Connector Configuration Provider
+nifi.components.connectors.configuration.provider.implementation=
+
# Site to Site properties
nifi.remote.input.host=
nifi.remote.input.secure=${nifi.remote.input.secure}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index f686080454..8f1ba96497 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -3601,13 +3601,11 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
final RevisionClaim claim = new StandardRevisionClaim(revision);
final RevisionUpdate<ConnectorDTO> snapshot =
revisionManager.updateRevision(claim, user, () -> {
- final ConnectorNode node =
connectorDAO.getConnector(connectorDTO.getId());
- if (connectorDTO.getName() != null) {
- node.setName(connectorDTO.getName());
- }
+ connectorDAO.updateConnector(connectorDTO);
controllerFacade.save();
+ final ConnectorNode node =
connectorDAO.getConnector(connectorDTO.getId());
final ConnectorDTO dto = dtoFactory.createConnectorDto(node);
final FlowModification lastMod = new
FlowModification(revision.incrementRevision(revision.getClientId()),
user.getIdentity());
return new StandardRevisionUpdate<>(dto, lastMod);
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorDAO.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorDAO.java
index cb2e7046ce..70801f1dcd 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorDAO.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorDAO.java
@@ -42,6 +42,8 @@ public interface ConnectorDAO {
ConnectorNode createConnector(String type, String id, BundleCoordinate
bundleCoordinate, boolean firstTimeAdded, boolean registerLogObserver);
+ void updateConnector(ConnectorDTO connectorDTO);
+
void deleteConnector(String id);
void startConnector(String id);
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java
index c9fd333180..cb7680cb0b 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java
@@ -83,8 +83,8 @@ public class StandardConnectorDAO implements ConnectorDAO {
@Override
public void verifyCreate(final ConnectorDTO connectorDTO) {
final String id = connectorDTO.getId();
- if (id != null && hasConnector(id)) {
- throw new IllegalStateException("A Connector already exists with
ID %s".formatted(id));
+ if (id != null) {
+ getConnectorRepository().verifyCreate(id);
}
}
@@ -111,10 +111,17 @@ public class StandardConnectorDAO implements ConnectorDAO
{
public ConnectorNode createConnector(final String type, final String id,
final BundleCoordinate bundleCoordinate, final boolean firstTimeAdded, final
boolean registerLogObserver) {
final FlowManager flowManager = getFlowManager();
final ConnectorNode connector = flowManager.createConnector(type, id,
bundleCoordinate, firstTimeAdded, registerLogObserver);
- getConnectorRepository().addConnector(connector);
return connector;
}
+ @Override
+ public void updateConnector(final ConnectorDTO connectorDTO) {
+ final ConnectorNode connector = getConnector(connectorDTO.getId());
+ if (connectorDTO.getName() != null) {
+ getConnectorRepository().updateConnector(connector,
connectorDTO.getName());
+ }
+ }
+
@Override
public void deleteConnector(final String id) {
getConnectorRepository().removeConnector(id);
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/StandardConnectorDAOTest.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/StandardConnectorDAOTest.java
index 5cbf1ef589..e642add1a8 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/StandardConnectorDAOTest.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/StandardConnectorDAOTest.java
@@ -273,30 +273,28 @@ class StandardConnectorDAOTest {
}
@Test
- void testVerifyCreateWithExistingConnectorId() {
+ void testVerifyCreateDelegatesToRepository() {
final ConnectorDTO connectorDTO = new ConnectorDTO();
connectorDTO.setId(CONNECTOR_ID);
- connectorDTO.setType("org.apache.nifi.connector.TestConnector");
-
when(connectorRepository.getConnector(CONNECTOR_ID)).thenReturn(connectorNode);
-
- final IllegalStateException exception =
assertThrows(IllegalStateException.class, () ->
- connectorDAO.verifyCreate(connectorDTO)
- );
+ connectorDAO.verifyCreate(connectorDTO);
- assertEquals("A Connector already exists with ID
%s".formatted(CONNECTOR_ID), exception.getMessage());
+ verify(connectorRepository).verifyCreate(CONNECTOR_ID);
}
@Test
- void testVerifyCreateWithNewId() {
+ void testVerifyCreateWithRepositoryThrowingException() {
final ConnectorDTO connectorDTO = new ConnectorDTO();
connectorDTO.setId(CONNECTOR_ID);
- when(connectorRepository.getConnector(CONNECTOR_ID)).thenReturn(null);
+ doThrow(new IllegalStateException("A Connector already exists with ID
%s".formatted(CONNECTOR_ID)))
+ .when(connectorRepository).verifyCreate(CONNECTOR_ID);
- connectorDAO.verifyCreate(connectorDTO);
+ final IllegalStateException exception =
assertThrows(IllegalStateException.class, () ->
+ connectorDAO.verifyCreate(connectorDTO)
+ );
- verify(connectorRepository).getConnector(CONNECTOR_ID);
+ assertEquals("A Connector already exists with ID
%s".formatted(CONNECTOR_ID), exception.getMessage());
}
@Test
@@ -306,7 +304,7 @@ class StandardConnectorDAOTest {
connectorDAO.verifyCreate(connectorDTO);
- verify(connectorRepository, never()).getConnector(any());
+ verify(connectorRepository, never()).verifyCreate(any());
}
}