This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 0734e6a05a NIFI-15478 - Restart components affected by synchronized 
assets (#10779)
0734e6a05a is described below

commit 0734e6a05a8820405bfd10d6e618f0064d0e5f3c
Author: Bryan Bende <[email protected]>
AuthorDate: Tue Jan 20 09:50:17 2026 -0500

    NIFI-15478 - Restart components affected by synchronized assets (#10779)
    
    NIFI-15479 - Ensure that invalid ControllerServices don't call OnEnabled
---
 .../controller/service/ServiceStateTransition.java |  5 ++
 .../service/StandardControllerServiceNode.java     | 17 ++--
 .../apache/nifi/asset/AssetComponentManager.java   | 10 +++
 .../nifi/asset/StandardAssetComponentManager.java  | 58 ++++++++++++++
 .../nifi/asset/StandardAssetSynchronizer.java      | 52 ++++++++++---
 .../configuration/FlowControllerConfiguration.java |  2 +-
 .../cs/tests/system/ModifyClasspathService.java    | 58 ++++++++++++++
 .../org.apache.nifi.controller.ControllerService   |  1 +
 .../parameters/ClusteredParameterContextIT.java    | 91 ++++++++++++++++++++++
 9 files changed, 271 insertions(+), 23 deletions(-)

diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java
index 90f6fa0925..67de046c2c 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java
@@ -70,6 +70,11 @@ public class ServiceStateTransition {
     public boolean enable(final ControllerServiceReference 
controllerServiceReference) {
         writeLock.lock();
         try {
+            if (state == ControllerServiceState.ENABLED) {
+                logger.debug("{} is already enabled", controllerServiceNode);
+                return true;
+            }
+
             if (state != ControllerServiceState.ENABLING) {
                 logger.debug("{} cannot be transitioned to enabled because 
it's not currently ENABLING but rather {}", controllerServiceNode, state);
                 return false;
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index 11fa60cf2b..a530513cf0 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -653,16 +653,16 @@ public class StandardControllerServiceNode extends 
AbstractComponentNode impleme
                 final ValidationStatus validationStatus = 
validationState.getStatus();
                 if (validationStatus == ValidationStatus.VALID) {
                     LOG.debug("Enabling {} proceeding after performing 
validation", serviceNode);
-                } else if (completeExceptionallyOnFailure) {
-                    final Collection<ValidationResult> errors = 
validationState.getValidationErrors();
-                    final String message = "Enabling %s failed: Validation 
Status [%s] Errors %s".formatted(serviceNode, validationStatus, errors);
-                    future.completeExceptionally(new 
IllegalStateException(message));
                 } else {
+                    final Collection<ValidationResult> errors = 
validationState.getValidationErrors();
+                    if (completeExceptionallyOnFailure) {
+                        future.completeExceptionally(new 
IllegalStateException("Enabling %s failed: Validation Status [%s] Errors 
%s".formatted(serviceNode, validationStatus, errors)));
+                    }
+
                     final long selectedValidationDelay = 
getDelay(validationDelay, INCREMENTAL_VALIDATION_DELAY_MS);
 
                     // Log warning on repeated validation rescheduling
                     if (selectedValidationDelay > MAXIMUM_DELAY.toMillis()) {
-                        final Collection<ValidationResult> errors = 
validationState.getValidationErrors();
                         LOG.warn("Validation rescheduled in {} ms for {} 
Errors {}", selectedValidationDelay, serviceNode, errors);
                     }
 
@@ -670,10 +670,8 @@ public class StandardControllerServiceNode extends 
AbstractComponentNode impleme
                         scheduler.schedule(this, selectedValidationDelay, 
TimeUnit.MILLISECONDS);
                         LOG.debug("Validation rescheduled in {} ms for {}", 
selectedValidationDelay, serviceNode);
                     } catch (final RejectedExecutionException e) {
-                        LOG.error("Validation rescheduling rejected for {}", 
serviceNode, e);
-                        final Collection<ValidationResult> errors = 
validationState.getValidationErrors();
-                        final String message = "Enabling %s rejected: 
Validation Status [%s] Errors %s".formatted(serviceNode, validationStatus, 
errors);
-                        future.completeExceptionally(new 
IllegalStateException(message));
+                        LOG.debug("Validation rescheduling rejected for {}", 
serviceNode, e);
+                        future.completeExceptionally(new 
IllegalStateException("Enabling %s rejected: Validation Status [%s] Errors 
%s".formatted(serviceNode, validationStatus, errors)));
                     }
                     // Enable command rescheduled or rejected
                     return;
@@ -788,7 +786,6 @@ public class StandardControllerServiceNode extends 
AbstractComponentNode impleme
         return future;
     }
 
-
     private void invokeDisable(ConfigurationContext configContext) {
         final ControllerService controllerService = 
getControllerServiceImplementation();
         try (final NarCloseable ignored = 
NarCloseable.withComponentNarLoader(getExtensionManager(), 
controllerService.getClass(), getIdentifier())) {
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/asset/AssetComponentManager.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/asset/AssetComponentManager.java
index 35ff55a716..6950509df6 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/asset/AssetComponentManager.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/asset/AssetComponentManager.java
@@ -17,6 +17,8 @@
 
 package org.apache.nifi.asset;
 
+import java.util.Collection;
+
 /**
  * Manages restarting components which reference a given Asset through a 
Parameter.
  */
@@ -36,4 +38,12 @@ public interface AssetComponentManager {
      */
     void restartReferencingComponents(Asset asset);
 
+    /**
+     * Synchronously restarts any components referencing any of the given 
assets.
+     * Components are only restarted once even if they reference multiple 
assets.
+     *
+     * @param assets the assets
+     */
+    void restartReferencingComponents(Collection<Asset> assets);
+
 }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/asset/StandardAssetComponentManager.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/asset/StandardAssetComponentManager.java
index 78b197cfb0..9954c947c1 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/asset/StandardAssetComponentManager.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/asset/StandardAssetComponentManager.java
@@ -34,6 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -103,6 +104,63 @@ public class StandardAssetComponentManager implements 
AssetComponentManager {
         stoppedStatelessProcessGroups.forEach(ProcessGroup::startProcessing);
     }
 
+    @Override
+    public void restartReferencingComponents(final Collection<Asset> assets) {
+        if (assets == null || assets.isEmpty()) {
+            return;
+        }
+
+        final ParameterContextManager parameterContextManager = 
flowManager.getParameterContextManager();
+
+        // Collect unique components across all assets
+        final Set<ProcessorNode> allProcessorsReferencingParameters = new 
HashSet<>();
+        final Set<ControllerServiceNode> 
allControllerServicesReferencingParameters = new HashSet<>();
+        final Set<ProcessGroup> allAffectedStatelessGroups = new HashSet<>();
+
+        for (final Asset asset : assets) {
+            final ParameterContext parameterContext = 
parameterContextManager.getParameterContext(asset.getParameterContextIdentifier());
+            if (parameterContext == null) {
+                logger.warn("Parameter context [{}] not found for asset [{}]", 
asset.getParameterContextIdentifier(), asset.getName());
+                continue;
+            }
+
+            // Determine which parameters reference the given asset
+            final Set<Parameter> parametersReferencingAsset = 
getParametersReferencingAsset(parameterContext, asset);
+            if (parametersReferencingAsset.isEmpty()) {
+                logger.info("Asset [{}] is not referenced by any parameters in 
ParameterContext [{}]", asset.getName(), parameterContext.getIdentifier());
+                continue;
+            }
+
+            // Collect processors, controller services, and stateless PGs that 
reference a parameter that references the asset
+            
allProcessorsReferencingParameters.addAll(getProcessorsReferencingParameters(parameterContext,
 parametersReferencingAsset));
+            
allControllerServicesReferencingParameters.addAll(getControllerServicesReferencingParameters(parameterContext,
 parametersReferencingAsset));
+            
allAffectedStatelessGroups.addAll(getAffectedStatelessGroups(parameterContext));
+        }
+
+        logger.info("Found {} unique stateless groups, {} unique processors, 
and {} unique controller services referencing {} assets",
+                allAffectedStatelessGroups.size(), 
allProcessorsReferencingParameters.size(), 
allControllerServicesReferencingParameters.size(), assets.size());
+
+        if (allProcessorsReferencingParameters.isEmpty() && 
allControllerServicesReferencingParameters.isEmpty() && 
allAffectedStatelessGroups.isEmpty()) {
+            logger.info("No components found referencing any of the {} 
assets", assets.size());
+            return;
+        }
+
+        // Stop/disable the impacted components
+        final Set<ProcessGroup> stoppedStatelessProcessGroups = new 
HashSet<>();
+        allAffectedStatelessGroups.forEach(processGroup -> 
stopProcessGroup(processGroup, stoppedStatelessProcessGroups));
+
+        final Set<ProcessorNode> stoppedProcessors = new HashSet<>();
+        allProcessorsReferencingParameters.forEach(processorNode -> 
stopProcessors(processorNode, stoppedProcessors));
+
+        final Set<ControllerServiceNode> disabledControllerServices = new 
HashSet<>();
+        
allControllerServicesReferencingParameters.forEach(controllerServiceNode -> 
disableControllerService(controllerServiceNode, disabledControllerServices, 
stoppedProcessors));
+
+        // Start/enable the components that were previously stopped/disabled
+        enableControllerServices(disabledControllerServices);
+        stoppedProcessors.forEach(this::startProcessor);
+        stoppedStatelessProcessGroups.forEach(ProcessGroup::startProcessing);
+    }
+
     private void startProcessor(final ProcessorNode processorNode) {
         try {
             final Future<Void> future  = 
processorNode.getProcessGroup().startProcessor(processorNode, false);
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/asset/StandardAssetSynchronizer.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/asset/StandardAssetSynchronizer.java
index 891edbabcf..d9f1cd44d3 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/asset/StandardAssetSynchronizer.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/asset/StandardAssetSynchronizer.java
@@ -38,7 +38,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
@@ -56,6 +58,7 @@ public class StandardAssetSynchronizer implements 
AssetSynchronizer {
     private static final Duration SYNC_ASSET_RETRY_DURATION = 
Duration.ofSeconds(30);
 
     private final AssetManager assetManager;
+    private final AssetComponentManager assetComponentManager;
     private final FlowManager flowManager;
     private final ClusterCoordinator clusterCoordinator;
     private final WebClientService webClientService;
@@ -64,8 +67,10 @@ public class StandardAssetSynchronizer implements 
AssetSynchronizer {
     public StandardAssetSynchronizer(final FlowController flowController,
                                      final ClusterCoordinator 
clusterCoordinator,
                                      final WebClientService webClientService,
-                                     final NiFiProperties properties) {
+                                     final NiFiProperties properties,
+                                     final AssetComponentManager 
assetComponentManager) {
         this.assetManager = flowController.getAssetManager();
+        this.assetComponentManager = assetComponentManager;
         this.flowManager = flowController.getFlowManager();
         this.clusterCoordinator = clusterCoordinator;
         this.webClientService = webClientService;
@@ -102,16 +107,30 @@ public class StandardAssetSynchronizer implements 
AssetSynchronizer {
         final Set<ParameterContext> parameterContexts = 
parameterContextManager.getParameterContexts();
         logger.info("Found {} parameter contexts for synchronizing assets", 
parameterContexts.size());
 
+        final List<Asset> allSyncedAssets = new ArrayList<>();
         for (final ParameterContext parameterContext : parameterContexts) {
             try {
-                synchronize(assetsRestApiClient, parameterContext);
+                final List<Asset> syncedAssets = 
synchronize(assetsRestApiClient, parameterContext);
+                allSyncedAssets.addAll(syncedAssets);
             } catch (final Exception e) {
                 logger.error("Failed to synchronize assets for parameter 
context [{}]", parameterContext.getIdentifier(), e);
             }
         }
+
+        // Restart components that reference the synchronized assets
+        if (!allSyncedAssets.isEmpty()) {
+            try {
+                logger.info("Restarting components referencing {} synchronized 
assets", allSyncedAssets.size());
+                
assetComponentManager.restartReferencingComponents(allSyncedAssets);
+            } catch (final Exception e) {
+                logger.error("Failed to restart components referencing 
synchronized assets", e);
+            }
+        }
     }
 
-    private void synchronize(final AssetsRestApiClient assetsRestApiClient, 
final ParameterContext parameterContext) {
+    private List<Asset> synchronize(final AssetsRestApiClient 
assetsRestApiClient, final ParameterContext parameterContext) {
+        final List<Asset> syncedAssets = new ArrayList<>();
+
         final Map<String, Asset> existingAssets = 
parameterContext.getParameters().values().stream()
                 .map(Parameter::getReferencedAssets)
                 .flatMap(Collection::stream)
@@ -119,7 +138,7 @@ public class StandardAssetSynchronizer implements 
AssetSynchronizer {
 
         if (existingAssets.isEmpty()) {
             logger.info("Parameter context [{}] does not contain any assets to 
synchronize", parameterContext.getIdentifier());
-            return;
+            return syncedAssets;
         }
 
         logger.info("Parameter context [{}] has {} assets on the current 
node", parameterContext.getIdentifier(), existingAssets.size());
@@ -127,13 +146,13 @@ public class StandardAssetSynchronizer implements 
AssetSynchronizer {
         final AssetsEntity coordinatorAssetsEntity = 
listAssetsWithRetry(assetsRestApiClient, parameterContext.getIdentifier());
         if (coordinatorAssetsEntity == null) {
             logger.error("Timeout listing assets from cluster coordinator for 
parameter context [{}]", parameterContext.getIdentifier());
-            return;
+            return syncedAssets;
         }
 
         final Collection<AssetEntity> coordinatorAssets = 
coordinatorAssetsEntity.getAssets();
         if (coordinatorAssets == null || coordinatorAssets.isEmpty()) {
             logger.info("Parameter context [{}] did not return any assets from 
the cluster coordinator", parameterContext.getIdentifier());
-            return;
+            return syncedAssets;
         }
 
         logger.info("Parameter context [{}] returned {} assets from the 
cluster coordinator", parameterContext.getIdentifier(), 
coordinatorAssets.size());
@@ -142,30 +161,38 @@ public class StandardAssetSynchronizer implements 
AssetSynchronizer {
             final AssetDTO coordinatorAsset = 
coordinatorAssetEntity.getAsset();
             final Asset matchingAsset = 
existingAssets.get(coordinatorAsset.getId());
             try {
-                synchronize(assetsRestApiClient, parameterContext, 
coordinatorAsset, matchingAsset);
+                final Asset syncedAsset = synchronize(assetsRestApiClient, 
parameterContext, coordinatorAsset, matchingAsset);
+                if (syncedAsset != null) {
+                    logger.info("Successfully synchronized asset 
[id={},name={}] for parameter context [{}]",
+                            syncedAsset.getIdentifier(), 
syncedAsset.getName(), parameterContext.getIdentifier());
+                    syncedAssets.add(syncedAsset);
+                }
             } catch (final Exception e) {
                 logger.error("Failed to synchronize asset [id={},name={}] for 
parameter context [{}]",
                         coordinatorAsset.getId(), coordinatorAsset.getName(), 
parameterContext.getIdentifier(), e);
             }
         }
+
+        return syncedAssets;
     }
 
-    private void synchronize(final AssetsRestApiClient assetsRestApiClient, 
final ParameterContext parameterContext, final AssetDTO coordinatorAsset, final 
Asset matchingAsset) {
+    private Asset synchronize(final AssetsRestApiClient assetsRestApiClient, 
final ParameterContext parameterContext, final AssetDTO coordinatorAsset, final 
Asset matchingAsset) {
         final String paramContextId = parameterContext.getIdentifier();
         final String assetId = coordinatorAsset.getId();
         final String assetName = coordinatorAsset.getName();
         if (matchingAsset == null || !matchingAsset.getFile().exists()) {
             logger.info("Synchronizing missing asset [id={},name={}] for 
parameter context [{}]", assetId, assetName, paramContextId);
-            synchronizeAssetWithRetry(assetsRestApiClient, paramContextId, 
coordinatorAsset);
+            return synchronizeAssetWithRetry(assetsRestApiClient, 
paramContextId, coordinatorAsset);
         } else {
             final String coordinatorAssetDigest = coordinatorAsset.getDigest();
             final String matchingAssetDigest = 
matchingAsset.getDigest().orElse(null);
             if (!coordinatorAssetDigest.equals(matchingAssetDigest)) {
                 logger.info("Synchronizing asset [id={},name={}] with updated 
digest [{}] for parameter context [{}]",
                         assetId, assetName, coordinatorAssetDigest, 
paramContextId);
-                synchronizeAssetWithRetry(assetsRestApiClient, paramContextId, 
coordinatorAsset);
+                return synchronizeAssetWithRetry(assetsRestApiClient, 
paramContextId, coordinatorAsset);
             } else {
                 logger.info("Coordinator asset [id={},name={}] found for 
parameter context [{}]: retrieval not required",  assetId, assetName, 
paramContextId);
+                return null;
             }
         }
     }
@@ -193,19 +220,20 @@ public class StandardAssetSynchronizer implements 
AssetSynchronizer {
         }
     }
 
-    private void synchronizeAssetWithRetry(final AssetsRestApiClient 
assetsRestApiClient, final String parameterContextId, final AssetDTO 
coordinatorAsset) {
+    private Asset synchronizeAssetWithRetry(final AssetsRestApiClient 
assetsRestApiClient, final String parameterContextId, final AssetDTO 
coordinatorAsset) {
         final Instant expirationTime = 
Instant.ofEpochMilli(System.currentTimeMillis() + 
SYNC_ASSET_RETRY_DURATION.toMillis());
         while (System.currentTimeMillis() < expirationTime.toEpochMilli()) {
             final Asset syncedAsset = synchronizeAsset(assetsRestApiClient, 
parameterContextId, coordinatorAsset);
             if (syncedAsset != null) {
                 logger.info("Synchronizing asset complete 
[id={},name={},file={},digest={}]",
                         syncedAsset.getIdentifier(), syncedAsset.getName(), 
syncedAsset.getFile().getAbsolutePath(), 
syncedAsset.getDigest().orElse("[Missing]"));
-                return;
+                return syncedAsset;
             }
             logger.info("Unable to synchronize asset [id={},name={}] for 
parameter context [{}]: retrying until [{}]",
                     coordinatorAsset.getId(), coordinatorAsset.getName(), 
parameterContextId, expirationTime);
             sleep(Duration.ofSeconds(5));
         }
+        return null;
     }
 
     private Asset synchronizeAsset(final AssetsRestApiClient 
assetsRestApiClient, final String parameterContextId, final AssetDTO 
coordinatorAsset) {
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/framework/configuration/FlowControllerConfiguration.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/framework/configuration/FlowControllerConfiguration.java
index 8e3bac3cbe..51071e5c3b 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/framework/configuration/FlowControllerConfiguration.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/framework/configuration/FlowControllerConfiguration.java
@@ -464,7 +464,7 @@ public class FlowControllerConfiguration {
      */
     @Bean
     public AssetSynchronizer assetSynchronizer() throws Exception {
-        return new StandardAssetSynchronizer(flowController(), 
clusterCoordinator, webClientService(), properties);
+        return new StandardAssetSynchronizer(flowController(), 
clusterCoordinator, webClientService(), properties, affectedComponentManager());
     }
 
     /**
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions-services/src/main/java/org/apache/nifi/cs/tests/system/ModifyClasspathService.java
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions-services/src/main/java/org/apache/nifi/cs/tests/system/ModifyClasspathService.java
new file mode 100644
index 0000000000..94a25b540f
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions-services/src/main/java/org/apache/nifi/cs/tests/system/ModifyClasspathService.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cs.tests.system;
+
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.resource.ResourceCardinality;
+import org.apache.nifi.components.resource.ResourceType;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+
+import java.io.File;
+import java.util.List;
+
+/**
+ * A simple controller service that has a property with 
dynamicallyModifiesClasspath(true).
+ * Used for testing asset synchronization with services that modify the 
classpath.
+ */
+@RequiresInstanceClassLoading
+public class ModifyClasspathService extends AbstractControllerService {
+
+    public static final PropertyDescriptor RESOURCES = new 
PropertyDescriptor.Builder()
+            .name("Resources")
+            .description("Resources to add to the classpath")
+            .required(false)
+            .dynamicallyModifiesClasspath(true)
+            .identifiesExternalResource(ResourceCardinality.MULTIPLE, 
ResourceType.FILE, ResourceType.DIRECTORY, ResourceType.URL)
+            .build();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return List.of(RESOURCES);
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        final File resourceFile = 
context.getProperty(RESOURCES).asResource().asFile();
+        if (!resourceFile.exists()) {
+            throw new IllegalStateException("Resource file does not exist");
+        }
+    }
+}
+
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index a4da9272ec..99b5498bfe 100644
--- 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -22,6 +22,7 @@ org.apache.nifi.cs.tests.system.StandardCountService
 org.apache.nifi.cs.tests.system.StandardSleepService
 org.apache.nifi.cs.tests.system.MigratePropertiesCountService
 org.apache.nifi.cs.tests.system.MigrationService
+org.apache.nifi.cs.tests.system.ModifyClasspathService
 org.apache.nifi.cs.tests.system.MockCSVReader
 org.apache.nifi.cs.tests.system.MockCSVWriter
 org.apache.nifi.cs.tests.system.VerifyLocalClusterStateService
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/parameters/ClusteredParameterContextIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/parameters/ClusteredParameterContextIT.java
index 95e8e8c33d..371c95dc9e 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/parameters/ClusteredParameterContextIT.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/parameters/ClusteredParameterContextIT.java
@@ -22,10 +22,14 @@ import org.apache.nifi.util.file.FileUtils;
 import org.apache.nifi.web.api.entity.AssetEntity;
 import org.apache.nifi.web.api.entity.AssetsEntity;
 import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.ControllerServiceEntity;
+import org.apache.nifi.web.api.entity.ControllerServicesEntity;
 import org.apache.nifi.web.api.entity.ParameterContextEntity;
 import org.apache.nifi.web.api.entity.ParameterContextUpdateRequestEntity;
 import org.apache.nifi.web.api.entity.ProcessorEntity;
 import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -41,6 +45,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
  * Repeats all tests in ParameterContextIT but in a clustered mode
  */
 public class ClusteredParameterContextIT extends ParameterContextIT {
+    private static final Logger logger = 
LoggerFactory.getLogger(ClusteredParameterContextIT.class);
+
     @Override
     public NiFiInstanceFactory getInstanceFactory() {
         return createTwoNodeInstanceFactory();
@@ -223,4 +229,89 @@ public class ClusteredParameterContextIT extends 
ParameterContextIT {
         getClientUtil().stopProcessor(generateFlowFile);
         getClientUtil().waitForStoppedProcessor(generateFlowFile.getId());
     }
+
+    @Test
+    public void testSynchronizeAssetsWithModifyClasspathService() throws 
NiFiClientException, IOException, InterruptedException {
+        waitForAllNodesConnected();
+
+        // Create a parameter context with one parameter
+        final Map<String, String> paramValues = Map.of("classpathResource", 
"");
+        final ParameterContextEntity paramContext = 
getClientUtil().createParameterContext("testSynchronizeAssetsWithModifyClasspathService",
 paramValues);
+
+        // Set the Parameter Context on the root Process Group
+        setParameterContext("root", paramContext);
+
+        // Create an asset and update the parameter to reference the asset
+        final File assetFile = new 
File("src/test/resources/sample-assets/helloworld.txt");
+        final AssetEntity asset = createAsset(paramContext.getId(), assetFile);
+
+        final ParameterContextUpdateRequestEntity referenceAssetUpdateRequest 
= getClientUtil().updateParameterAssetReferences(
+                paramContext, Map.of("classpathResource", 
List.of(asset.getAsset().getId())));
+        
getClientUtil().waitForParameterContextRequestToComplete(paramContext.getId(), 
referenceAssetUpdateRequest.getRequest().getRequestId());
+
+        // Stop node2 so we can set up the flow on node1 first
+        logger.info("Disconnecting node 2");
+        disconnectNode(2);
+        getNiFiInstance().getNodeInstance(2).stop();
+
+        // Delete node2's assets directory to ensure it's empty
+        final File node2Dir = 
getNiFiInstance().getNodeInstance(2).getInstanceDirectory();
+        final File node2AssetsDir = new File(node2Dir, "assets");
+        if (node2AssetsDir.exists()) {
+            FileUtils.deleteFilesInDir(node2AssetsDir, (dir, name) -> true, 
null, true, true);
+            assertTrue(node2AssetsDir.delete());
+        }
+        assertFalse(node2AssetsDir.exists());
+
+        // Create an instance of ModifyClasspathService and use the parameter 
as the value of the Resources property
+        final ControllerServiceEntity modifyClasspathService = 
getClientUtil().createControllerService("ModifyClasspathService");
+        
getClientUtil().updateControllerServiceProperties(modifyClasspathService, 
Map.of("Resources", "#{classpathResource}"));
+
+        // Enable the service
+        final ControllerServiceEntity serviceToEnable = 
getNifiClient().getControllerServicesClient().getControllerService(modifyClasspathService.getId());
+        getClientUtil().enableControllerService(serviceToEnable);
+
+        // Wait for the service to be enabled and verify its state
+        getClientUtil().waitForControllerServiceState("root", "ENABLED", 
List.of(modifyClasspathService.getId()));
+        assertControllerServiceEnabled(modifyClasspathService.getId());
+
+        logger.info("Starting node 2...");
+
+        // Start node2
+        getNiFiInstance().getNodeInstance(2).start(true);
+        //reconnectNode(2);
+        waitForAllNodesConnected();
+
+        // Verify node2 asset directories are recreated
+        assertTrue(node2AssetsDir.exists());
+        final File node2ContextDir = new File(node2AssetsDir, 
paramContext.getId());
+        assertTrue(node2ContextDir.exists());
+
+        final File[] node2AssetFiles = node2ContextDir.listFiles();
+        assertNotNull(node2AssetFiles);
+        assertEquals(1, node2AssetFiles.length);
+
+        logger.info("Verifying controller service is still ENABLED...");
+
+        // Verify the controller service is still ENABLED after node2 joins
+        getClientUtil().waitForControllerServiceState("root", "ENABLED", 
List.of(modifyClasspathService.getId()));
+        assertControllerServiceEnabled(modifyClasspathService.getId());
+
+        logger.info("Starting clean up...");
+
+        // Clean up - disable the service
+        final ControllerServiceEntity serviceToDisable = 
getNifiClient().getControllerServicesClient().getControllerService(modifyClasspathService.getId());
+        getClientUtil().disableControllerService(serviceToDisable);
+        getClientUtil().waitForControllerServiceState("root", "DISABLED", 
List.of(modifyClasspathService.getId()));
+    }
+
+    private void assertControllerServiceEnabled(final String serviceId) throws 
NiFiClientException, IOException {
+        final ControllerServicesEntity servicesEntity = 
getNifiClient().getFlowClient().getControllerServices("root");
+        final ControllerServiceEntity service = 
servicesEntity.getControllerServices().stream()
+                .filter(s -> s.getId().equals(serviceId))
+                .findFirst()
+                .orElse(null);
+        assertNotNull(service, "Controller service not found: " + serviceId);
+        assertEquals("ENABLED", service.getComponent().getState(), "Controller 
service should be ENABLED");
+    }
 }

Reply via email to