This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch NIFI-15258
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/NIFI-15258 by this push:
new d386883011 NIFI-15495 Restart Connectors that reference assets that
were synchro… (#10806)
d386883011 is described below
commit d386883011d00f9e8a8d7dfe6ca10d7df7556f14
Author: Bryan Bende <[email protected]>
AuthorDate: Thu Jan 22 14:48:23 2026 -0500
NIFI-15495 Restart Connectors that reference assets that were synchro…
(#10806)
* NIFI-15495 Restart Connectors that reference assets that were synchronized
- Ensure Connectors re-resolve property values before starting
- Ensure asset clean up happens only after applyUpdate fully finishes
- Add connector asset properties to default nifi.properties
* Fix system test
* Encapsulate restart logic in new method on ConnectorNode
* Fix JavaDoc
---
.../server/MockConnectorAssetManager.java | 2 +-
.../nifi/asset/StandardConnectorAssetManager.java | 2 +-
.../components/connector/ConnectorRepository.java | 10 +++
.../MutableConnectorConfigurationContext.java | 5 ++
.../nifi-framework/nifi-framework-core/pom.xml | 3 +
.../asset/StandardConnectorAssetSynchronizer.java | 55 ++++++++++++++--
.../StandardConnectorConfigurationContext.java | 22 +++++++
.../connector/StandardConnectorNode.java | 7 +-
.../connector/StandardConnectorRepository.java | 77 +++++++++++++++++++++-
.../nifi-framework/nifi-resources/pom.xml | 2 +
.../src/main/resources/conf/nifi.properties | 2 +
.../nifi/web/dao/impl/StandardConnectorDAO.java | 43 +-----------
.../apache/nifi/tests/system/NiFiClientUtil.java | 8 +++
.../org/apache/nifi/tests/system/NiFiSystemIT.java | 1 +
.../connectors/ClusteredConnectorAssetsIT.java | 32 +++++++--
.../tests/system/connectors/ConnectorAssetsIT.java | 3 +
16 files changed, 214 insertions(+), 60 deletions(-)
diff --git
a/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/MockConnectorAssetManager.java
b/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/MockConnectorAssetManager.java
index e8272be173..92892f6275 100644
---
a/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/MockConnectorAssetManager.java
+++
b/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/MockConnectorAssetManager.java
@@ -41,7 +41,7 @@ import java.util.concurrent.ConcurrentHashMap;
public class MockConnectorAssetManager implements AssetManager {
private static final String ASSET_STORAGE_LOCATION_PROPERTY = "directory";
- private static final String DEFAULT_ASSET_STORAGE_LOCATION =
"target/mock-connector-assets";
+ private static final String DEFAULT_ASSET_STORAGE_LOCATION =
"target/mock_connector_assets";
private final Map<String, Asset> assets = new ConcurrentHashMap<>();
private volatile File assetStorageLocation;
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/asset/StandardConnectorAssetManager.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/asset/StandardConnectorAssetManager.java
index 7cc907de11..87b01a47b1 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/asset/StandardConnectorAssetManager.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/asset/StandardConnectorAssetManager.java
@@ -46,7 +46,7 @@ public class StandardConnectorAssetManager implements
AssetManager {
private static final Logger logger =
LoggerFactory.getLogger(StandardConnectorAssetManager.class);
public static final String ASSET_STORAGE_LOCATION_PROPERTY = "directory";
- public static final String DEFAULT_ASSET_STORAGE_LOCATION =
"./connector-assets";
+ public static final String DEFAULT_ASSET_STORAGE_LOCATION =
"./connector_assets";
private volatile File assetStorageLocation;
private final Map<String, Asset> assets = new ConcurrentHashMap<>();
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java
index 2a134d8e5e..59b0bf03af 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java
@@ -76,6 +76,14 @@ public interface ConnectorRepository {
*/
Future<Void> stopConnector(ConnectorNode connector);
+ /**
+ * Restarts the given Connector, managing any appropriate lifecycle events.
+ *
+ * @param connector the Connector to restart
+ * @return a CompletableFuture that will be completed when the Connector
has restarted
+ */
+ Future<Void> restartConnector(ConnectorNode connector);
+
void configureConnector(ConnectorNode connector, String stepName,
StepConfiguration configuration) throws FlowUpdateException;
void applyUpdate(ConnectorNode connector, ConnectorUpdateContext context)
throws FlowUpdateException;
@@ -83,6 +91,8 @@ public interface ConnectorRepository {
void inheritConfiguration(ConnectorNode connector,
List<VersionedConfigurationStep> activeFlowConfiguration,
List<VersionedConfigurationStep> workingFlowConfiguration, Bundle
flowContextBundle) throws FlowUpdateException;
+ void discardWorkingConfiguration(ConnectorNode connector);
+
SecretsManager getSecretsManager();
/**
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/MutableConnectorConfigurationContext.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/MutableConnectorConfigurationContext.java
index aaf4c1fdf3..22191633af 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/MutableConnectorConfigurationContext.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/MutableConnectorConfigurationContext.java
@@ -41,6 +41,11 @@ public interface MutableConnectorConfigurationContext
extends ConnectorConfigura
*/
ConfigurationUpdateResult replaceProperties(String stepName,
StepConfiguration configuration);
+ /**
+ * Resolves all existing property values.
+ */
+ void resolvePropertyValues();
+
/**
* Converts this mutable configuration context to an immutable
ConnectorConfiguration.
* @return the ConnectorConfiguration
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
index 479567966e..0ae05a9969 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
@@ -324,6 +324,9 @@
<exclude>src/test/resources/old-swap-file.swap</exclude>
<exclude>src/test/resources/xxe_template.xml</exclude>
<exclude>src/test/resources/swap/444-old-swap-file.swap</exclude>
+ <exclude>src/test/resources/colors.txt</exclude>
+
<exclude>src/test/resources/TestRuntimeManifest/nifi-test-components-nar/META-INF/docs/steps/org.example.TestConnector/Another_Test_Step.md</exclude>
+
<exclude>src/test/resources/TestRuntimeManifest/nifi-test-components-nar/META-INF/docs/steps/org.example.TestConnector/Test_Step.md</exclude>
</excludes>
</configuration>
</plugin>
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/asset/StandardConnectorAssetSynchronizer.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/asset/StandardConnectorAssetSynchronizer.java
index 2eb1aa5a9a..ad70446ce5 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/asset/StandardConnectorAssetSynchronizer.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/asset/StandardConnectorAssetSynchronizer.java
@@ -22,6 +22,8 @@ import org.apache.nifi.client.NiFiRestApiRetryableException;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.components.connector.ConnectorNode;
+import org.apache.nifi.components.connector.ConnectorRepository;
+import org.apache.nifi.components.connector.ConnectorState;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.util.NiFiProperties;
@@ -37,8 +39,12 @@ import java.io.InputStream;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -55,6 +61,7 @@ public class StandardConnectorAssetSynchronizer implements
AssetSynchronizer {
private final AssetManager assetManager;
private final FlowManager flowManager;
+ private final ConnectorRepository connectorRepository;
private final ClusterCoordinator clusterCoordinator;
private final WebClientService webClientService;
private final NiFiProperties properties;
@@ -65,6 +72,7 @@ public class StandardConnectorAssetSynchronizer implements
AssetSynchronizer {
final NiFiProperties properties)
{
this.assetManager = flowController.getConnectorAssetManager();
this.flowManager = flowController.getFlowManager();
+ this.connectorRepository = flowController.getConnectorRepository();
this.clusterCoordinator = clusterCoordinator;
this.webClientService = webClientService;
this.properties = properties;
@@ -96,16 +104,43 @@ public class StandardConnectorAssetSynchronizer implements
AssetSynchronizer {
final List<ConnectorNode> connectors = flowManager.getAllConnectors();
logger.info("Found {} connectors for synchronizing assets",
connectors.size());
+ final Set<ConnectorNode> connectorsWithSynchronizedAssets = new
HashSet<>();
for (final ConnectorNode connector : connectors) {
try {
- synchronize(assetsRestApiClient, connector);
+ final boolean assetSynchronized =
synchronize(assetsRestApiClient, connector);
+ if (assetSynchronized) {
+ connectorsWithSynchronizedAssets.add(connector);
+ }
} catch (final Exception e) {
logger.error("Failed to synchronize assets for connector
[{}]", connector.getIdentifier(), e);
}
}
+
+
restartConnectorsWithSynchronizedAssets(connectorsWithSynchronizedAssets);
+ }
+
+ private void restartConnectorsWithSynchronizedAssets(final
Set<ConnectorNode> connectorsWithSynchronizedAssets) {
+ for (final ConnectorNode connector : connectorsWithSynchronizedAssets)
{
+ final ConnectorState currentState = connector.getDesiredState();
+ if (currentState == ConnectorState.RUNNING) {
+ logger.info("Restarting connector [{}] after asset
synchronization", connector.getIdentifier());
+ try {
+ final Future<Void> restartFuture =
connectorRepository.restartConnector(connector);
+ restartFuture.get();
+ logger.info("Successfully restarted connector [{}] after
asset synchronization", connector.getIdentifier());
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.error("Interrupted while restarting connector [{}]
after asset synchronization", connector.getIdentifier(), e);
+ } catch (final ExecutionException e) {
+ logger.error("Failed to restart connector [{}] after asset
synchronization", connector.getIdentifier(), e.getCause());
+ }
+ } else {
+ logger.info("Connector [{}] is not running (state={}):
skipping restart after asset synchronization", connector.getIdentifier(),
currentState);
+ }
+ }
}
- private void synchronize(final AssetsRestApiClient assetsRestApiClient,
final ConnectorNode connector) {
+ private boolean synchronize(final AssetsRestApiClient assetsRestApiClient,
final ConnectorNode connector) {
final String connectorId = connector.getIdentifier();
final Map<String, Asset> existingAssets =
assetManager.getAssets(connectorId).stream()
.collect(Collectors.toMap(Asset::getIdentifier,
Function.identity()));
@@ -113,35 +148,41 @@ public class StandardConnectorAssetSynchronizer
implements AssetSynchronizer {
final AssetsEntity coordinatorAssetsEntity =
listConnectorAssetsWithRetry(assetsRestApiClient, connectorId);
if (coordinatorAssetsEntity == null) {
logger.error("Timeout listing assets from cluster coordinator for
connector [{}]", connectorId);
- return;
+ return false;
}
final Collection<AssetEntity> coordinatorAssets =
coordinatorAssetsEntity.getAssets();
if (coordinatorAssets == null || coordinatorAssets.isEmpty()) {
logger.info("Connector [{}] did not return any assets from the
cluster coordinator", connectorId);
- return;
+ return false;
}
logger.info("Connector [{}] returned {} assets from the cluster
coordinator", connectorId, coordinatorAssets.size());
+ boolean assetSynchronized = false;
for (final AssetEntity coordinatorAssetEntity : coordinatorAssets) {
final AssetDTO coordinatorAsset =
coordinatorAssetEntity.getAsset();
final Asset matchingAsset =
existingAssets.get(coordinatorAsset.getId());
try {
- synchronize(assetsRestApiClient, connectorId,
coordinatorAsset, matchingAsset);
+ final boolean assetWasSynchronized =
synchronize(assetsRestApiClient, connectorId, coordinatorAsset, matchingAsset);
+ if (assetWasSynchronized) {
+ assetSynchronized = true;
+ }
} catch (final Exception e) {
logger.error("Failed to synchronize asset [id={},name={}] for
connector [{}]",
coordinatorAsset.getId(), coordinatorAsset.getName(),
connectorId, e);
}
}
+ return assetSynchronized;
}
- private void synchronize(final AssetsRestApiClient assetsRestApiClient,
final String connectorId, final AssetDTO coordinatorAsset, final Asset
matchingAsset) {
+ private boolean synchronize(final AssetsRestApiClient assetsRestApiClient,
final String connectorId, final AssetDTO coordinatorAsset, final Asset
matchingAsset) {
final String assetId = coordinatorAsset.getId();
final String assetName = coordinatorAsset.getName();
if (matchingAsset == null || !matchingAsset.getFile().exists()) {
logger.info("Synchronizing missing asset [id={},name={}] for
connector [{}]", assetId, assetName, connectorId);
synchronizeConnectorAssetWithRetry(assetsRestApiClient,
connectorId, coordinatorAsset);
+ return true;
} else {
final String coordinatorAssetDigest = coordinatorAsset.getDigest();
final String matchingAssetDigest =
matchingAsset.getDigest().orElse(null);
@@ -149,8 +190,10 @@ public class StandardConnectorAssetSynchronizer implements
AssetSynchronizer {
logger.info("Synchronizing asset [id={},name={}] with updated
digest [{}] for connector [{}]",
assetId, assetName, coordinatorAssetDigest,
connectorId);
synchronizeConnectorAssetWithRetry(assetsRestApiClient,
connectorId, coordinatorAsset);
+ return true;
} else {
logger.info("Coordinator asset [id={},name={}] found for
connector [{}]: retrieval not required", assetId, assetName, connectorId);
+ return false;
}
}
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorConfigurationContext.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorConfigurationContext.java
index bc7c2b9d68..c37a246933 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorConfigurationContext.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorConfigurationContext.java
@@ -21,6 +21,8 @@ import org.apache.nifi.asset.Asset;
import org.apache.nifi.asset.AssetManager;
import org.apache.nifi.components.connector.secrets.SecretProvider;
import org.apache.nifi.components.connector.secrets.SecretsManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
@@ -37,6 +39,8 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class StandardConnectorConfigurationContext implements
MutableConnectorConfigurationContext, Cloneable {
+ private static final Logger logger =
LoggerFactory.getLogger(StandardConnectorConfigurationContext.class);
+
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();
@@ -205,6 +209,7 @@ public class StandardConnectorConfigurationContext
implements MutableConnectorCo
.ifPresent(resolvedAssetValues::add);
}
+ logger.debug("Resolved {} to {}", assetReference, resolvedAssetValues);
return new StringLiteralValue(String.join(",", resolvedAssetValues));
}
@@ -265,6 +270,23 @@ public class StandardConnectorConfigurationContext
implements MutableConnectorCo
}
}
+ @Override
+ public void resolvePropertyValues() {
+ writeLock.lock();
+ try {
+ for (final Map.Entry<String, StepConfiguration> entry :
propertyConfigurations.entrySet()) {
+ final String stepName = entry.getKey();
+ final StepConfiguration stepConfig = entry.getValue();
+ final Map<String, ConnectorValueReference> stepProperties =
stepConfig.getPropertyValues();
+
+ final StepConfiguration resolvedConfig =
resolvePropertyValues(stepProperties);
+ this.resolvedPropertyConfigurations.put(stepName,
resolvedConfig);
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
@Override
public ConnectorConfiguration toConnectorConfiguration() {
readLock.lock();
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
index 4e2213eb0a..a2501c883b 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
@@ -373,11 +373,12 @@ public class StandardConnectorNode implements
ConnectorNode {
private void start(final FlowEngine scheduler, final
CompletableFuture<Void> startCompleteFuture) {
try {
+ stateTransition.setDesiredState(ConnectorState.RUNNING);
+
activeFlowContext.getConfigurationContext().resolvePropertyValues();
+
verifyCanStart();
- stateTransition.setDesiredState(ConnectorState.RUNNING);
final ConnectorState currentState = getCurrentState();
-
switch (currentState) {
case STARTING -> {
logger.debug("{} is already starting; adding future to
pending start futures", this);
@@ -862,6 +863,8 @@ public class StandardConnectorNode implements ConnectorNode
{
.map(File::getAbsolutePath)
.ifPresent(resolvedAssetValues::add);
}
+
+ logger.debug("Resolved {} to {} for {}", assetReference,
resolvedAssetValues, this);
return String.join(",", resolvedAssetValues);
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
index b506fa3b87..b78d504112 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
@@ -18,6 +18,7 @@
package org.apache.nifi.components.connector;
import org.apache.nifi.annotation.lifecycle.OnRemoved;
+import org.apache.nifi.asset.Asset;
import org.apache.nifi.components.connector.secrets.SecretsManager;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.flow.Bundle;
@@ -30,10 +31,13 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class StandardConnectorRepository implements ConnectorRepository {
@@ -103,6 +107,33 @@ public class StandardConnectorRepository implements
ConnectorRepository {
return connector.stop(lifecycleExecutor);
}
+ @Override
+ public Future<Void> restartConnector(final ConnectorNode connector) {
+ final CompletableFuture<Void> restartCompleteFuture = new
CompletableFuture<>();
+ restartConnector(connector, restartCompleteFuture);
+ return restartCompleteFuture;
+ }
+
+ private void restartConnector(final ConnectorNode connector, final
CompletableFuture<Void> restartCompleteFuture) {
+ try {
+ final Future<Void> stopFuture = connector.stop(lifecycleExecutor);
+ stopFuture.get();
+
+ final Future<Void> startFuture =
connector.start(lifecycleExecutor);
+ startFuture.get();
+
+ logger.info("Successfully restarted connector [{}]",
connector.getIdentifier());
+ restartCompleteFuture.complete(null);
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.error("Interrupted while restarting connector [{}]",
connector.getIdentifier(), e);
+ restartCompleteFuture.completeExceptionally(e);
+ } catch (final ExecutionException e) {
+ logger.error("Failed to restart connector [{}]",
connector.getIdentifier(), e.getCause());
+ restartCompleteFuture.completeExceptionally(e);
+ }
+ }
+
@Override
public void applyUpdate(final ConnectorNode connector, final
ConnectorUpdateContext context) throws FlowUpdateException {
final ConnectorState initialDesiredState = connector.getDesiredState();
@@ -117,7 +148,10 @@ public class StandardConnectorRepository implements
ConnectorRepository {
// Update connector in a background thread. This will handle
transitioning the Connector state appropriately
// so that it's clear when the update has completed.
- lifecycleExecutor.submit(() -> updateConnector(connector,
initialDesiredState, context));
+ lifecycleExecutor.submit(() -> {
+ updateConnector(connector, initialDesiredState, context);
+ cleanUpAssets(connector);
+ });
}
private void updateConnector(final ConnectorNode connector, final
ConnectorState initialDesiredState, final ConnectorUpdateContext context) {
@@ -189,6 +223,41 @@ public class StandardConnectorRepository implements
ConnectorRepository {
}
}
+ private void cleanUpAssets(final ConnectorNode connector) {
+ final FrameworkFlowContext activeFlowContext =
connector.getActiveFlowContext();
+ final ConnectorConfiguration activeConfiguration =
activeFlowContext.getConfigurationContext().toConnectorConfiguration();
+
+ final Set<String> referencedAssetIds = new HashSet<>();
+ for (final NamedStepConfiguration namedStepConfiguration :
activeConfiguration.getNamedStepConfigurations()) {
+ final StepConfiguration stepConfiguration =
namedStepConfiguration.configuration();
+ final Map<String, ConnectorValueReference> stepPropertyValues =
stepConfiguration.getPropertyValues();
+ if (stepPropertyValues == null) {
+ continue;
+ }
+ for (final ConnectorValueReference valueReference :
stepPropertyValues.values()) {
+ if (valueReference instanceof AssetReference assetReference) {
+
referencedAssetIds.addAll(assetReference.getAssetIdentifiers());
+ }
+ }
+ }
+
+ logger.debug("Found {} assets referenced for Connector [{}]",
referencedAssetIds.size(), connector.getIdentifier());
+
+ final ConnectorAssetRepository assetRepository = getAssetRepository();
+ final List<Asset> allConnectorAssets =
assetRepository.getAssets(connector.getIdentifier());
+ for (final Asset asset : allConnectorAssets) {
+ final String assetId = asset.getIdentifier();
+ if (!referencedAssetIds.contains(assetId)) {
+ try {
+ logger.info("Deleting unreferenced asset [id={},name={}]
for connector [{}]", assetId, asset.getName(), connector.getIdentifier());
+ assetRepository.deleteAsset(assetId);
+ } catch (final Exception e) {
+ logger.warn("Unable to delete unreferenced asset
[id={},name={}] for connector [{}]", assetId, asset.getName(),
connector.getIdentifier(), e);
+ }
+ }
+ }
+ }
+
@Override
public void configureConnector(final ConnectorNode connector, final String
stepName, final StepConfiguration configuration) throws FlowUpdateException {
connector.setConfiguration(stepName, configuration);
@@ -209,6 +278,12 @@ public class StandardConnectorRepository implements
ConnectorRepository {
}
}
+ @Override
+ public void discardWorkingConfiguration(final ConnectorNode connector) {
+ connector.discardWorkingConfiguration();
+ cleanUpAssets(connector);
+ }
+
@Override
public SecretsManager getSecretsManager() {
return secretsManager;
diff --git a/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
b/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
index 77388898dc..2b10f4be3a 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
+++ b/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
@@ -121,6 +121,8 @@
<!-- Asset Management properties -->
<nifi.asset.manager.implementation>org.apache.nifi.asset.StandardAssetManager</nifi.asset.manager.implementation>
<nifi.asset.manager.properties.directory>./assets</nifi.asset.manager.properties.directory>
+
<nifi.connector.asset.manager.implementation>org.apache.nifi.asset.StandardConnectorAssetManager</nifi.connector.asset.manager.implementation>
+
<nifi.connector.asset.manager.properties.directory>./connector_assets</nifi.connector.asset.manager.properties.directory>
<!-- nifi.properties: web properties -->
<nifi.web.http.host />
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
b/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
index 1c5b89b6f7..bc713549f1 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
+++
b/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
@@ -137,6 +137,8 @@
nifi.nar.persistence.provider.properties.directory=${nifi.nar.persistence.provid
# Asset Management
nifi.asset.manager.implementation=${nifi.asset.manager.implementation}
nifi.asset.manager.properties.directory=${nifi.asset.manager.properties.directory}
+nifi.connector.asset.manager.implementation=${nifi.connector.asset.manager.implementation}
+nifi.connector.asset.manager.properties.directory=${nifi.connector.asset.manager.properties.directory}
# Site to Site properties
nifi.remote.input.host=
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java
index 19b58dd207..8d18a5dff9 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java
@@ -22,14 +22,11 @@ import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.connector.AssetReference;
import org.apache.nifi.components.connector.ConnectorAssetRepository;
-import org.apache.nifi.components.connector.ConnectorConfiguration;
import org.apache.nifi.components.connector.ConnectorNode;
import org.apache.nifi.components.connector.ConnectorRepository;
import org.apache.nifi.components.connector.ConnectorUpdateContext;
import org.apache.nifi.components.connector.ConnectorValueReference;
import org.apache.nifi.components.connector.ConnectorValueType;
-import org.apache.nifi.components.connector.FrameworkFlowContext;
-import org.apache.nifi.components.connector.NamedStepConfiguration;
import org.apache.nifi.components.connector.SecretReference;
import org.apache.nifi.components.connector.StepConfiguration;
import org.apache.nifi.components.connector.StringLiteralValue;
@@ -51,7 +48,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -198,7 +194,6 @@ public class StandardConnectorDAO implements ConnectorDAO {
final ConnectorNode connector = getConnector(id);
try {
getConnectorRepository().applyUpdate(connector, updateContext);
- cleanUpAssets(connector);
} catch (final Exception e) {
throw new NiFiCoreException("Failed to apply connector update: " +
e, e);
}
@@ -207,8 +202,7 @@ public class StandardConnectorDAO implements ConnectorDAO {
@Override
public void discardWorkingConfiguration(final String id) {
final ConnectorNode connector = getConnector(id);
- connector.discardWorkingConfiguration();
- cleanUpAssets(connector);
+ getConnectorRepository().discardWorkingConfiguration(connector);
}
@Override
@@ -256,41 +250,6 @@ public class StandardConnectorDAO implements ConnectorDAO {
final ConnectorAssetRepository assetRepository =
getConnectorAssetRepository();
return assetRepository.getAsset(assetId);
}
-
- private void cleanUpAssets(final ConnectorNode connector) {
- final FrameworkFlowContext activeFlowContext =
connector.getActiveFlowContext();
- final ConnectorConfiguration activeConfiguration =
activeFlowContext.getConfigurationContext().toConnectorConfiguration();
-
- final Set<String> referencedAssetIds = new HashSet<>();
- for (final NamedStepConfiguration namedStepConfiguration :
activeConfiguration.getNamedStepConfigurations()) {
- final StepConfiguration stepConfiguration =
namedStepConfiguration.configuration();
- final Map<String, ConnectorValueReference> stepPropertyValues =
stepConfiguration.getPropertyValues();
- if (stepPropertyValues == null) {
- continue;
- }
- for (final ConnectorValueReference valueReference :
stepPropertyValues.values()) {
- if (valueReference instanceof AssetReference assetReference) {
-
referencedAssetIds.addAll(assetReference.getAssetIdentifiers());
- }
- }
- }
-
- logger.debug("Found {} assets referenced for Connector [{}]",
referencedAssetIds.size(), connector.getIdentifier());
-
- final ConnectorAssetRepository assetRepository =
getConnectorAssetRepository();
- final List<Asset> allConnectorAssets =
assetRepository.getAssets(connector.getIdentifier());
- for (final Asset asset : allConnectorAssets) {
- final String assetId = asset.getIdentifier();
- if (!referencedAssetIds.contains(assetId)) {
- try {
- logger.info("Deleting unreferenced asset [id={},name={}]
for connector [{}]", assetId, asset.getName(), connector.getIdentifier());
- assetRepository.deleteAsset(assetId);
- } catch (final Exception e) {
- logger.warn("Unable to delete unreferenced asset
[id={},name={}] for connector [{}]", assetId, asset.getName(),
connector.getIdentifier(), e);
- }
- }
- }
- }
}
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
index 1a3b91c366..311843db5d 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
@@ -1469,6 +1469,14 @@ public class NiFiClientUtil {
}
}
+ public void deleteConnectors() throws NiFiClientException, IOException {
+ final ConnectorsEntity connectors =
nifiClient.getFlowClient().getConnectors();
+ for (final ConnectorEntity connector : connectors.getConnectors()) {
+ connector.setDisconnectedNodeAcknowledged(true);
+ nifiClient.getConnectorClient().deleteConnector(connector);
+ }
+ }
+
public void waitForControllerServiceRunStatus(final String id, final
String requestedRunStatus) throws NiFiClientException, IOException {
final long maxTimestamp = System.currentTimeMillis() +
TimeUnit.MINUTES.toMillis(2L);
logger.info("Waiting for Controller Service {} to have a Run Status of
{}", id, requestedRunStatus);
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
index 93614725e7..3562174747 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
@@ -268,6 +268,7 @@ public abstract class NiFiSystemIT implements
NiFiInstanceProvider {
getClientUtil().deleteFlowAnalysisRules();
getClientUtil().deleteParameterContexts();
getClientUtil().deleteParameterProviders();
+ getClientUtil().deleteConnectors();
logger.info("Finished destroyFlow");
}
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorAssetsIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorAssetsIT.java
index b2309891dd..f628b5a4dd 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorAssetsIT.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorAssetsIT.java
@@ -17,10 +17,13 @@
package org.apache.nifi.tests.system.connectors;
+import org.apache.nifi.components.connector.ConnectorState;
import org.apache.nifi.tests.system.NiFiInstanceFactory;
import org.apache.nifi.toolkit.client.ConnectorClient;
import org.apache.nifi.toolkit.client.NiFiClientException;
import org.apache.nifi.util.file.FileUtils;
+import org.apache.nifi.web.api.dto.AssetReferenceDTO;
+import org.apache.nifi.web.api.dto.ConnectorValueReferenceDTO;
import org.apache.nifi.web.api.entity.AssetEntity;
import org.apache.nifi.web.api.entity.AssetsEntity;
import org.apache.nifi.web.api.entity.ConnectorEntity;
@@ -28,6 +31,8 @@ import org.junit.jupiter.api.Test;
import java.io.File;
import java.io.IOException;
+import java.util.List;
+import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -83,14 +88,14 @@ public class ClusteredConnectorAssetsIT extends
ConnectorAssetsIT {
assertTrue(assetFound);
- // Check that the asset exists in the connector-assets directory for
each node
+ // Check that the asset exists in the connector_assets directory for
each node
final File node1Dir =
getNiFiInstance().getNodeInstance(1).getInstanceDirectory();
- final File node1AssetsDir = new File(node1Dir, "connector-assets");
+ final File node1AssetsDir = new File(node1Dir, "connector_assets");
final File node1ConnectorDir = new File(node1AssetsDir, connectorId);
assertTrue(node1ConnectorDir.exists());
final File node2Dir =
getNiFiInstance().getNodeInstance(2).getInstanceDirectory();
- final File node2AssetsDir = new File(node2Dir, "connector-assets");
+ final File node2AssetsDir = new File(node2Dir, "connector_assets");
final File node2ConnectorDir = new File(node2AssetsDir, connectorId);
assertTrue(node2ConnectorDir.exists());
@@ -102,7 +107,19 @@ public class ClusteredConnectorAssetsIT extends
ConnectorAssetsIT {
assertNotNull(node2AssetIdDirs);
assertEquals(1, node2AssetIdDirs.length);
- // Stop node 2 and delete its connector-assets directory
+ // Configure the connector's "Test Asset" property to reference the
uploaded asset
+ final ConnectorValueReferenceDTO assetValueReference = new
ConnectorValueReferenceDTO();
+ assetValueReference.setValueType("ASSET_REFERENCE");
+ assetValueReference.setAssetReferences(List.of(new
AssetReferenceDTO(uploadedAssetId)));
+ getClientUtil().configureConnectorWithReferences(connectorId, "Asset
Configuration", Map.of("Test Asset", assetValueReference));
+
+ // Apply the updates to the connector
+ getClientUtil().applyConnectorUpdate(connector);
+
+ // Start the connector before disconnecting node 2
+ getClientUtil().startConnector(connectorId);
+
+ // Stop node 2 and delete its connector_assets directory
disconnectNode(2);
getNiFiInstance().getNodeInstance(2).stop();
@@ -112,9 +129,12 @@ public class ClusteredConnectorAssetsIT extends
ConnectorAssetsIT {
// Start node 2 again and wait for it to rejoin the cluster
getNiFiInstance().getNodeInstance(2).start(true);
- reconnectNode(2);
waitForAllNodesConnected();
+ // Verify that the connector state is RUNNING after node 2 rejoins
+ getClientUtil().waitForConnectorState(connectorId,
ConnectorState.RUNNING);
+ getClientUtil().waitForValidConnector(connectorId);
+
// Verify node 2 connector assets directory is recreated and contains
the expected asset
assertTrue(node2AssetsDir.exists());
assertTrue(node2ConnectorDir.exists());
@@ -134,7 +154,5 @@ public class ClusteredConnectorAssetsIT extends
ConnectorAssetsIT {
.anyMatch(a -> uploadedAssetId.equals(a.getAsset().getId()));
assertTrue(assetStillPresent);
-
- connectorClient.deleteConnector(connector);
}
}
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorAssetsIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorAssetsIT.java
index 3a83731776..40a2bce775 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorAssetsIT.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorAssetsIT.java
@@ -17,6 +17,7 @@
package org.apache.nifi.tests.system.connectors;
+import org.apache.nifi.components.connector.ConnectorState;
import org.apache.nifi.tests.system.NiFiSystemIT;
import org.apache.nifi.toolkit.client.ConnectorClient;
import org.apache.nifi.toolkit.client.NiFiClientException;
@@ -185,6 +186,8 @@ public class ConnectorAssetsIT extends NiFiSystemIT {
final ConnectorEntity connectorAfterApply =
connectorClient.applyUpdate(connectorBeforeApply);
assertNotNull(connectorAfterApply);
+ getClientUtil().waitForConnectorState(connectorId,
ConnectorState.STOPPED);
+
// Verify that the Asset has been removed from the Connector's Assets
list
final AssetsEntity assetsAfterRemoval =
connectorClient.getAssets(connectorId);
assertNotNull(assetsAfterRemoval);