This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch NIFI-15258
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/NIFI-15258 by this push:
     new d386883011 NIFI-15495 Restart Connectors that reference assets that 
were synchro… (#10806)
d386883011 is described below

commit d386883011d00f9e8a8d7dfe6ca10d7df7556f14
Author: Bryan Bende <[email protected]>
AuthorDate: Thu Jan 22 14:48:23 2026 -0500

    NIFI-15495 Restart Connectors that reference assets that were synchro… 
(#10806)
    
    * NIFI-15495 Restart Connectors that reference assets that were synchronized
    - Ensure Connectors re-resolve property values before starting
    - Ensure asset clean up happens only after applyUpdate fully finishes
    - Add connector asset properties to default nifi.properties
    
    * Fix system test
    
    * Encapsulate restart logic in new method on ConnectorNode
    
    * Fix JavaDoc
---
 .../server/MockConnectorAssetManager.java          |  2 +-
 .../nifi/asset/StandardConnectorAssetManager.java  |  2 +-
 .../components/connector/ConnectorRepository.java  | 10 +++
 .../MutableConnectorConfigurationContext.java      |  5 ++
 .../nifi-framework/nifi-framework-core/pom.xml     |  3 +
 .../asset/StandardConnectorAssetSynchronizer.java  | 55 ++++++++++++++--
 .../StandardConnectorConfigurationContext.java     | 22 +++++++
 .../connector/StandardConnectorNode.java           |  7 +-
 .../connector/StandardConnectorRepository.java     | 77 +++++++++++++++++++++-
 .../nifi-framework/nifi-resources/pom.xml          |  2 +
 .../src/main/resources/conf/nifi.properties        |  2 +
 .../nifi/web/dao/impl/StandardConnectorDAO.java    | 43 +-----------
 .../apache/nifi/tests/system/NiFiClientUtil.java   |  8 +++
 .../org/apache/nifi/tests/system/NiFiSystemIT.java |  1 +
 .../connectors/ClusteredConnectorAssetsIT.java     | 32 +++++++--
 .../tests/system/connectors/ConnectorAssetsIT.java |  3 +
 16 files changed, 214 insertions(+), 60 deletions(-)

diff --git 
a/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/MockConnectorAssetManager.java
 
b/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/MockConnectorAssetManager.java
index e8272be173..92892f6275 100644
--- 
a/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/MockConnectorAssetManager.java
+++ 
b/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/MockConnectorAssetManager.java
@@ -41,7 +41,7 @@ import java.util.concurrent.ConcurrentHashMap;
 public class MockConnectorAssetManager implements AssetManager {
 
     private static final String ASSET_STORAGE_LOCATION_PROPERTY = "directory";
-    private static final String DEFAULT_ASSET_STORAGE_LOCATION = 
"target/mock-connector-assets";
+    private static final String DEFAULT_ASSET_STORAGE_LOCATION = 
"target/mock_connector_assets";
 
     private final Map<String, Asset> assets = new ConcurrentHashMap<>();
     private volatile File assetStorageLocation;
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/asset/StandardConnectorAssetManager.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/asset/StandardConnectorAssetManager.java
index 7cc907de11..87b01a47b1 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/asset/StandardConnectorAssetManager.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/asset/StandardConnectorAssetManager.java
@@ -46,7 +46,7 @@ public class StandardConnectorAssetManager implements 
AssetManager {
     private static final Logger logger = 
LoggerFactory.getLogger(StandardConnectorAssetManager.class);
 
     public static final String ASSET_STORAGE_LOCATION_PROPERTY = "directory";
-    public static final String DEFAULT_ASSET_STORAGE_LOCATION = 
"./connector-assets";
+    public static final String DEFAULT_ASSET_STORAGE_LOCATION = 
"./connector_assets";
 
     private volatile File assetStorageLocation;
     private final Map<String, Asset> assets = new ConcurrentHashMap<>();
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java
index 2a134d8e5e..59b0bf03af 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java
@@ -76,6 +76,14 @@ public interface ConnectorRepository {
      */
     Future<Void> stopConnector(ConnectorNode connector);
 
+    /**
+     * Restarts the given Connector, managing any appropriate lifecycle events.
+     *
+     * @param connector the Connector to restart
+     * @return a CompletableFuture that will be completed when the Connector 
has restarted
+     */
+    Future<Void> restartConnector(ConnectorNode connector);
+
     void configureConnector(ConnectorNode connector, String stepName, 
StepConfiguration configuration) throws FlowUpdateException;
 
     void applyUpdate(ConnectorNode connector, ConnectorUpdateContext context) 
throws FlowUpdateException;
@@ -83,6 +91,8 @@ public interface ConnectorRepository {
     void inheritConfiguration(ConnectorNode connector, 
List<VersionedConfigurationStep> activeFlowConfiguration,
         List<VersionedConfigurationStep> workingFlowConfiguration, Bundle 
flowContextBundle) throws FlowUpdateException;
 
+    void discardWorkingConfiguration(ConnectorNode connector);
+
     SecretsManager getSecretsManager();
 
     /**
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/MutableConnectorConfigurationContext.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/MutableConnectorConfigurationContext.java
index aaf4c1fdf3..22191633af 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/MutableConnectorConfigurationContext.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/MutableConnectorConfigurationContext.java
@@ -41,6 +41,11 @@ public interface MutableConnectorConfigurationContext 
extends ConnectorConfigura
      */
     ConfigurationUpdateResult replaceProperties(String stepName, 
StepConfiguration configuration);
 
+    /**
+     * Resolves all existing property values.
+     */
+    void resolvePropertyValues();
+
     /**
      * Converts this mutable configuration context to an immutable 
ConnectorConfiguration.
      * @return the ConnectorConfiguration
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
index 479567966e..0ae05a9969 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
@@ -324,6 +324,9 @@
                         
<exclude>src/test/resources/old-swap-file.swap</exclude>
                         <exclude>src/test/resources/xxe_template.xml</exclude>
                         
<exclude>src/test/resources/swap/444-old-swap-file.swap</exclude>
+                        <exclude>src/test/resources/colors.txt</exclude>
+                        
<exclude>src/test/resources/TestRuntimeManifest/nifi-test-components-nar/META-INF/docs/steps/org.example.TestConnector/Another_Test_Step.md</exclude>
+                        
<exclude>src/test/resources/TestRuntimeManifest/nifi-test-components-nar/META-INF/docs/steps/org.example.TestConnector/Test_Step.md</exclude>
                     </excludes>
                 </configuration>
             </plugin>
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/asset/StandardConnectorAssetSynchronizer.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/asset/StandardConnectorAssetSynchronizer.java
index 2eb1aa5a9a..ad70446ce5 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/asset/StandardConnectorAssetSynchronizer.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/asset/StandardConnectorAssetSynchronizer.java
@@ -22,6 +22,8 @@ import org.apache.nifi.client.NiFiRestApiRetryableException;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.components.connector.ConnectorNode;
+import org.apache.nifi.components.connector.ConnectorRepository;
+import org.apache.nifi.components.connector.ConnectorState;
 import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.flow.FlowManager;
 import org.apache.nifi.util.NiFiProperties;
@@ -37,8 +39,12 @@ import java.io.InputStream;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -55,6 +61,7 @@ public class StandardConnectorAssetSynchronizer implements 
AssetSynchronizer {
 
     private final AssetManager assetManager;
     private final FlowManager flowManager;
+    private final ConnectorRepository connectorRepository;
     private final ClusterCoordinator clusterCoordinator;
     private final WebClientService webClientService;
     private final NiFiProperties properties;
@@ -65,6 +72,7 @@ public class StandardConnectorAssetSynchronizer implements 
AssetSynchronizer {
                                               final NiFiProperties properties) 
{
         this.assetManager = flowController.getConnectorAssetManager();
         this.flowManager = flowController.getFlowManager();
+        this.connectorRepository = flowController.getConnectorRepository();
         this.clusterCoordinator = clusterCoordinator;
         this.webClientService = webClientService;
         this.properties = properties;
@@ -96,16 +104,43 @@ public class StandardConnectorAssetSynchronizer implements 
AssetSynchronizer {
         final List<ConnectorNode> connectors = flowManager.getAllConnectors();
         logger.info("Found {} connectors for synchronizing assets", 
connectors.size());
 
+        final Set<ConnectorNode> connectorsWithSynchronizedAssets = new 
HashSet<>();
         for (final ConnectorNode connector : connectors) {
             try {
-                synchronize(assetsRestApiClient, connector);
+                final boolean assetSynchronized = 
synchronize(assetsRestApiClient, connector);
+                if (assetSynchronized) {
+                    connectorsWithSynchronizedAssets.add(connector);
+                }
             } catch (final Exception e) {
                 logger.error("Failed to synchronize assets for connector 
[{}]", connector.getIdentifier(), e);
             }
         }
+
+        
restartConnectorsWithSynchronizedAssets(connectorsWithSynchronizedAssets);
+    }
+
+    private void restartConnectorsWithSynchronizedAssets(final 
Set<ConnectorNode> connectorsWithSynchronizedAssets) {
+        for (final ConnectorNode connector : connectorsWithSynchronizedAssets) 
{
+            final ConnectorState currentState = connector.getDesiredState();
+            if (currentState == ConnectorState.RUNNING) {
+                logger.info("Restarting connector [{}] after asset 
synchronization", connector.getIdentifier());
+                try {
+                    final Future<Void> restartFuture = 
connectorRepository.restartConnector(connector);
+                    restartFuture.get();
+                    logger.info("Successfully restarted connector [{}] after 
asset synchronization", connector.getIdentifier());
+                } catch (final InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    logger.error("Interrupted while restarting connector [{}] 
after asset synchronization", connector.getIdentifier(), e);
+                } catch (final ExecutionException e) {
+                    logger.error("Failed to restart connector [{}] after asset 
synchronization", connector.getIdentifier(), e.getCause());
+                }
+            } else {
+                logger.info("Connector [{}] is not running (state={}): 
skipping restart after asset synchronization", connector.getIdentifier(), 
currentState);
+            }
+        }
     }
 
-    private void synchronize(final AssetsRestApiClient assetsRestApiClient, 
final ConnectorNode connector) {
+    private boolean synchronize(final AssetsRestApiClient assetsRestApiClient, 
final ConnectorNode connector) {
         final String connectorId = connector.getIdentifier();
         final Map<String, Asset> existingAssets = 
assetManager.getAssets(connectorId).stream()
                 .collect(Collectors.toMap(Asset::getIdentifier, 
Function.identity()));
@@ -113,35 +148,41 @@ public class StandardConnectorAssetSynchronizer 
implements AssetSynchronizer {
         final AssetsEntity coordinatorAssetsEntity = 
listConnectorAssetsWithRetry(assetsRestApiClient, connectorId);
         if (coordinatorAssetsEntity == null) {
             logger.error("Timeout listing assets from cluster coordinator for 
connector [{}]", connectorId);
-            return;
+            return false;
         }
 
         final Collection<AssetEntity> coordinatorAssets = 
coordinatorAssetsEntity.getAssets();
         if (coordinatorAssets == null || coordinatorAssets.isEmpty()) {
             logger.info("Connector [{}] did not return any assets from the 
cluster coordinator", connectorId);
-            return;
+            return false;
         }
 
         logger.info("Connector [{}] returned {} assets from the cluster 
coordinator", connectorId, coordinatorAssets.size());
 
+        boolean assetSynchronized = false;
         for (final AssetEntity coordinatorAssetEntity : coordinatorAssets) {
             final AssetDTO coordinatorAsset = 
coordinatorAssetEntity.getAsset();
             final Asset matchingAsset = 
existingAssets.get(coordinatorAsset.getId());
             try {
-                synchronize(assetsRestApiClient, connectorId, 
coordinatorAsset, matchingAsset);
+                final boolean assetWasSynchronized = 
synchronize(assetsRestApiClient, connectorId, coordinatorAsset, matchingAsset);
+                if (assetWasSynchronized) {
+                    assetSynchronized = true;
+                }
             } catch (final Exception e) {
                 logger.error("Failed to synchronize asset [id={},name={}] for 
connector [{}]",
                         coordinatorAsset.getId(), coordinatorAsset.getName(), 
connectorId, e);
             }
         }
+        return assetSynchronized;
     }
 
-    private void synchronize(final AssetsRestApiClient assetsRestApiClient, 
final String connectorId, final AssetDTO coordinatorAsset, final Asset 
matchingAsset) {
+    private boolean synchronize(final AssetsRestApiClient assetsRestApiClient, 
final String connectorId, final AssetDTO coordinatorAsset, final Asset 
matchingAsset) {
         final String assetId = coordinatorAsset.getId();
         final String assetName = coordinatorAsset.getName();
         if (matchingAsset == null || !matchingAsset.getFile().exists()) {
             logger.info("Synchronizing missing asset [id={},name={}] for 
connector [{}]", assetId, assetName, connectorId);
             synchronizeConnectorAssetWithRetry(assetsRestApiClient, 
connectorId, coordinatorAsset);
+            return true;
         } else {
             final String coordinatorAssetDigest = coordinatorAsset.getDigest();
             final String matchingAssetDigest = 
matchingAsset.getDigest().orElse(null);
@@ -149,8 +190,10 @@ public class StandardConnectorAssetSynchronizer implements 
AssetSynchronizer {
                 logger.info("Synchronizing asset [id={},name={}] with updated 
digest [{}] for connector [{}]",
                         assetId, assetName, coordinatorAssetDigest, 
connectorId);
                 synchronizeConnectorAssetWithRetry(assetsRestApiClient, 
connectorId, coordinatorAsset);
+                return true;
             } else {
                 logger.info("Coordinator asset [id={},name={}] found for 
connector [{}]: retrieval not required", assetId, assetName, connectorId);
+                return false;
             }
         }
     }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorConfigurationContext.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorConfigurationContext.java
index bc7c2b9d68..c37a246933 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorConfigurationContext.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorConfigurationContext.java
@@ -21,6 +21,8 @@ import org.apache.nifi.asset.Asset;
 import org.apache.nifi.asset.AssetManager;
 import org.apache.nifi.components.connector.secrets.SecretProvider;
 import org.apache.nifi.components.connector.secrets.SecretsManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -37,6 +39,8 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class StandardConnectorConfigurationContext implements 
MutableConnectorConfigurationContext, Cloneable {
+    private static final Logger logger = 
LoggerFactory.getLogger(StandardConnectorConfigurationContext.class);
+
     private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
     private final Lock readLock = rwLock.readLock();
     private final Lock writeLock = rwLock.writeLock();
@@ -205,6 +209,7 @@ public class StandardConnectorConfigurationContext 
implements MutableConnectorCo
                 .ifPresent(resolvedAssetValues::add);
         }
 
+        logger.debug("Resolved {} to {}", assetReference, resolvedAssetValues);
         return new StringLiteralValue(String.join(",", resolvedAssetValues));
     }
 
@@ -265,6 +270,23 @@ public class StandardConnectorConfigurationContext 
implements MutableConnectorCo
         }
     }
 
+    @Override
+    public void resolvePropertyValues() {
+        writeLock.lock();
+        try {
+            for (final Map.Entry<String, StepConfiguration> entry : 
propertyConfigurations.entrySet()) {
+                final String stepName = entry.getKey();
+                final StepConfiguration stepConfig = entry.getValue();
+                final Map<String, ConnectorValueReference> stepProperties = 
stepConfig.getPropertyValues();
+
+                final StepConfiguration resolvedConfig = 
resolvePropertyValues(stepProperties);
+                this.resolvedPropertyConfigurations.put(stepName, 
resolvedConfig);
+            }
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
     @Override
     public ConnectorConfiguration toConnectorConfiguration() {
         readLock.lock();
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
index 4e2213eb0a..a2501c883b 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
@@ -373,11 +373,12 @@ public class StandardConnectorNode implements 
ConnectorNode {
 
     private void start(final FlowEngine scheduler, final 
CompletableFuture<Void> startCompleteFuture) {
         try {
+            stateTransition.setDesiredState(ConnectorState.RUNNING);
+            
activeFlowContext.getConfigurationContext().resolvePropertyValues();
+
             verifyCanStart();
 
-            stateTransition.setDesiredState(ConnectorState.RUNNING);
             final ConnectorState currentState = getCurrentState();
-
             switch (currentState) {
                 case STARTING -> {
                     logger.debug("{} is already starting; adding future to 
pending start futures", this);
@@ -862,6 +863,8 @@ public class StandardConnectorNode implements ConnectorNode 
{
                 .map(File::getAbsolutePath)
                 .ifPresent(resolvedAssetValues::add);
         }
+
+        logger.debug("Resolved {} to {} for {}", assetReference, 
resolvedAssetValues, this);
         return String.join(",", resolvedAssetValues);
     }
 
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
index b506fa3b87..b78d504112 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
@@ -18,6 +18,7 @@
 package org.apache.nifi.components.connector;
 
 import org.apache.nifi.annotation.lifecycle.OnRemoved;
+import org.apache.nifi.asset.Asset;
 import org.apache.nifi.components.connector.secrets.SecretsManager;
 import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.flow.Bundle;
@@ -30,10 +31,13 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.time.Duration;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 public class StandardConnectorRepository implements ConnectorRepository {
@@ -103,6 +107,33 @@ public class StandardConnectorRepository implements 
ConnectorRepository {
         return connector.stop(lifecycleExecutor);
     }
 
+    @Override
+    public Future<Void> restartConnector(final ConnectorNode connector) {
+        final CompletableFuture<Void> restartCompleteFuture = new 
CompletableFuture<>();
+        restartConnector(connector, restartCompleteFuture);
+        return restartCompleteFuture;
+    }
+
+    private void restartConnector(final ConnectorNode connector, final 
CompletableFuture<Void> restartCompleteFuture) {
+        try {
+            final Future<Void> stopFuture = connector.stop(lifecycleExecutor);
+            stopFuture.get();
+
+            final Future<Void> startFuture = 
connector.start(lifecycleExecutor);
+            startFuture.get();
+
+            logger.info("Successfully restarted connector [{}]", 
connector.getIdentifier());
+            restartCompleteFuture.complete(null);
+        } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt();
+            logger.error("Interrupted while restarting connector [{}]", 
connector.getIdentifier(), e);
+            restartCompleteFuture.completeExceptionally(e);
+        } catch (final ExecutionException e) {
+            logger.error("Failed to restart connector [{}]", 
connector.getIdentifier(), e.getCause());
+            restartCompleteFuture.completeExceptionally(e);
+        }
+    }
+
     @Override
     public void applyUpdate(final ConnectorNode connector, final 
ConnectorUpdateContext context) throws FlowUpdateException {
         final ConnectorState initialDesiredState = connector.getDesiredState();
@@ -117,7 +148,10 @@ public class StandardConnectorRepository implements 
ConnectorRepository {
 
         // Update connector in a background thread. This will handle 
transitioning the Connector state appropriately
         // so that it's clear when the update has completed.
-        lifecycleExecutor.submit(() -> updateConnector(connector, 
initialDesiredState, context));
+        lifecycleExecutor.submit(() -> {
+            updateConnector(connector, initialDesiredState, context);
+            cleanUpAssets(connector);
+        });
     }
 
     private void updateConnector(final ConnectorNode connector, final 
ConnectorState initialDesiredState, final ConnectorUpdateContext context) {
@@ -189,6 +223,41 @@ public class StandardConnectorRepository implements 
ConnectorRepository {
         }
     }
 
+    private void cleanUpAssets(final ConnectorNode connector) {
+        final FrameworkFlowContext activeFlowContext = 
connector.getActiveFlowContext();
+        final ConnectorConfiguration activeConfiguration = 
activeFlowContext.getConfigurationContext().toConnectorConfiguration();
+
+        final Set<String> referencedAssetIds = new HashSet<>();
+        for (final NamedStepConfiguration namedStepConfiguration : 
activeConfiguration.getNamedStepConfigurations()) {
+            final StepConfiguration stepConfiguration = 
namedStepConfiguration.configuration();
+            final Map<String, ConnectorValueReference> stepPropertyValues = 
stepConfiguration.getPropertyValues();
+            if (stepPropertyValues == null) {
+                continue;
+            }
+            for (final ConnectorValueReference valueReference : 
stepPropertyValues.values()) {
+                if (valueReference instanceof AssetReference assetReference) {
+                    
referencedAssetIds.addAll(assetReference.getAssetIdentifiers());
+                }
+            }
+        }
+
+        logger.debug("Found {} assets referenced for Connector [{}]", 
referencedAssetIds.size(), connector.getIdentifier());
+
+        final ConnectorAssetRepository assetRepository = getAssetRepository();
+        final List<Asset> allConnectorAssets = 
assetRepository.getAssets(connector.getIdentifier());
+        for (final Asset asset : allConnectorAssets) {
+            final String assetId = asset.getIdentifier();
+            if (!referencedAssetIds.contains(assetId)) {
+                try {
+                    logger.info("Deleting unreferenced asset [id={},name={}] 
for connector [{}]", assetId, asset.getName(), connector.getIdentifier());
+                    assetRepository.deleteAsset(assetId);
+                } catch (final Exception e) {
+                    logger.warn("Unable to delete unreferenced asset 
[id={},name={}] for connector [{}]", assetId, asset.getName(), 
connector.getIdentifier(), e);
+                }
+            }
+        }
+    }
+
     @Override
     public void configureConnector(final ConnectorNode connector, final String 
stepName, final StepConfiguration configuration) throws FlowUpdateException {
         connector.setConfiguration(stepName, configuration);
@@ -209,6 +278,12 @@ public class StandardConnectorRepository implements 
ConnectorRepository {
         }
     }
 
+    @Override
+    public void discardWorkingConfiguration(final ConnectorNode connector) {
+        connector.discardWorkingConfiguration();
+        cleanUpAssets(connector);
+    }
+
     @Override
     public SecretsManager getSecretsManager() {
         return secretsManager;
diff --git a/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml 
b/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
index 77388898dc..2b10f4be3a 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
+++ b/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
@@ -121,6 +121,8 @@
         <!-- Asset Management properties -->
         
<nifi.asset.manager.implementation>org.apache.nifi.asset.StandardAssetManager</nifi.asset.manager.implementation>
         
<nifi.asset.manager.properties.directory>./assets</nifi.asset.manager.properties.directory>
+        
<nifi.connector.asset.manager.implementation>org.apache.nifi.asset.StandardConnectorAssetManager</nifi.connector.asset.manager.implementation>
+        
<nifi.connector.asset.manager.properties.directory>./connector_assets</nifi.connector.asset.manager.properties.directory>
 
         <!-- nifi.properties: web properties -->
         <nifi.web.http.host />
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
 
b/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
index 1c5b89b6f7..bc713549f1 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
@@ -137,6 +137,8 @@ 
nifi.nar.persistence.provider.properties.directory=${nifi.nar.persistence.provid
 # Asset Management
 nifi.asset.manager.implementation=${nifi.asset.manager.implementation}
 
nifi.asset.manager.properties.directory=${nifi.asset.manager.properties.directory}
+nifi.connector.asset.manager.implementation=${nifi.connector.asset.manager.implementation}
+nifi.connector.asset.manager.properties.directory=${nifi.connector.asset.manager.properties.directory}
 
 # Site to Site properties
 nifi.remote.input.host=
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java
 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java
index 19b58dd207..8d18a5dff9 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java
@@ -22,14 +22,11 @@ import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.components.connector.AssetReference;
 import org.apache.nifi.components.connector.ConnectorAssetRepository;
-import org.apache.nifi.components.connector.ConnectorConfiguration;
 import org.apache.nifi.components.connector.ConnectorNode;
 import org.apache.nifi.components.connector.ConnectorRepository;
 import org.apache.nifi.components.connector.ConnectorUpdateContext;
 import org.apache.nifi.components.connector.ConnectorValueReference;
 import org.apache.nifi.components.connector.ConnectorValueType;
-import org.apache.nifi.components.connector.FrameworkFlowContext;
-import org.apache.nifi.components.connector.NamedStepConfiguration;
 import org.apache.nifi.components.connector.SecretReference;
 import org.apache.nifi.components.connector.StepConfiguration;
 import org.apache.nifi.components.connector.StringLiteralValue;
@@ -51,7 +48,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -198,7 +194,6 @@ public class StandardConnectorDAO implements ConnectorDAO {
         final ConnectorNode connector = getConnector(id);
         try {
             getConnectorRepository().applyUpdate(connector, updateContext);
-            cleanUpAssets(connector);
         } catch (final Exception e) {
             throw new NiFiCoreException("Failed to apply connector update: " + 
e, e);
         }
@@ -207,8 +202,7 @@ public class StandardConnectorDAO implements ConnectorDAO {
     @Override
     public void discardWorkingConfiguration(final String id) {
         final ConnectorNode connector = getConnector(id);
-        connector.discardWorkingConfiguration();
-        cleanUpAssets(connector);
+        getConnectorRepository().discardWorkingConfiguration(connector);
     }
 
     @Override
@@ -256,41 +250,6 @@ public class StandardConnectorDAO implements ConnectorDAO {
         final ConnectorAssetRepository assetRepository = 
getConnectorAssetRepository();
         return assetRepository.getAsset(assetId);
     }
-
-    private void cleanUpAssets(final ConnectorNode connector) {
-        final FrameworkFlowContext activeFlowContext = 
connector.getActiveFlowContext();
-        final ConnectorConfiguration activeConfiguration = 
activeFlowContext.getConfigurationContext().toConnectorConfiguration();
-
-        final Set<String> referencedAssetIds = new HashSet<>();
-        for (final NamedStepConfiguration namedStepConfiguration : 
activeConfiguration.getNamedStepConfigurations()) {
-            final StepConfiguration stepConfiguration = 
namedStepConfiguration.configuration();
-            final Map<String, ConnectorValueReference> stepPropertyValues = 
stepConfiguration.getPropertyValues();
-            if (stepPropertyValues == null) {
-                continue;
-            }
-            for (final ConnectorValueReference valueReference : 
stepPropertyValues.values()) {
-                if (valueReference instanceof AssetReference assetReference) {
-                    
referencedAssetIds.addAll(assetReference.getAssetIdentifiers());
-                }
-            }
-        }
-
-        logger.debug("Found {} assets referenced for Connector [{}]", 
referencedAssetIds.size(), connector.getIdentifier());
-
-        final ConnectorAssetRepository assetRepository = 
getConnectorAssetRepository();
-        final List<Asset> allConnectorAssets = 
assetRepository.getAssets(connector.getIdentifier());
-        for (final Asset asset : allConnectorAssets) {
-            final String assetId = asset.getIdentifier();
-            if (!referencedAssetIds.contains(assetId)) {
-                try {
-                    logger.info("Deleting unreferenced asset [id={},name={}] 
for connector [{}]", assetId, asset.getName(), connector.getIdentifier());
-                    assetRepository.deleteAsset(assetId);
-                } catch (final Exception e) {
-                    logger.warn("Unable to delete unreferenced asset 
[id={},name={}] for connector [{}]", assetId, asset.getName(), 
connector.getIdentifier(), e);
-                }
-            }
-        }
-    }
 }
 
 
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
index 1a3b91c366..311843db5d 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
@@ -1469,6 +1469,14 @@ public class NiFiClientUtil {
         }
     }
 
+    public void deleteConnectors() throws NiFiClientException, IOException {
+        final ConnectorsEntity connectors = 
nifiClient.getFlowClient().getConnectors();
+        for (final ConnectorEntity connector : connectors.getConnectors()) {
+            connector.setDisconnectedNodeAcknowledged(true);
+            nifiClient.getConnectorClient().deleteConnector(connector);
+        }
+    }
+
     public void waitForControllerServiceRunStatus(final String id, final 
String requestedRunStatus) throws NiFiClientException, IOException {
         final long maxTimestamp = System.currentTimeMillis() + 
TimeUnit.MINUTES.toMillis(2L);
         logger.info("Waiting for Controller Service {} to have a Run Status of 
{}", id, requestedRunStatus);
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
index 93614725e7..3562174747 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
@@ -268,6 +268,7 @@ public abstract class NiFiSystemIT implements 
NiFiInstanceProvider {
         getClientUtil().deleteFlowAnalysisRules();
         getClientUtil().deleteParameterContexts();
         getClientUtil().deleteParameterProviders();
+        getClientUtil().deleteConnectors();
 
         logger.info("Finished destroyFlow");
     }
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorAssetsIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorAssetsIT.java
index b2309891dd..f628b5a4dd 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorAssetsIT.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorAssetsIT.java
@@ -17,10 +17,13 @@
 
 package org.apache.nifi.tests.system.connectors;
 
+import org.apache.nifi.components.connector.ConnectorState;
 import org.apache.nifi.tests.system.NiFiInstanceFactory;
 import org.apache.nifi.toolkit.client.ConnectorClient;
 import org.apache.nifi.toolkit.client.NiFiClientException;
 import org.apache.nifi.util.file.FileUtils;
+import org.apache.nifi.web.api.dto.AssetReferenceDTO;
+import org.apache.nifi.web.api.dto.ConnectorValueReferenceDTO;
 import org.apache.nifi.web.api.entity.AssetEntity;
 import org.apache.nifi.web.api.entity.AssetsEntity;
 import org.apache.nifi.web.api.entity.ConnectorEntity;
@@ -28,6 +31,8 @@ import org.junit.jupiter.api.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.List;
+import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -83,14 +88,14 @@ public class ClusteredConnectorAssetsIT extends 
ConnectorAssetsIT {
 
         assertTrue(assetFound);
 
-        // Check that the asset exists in the connector-assets directory for 
each node
+        // Check that the asset exists in the connector_assets directory for 
each node
         final File node1Dir = 
getNiFiInstance().getNodeInstance(1).getInstanceDirectory();
-        final File node1AssetsDir = new File(node1Dir, "connector-assets");
+        final File node1AssetsDir = new File(node1Dir, "connector_assets");
         final File node1ConnectorDir = new File(node1AssetsDir, connectorId);
         assertTrue(node1ConnectorDir.exists());
 
         final File node2Dir = 
getNiFiInstance().getNodeInstance(2).getInstanceDirectory();
-        final File node2AssetsDir = new File(node2Dir, "connector-assets");
+        final File node2AssetsDir = new File(node2Dir, "connector_assets");
         final File node2ConnectorDir = new File(node2AssetsDir, connectorId);
         assertTrue(node2ConnectorDir.exists());
 
@@ -102,7 +107,19 @@ public class ClusteredConnectorAssetsIT extends 
ConnectorAssetsIT {
         assertNotNull(node2AssetIdDirs);
         assertEquals(1, node2AssetIdDirs.length);
 
-        // Stop node 2 and delete its connector-assets directory
+        // Configure the connector's "Test Asset" property to reference the 
uploaded asset
+        final ConnectorValueReferenceDTO assetValueReference = new 
ConnectorValueReferenceDTO();
+        assetValueReference.setValueType("ASSET_REFERENCE");
+        assetValueReference.setAssetReferences(List.of(new 
AssetReferenceDTO(uploadedAssetId)));
+        getClientUtil().configureConnectorWithReferences(connectorId, "Asset 
Configuration", Map.of("Test Asset", assetValueReference));
+
+        // Apply the updates to the connector
+        getClientUtil().applyConnectorUpdate(connector);
+
+        // Start the connector before disconnecting node 2
+        getClientUtil().startConnector(connectorId);
+
+        // Stop node 2 and delete its connector_assets directory
         disconnectNode(2);
         getNiFiInstance().getNodeInstance(2).stop();
 
@@ -112,9 +129,12 @@ public class ClusteredConnectorAssetsIT extends 
ConnectorAssetsIT {
 
         // Start node 2 again and wait for it to rejoin the cluster
         getNiFiInstance().getNodeInstance(2).start(true);
-        reconnectNode(2);
         waitForAllNodesConnected();
 
+        // Verify that the connector state is RUNNING after node 2 rejoins
+        getClientUtil().waitForConnectorState(connectorId, 
ConnectorState.RUNNING);
+        getClientUtil().waitForValidConnector(connectorId);
+
         // Verify node 2 connector assets directory is recreated and contains 
the expected asset
         assertTrue(node2AssetsDir.exists());
         assertTrue(node2ConnectorDir.exists());
@@ -134,7 +154,5 @@ public class ClusteredConnectorAssetsIT extends 
ConnectorAssetsIT {
                 .anyMatch(a -> uploadedAssetId.equals(a.getAsset().getId()));
 
         assertTrue(assetStillPresent);
-
-        connectorClient.deleteConnector(connector);
     }
 }
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorAssetsIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorAssetsIT.java
index 3a83731776..40a2bce775 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorAssetsIT.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorAssetsIT.java
@@ -17,6 +17,7 @@
 
 package org.apache.nifi.tests.system.connectors;
 
+import org.apache.nifi.components.connector.ConnectorState;
 import org.apache.nifi.tests.system.NiFiSystemIT;
 import org.apache.nifi.toolkit.client.ConnectorClient;
 import org.apache.nifi.toolkit.client.NiFiClientException;
@@ -185,6 +186,8 @@ public class ConnectorAssetsIT extends NiFiSystemIT {
         final ConnectorEntity connectorAfterApply = 
connectorClient.applyUpdate(connectorBeforeApply);
         assertNotNull(connectorAfterApply);
 
+        getClientUtil().waitForConnectorState(connectorId, 
ConnectorState.STOPPED);
+
         // Verify that the Asset has been removed from the Connector's Assets 
list
         final AssetsEntity assetsAfterRemoval = 
connectorClient.getAssets(connectorId);
         assertNotNull(assetsAfterRemoval);


Reply via email to