This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 0734e6a05a NIFI-15478 - Restart components affected by synchronized
assets (#10779)
0734e6a05a is described below
commit 0734e6a05a8820405bfd10d6e618f0064d0e5f3c
Author: Bryan Bende <[email protected]>
AuthorDate: Tue Jan 20 09:50:17 2026 -0500
NIFI-15478 - Restart components affected by synchronized assets (#10779)
NIFI-15479 - Ensure that invalid ControllerServices don't call OnEnabled
---
.../controller/service/ServiceStateTransition.java | 5 ++
.../service/StandardControllerServiceNode.java | 17 ++--
.../apache/nifi/asset/AssetComponentManager.java | 10 +++
.../nifi/asset/StandardAssetComponentManager.java | 58 ++++++++++++++
.../nifi/asset/StandardAssetSynchronizer.java | 52 ++++++++++---
.../configuration/FlowControllerConfiguration.java | 2 +-
.../cs/tests/system/ModifyClasspathService.java | 58 ++++++++++++++
.../org.apache.nifi.controller.ControllerService | 1 +
.../parameters/ClusteredParameterContextIT.java | 91 ++++++++++++++++++++++
9 files changed, 271 insertions(+), 23 deletions(-)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java
index 90f6fa0925..67de046c2c 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java
@@ -70,6 +70,11 @@ public class ServiceStateTransition {
public boolean enable(final ControllerServiceReference
controllerServiceReference) {
writeLock.lock();
try {
+ if (state == ControllerServiceState.ENABLED) {
+ logger.debug("{} is already enabled", controllerServiceNode);
+ return true;
+ }
+
if (state != ControllerServiceState.ENABLING) {
logger.debug("{} cannot be transitioned to enabled because
it's not currently ENABLING but rather {}", controllerServiceNode, state);
return false;
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index 11fa60cf2b..a530513cf0 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -653,16 +653,16 @@ public class StandardControllerServiceNode extends
AbstractComponentNode impleme
final ValidationStatus validationStatus =
validationState.getStatus();
if (validationStatus == ValidationStatus.VALID) {
LOG.debug("Enabling {} proceeding after performing
validation", serviceNode);
- } else if (completeExceptionallyOnFailure) {
- final Collection<ValidationResult> errors =
validationState.getValidationErrors();
- final String message = "Enabling %s failed: Validation
Status [%s] Errors %s".formatted(serviceNode, validationStatus, errors);
- future.completeExceptionally(new
IllegalStateException(message));
} else {
+ final Collection<ValidationResult> errors =
validationState.getValidationErrors();
+ if (completeExceptionallyOnFailure) {
+ future.completeExceptionally(new
IllegalStateException("Enabling %s failed: Validation Status [%s] Errors
%s".formatted(serviceNode, validationStatus, errors)));
+ }
+
final long selectedValidationDelay =
getDelay(validationDelay, INCREMENTAL_VALIDATION_DELAY_MS);
// Log warning on repeated validation rescheduling
if (selectedValidationDelay > MAXIMUM_DELAY.toMillis()) {
- final Collection<ValidationResult> errors =
validationState.getValidationErrors();
LOG.warn("Validation rescheduled in {} ms for {}
Errors {}", selectedValidationDelay, serviceNode, errors);
}
@@ -670,10 +670,8 @@ public class StandardControllerServiceNode extends
AbstractComponentNode impleme
scheduler.schedule(this, selectedValidationDelay,
TimeUnit.MILLISECONDS);
LOG.debug("Validation rescheduled in {} ms for {}",
selectedValidationDelay, serviceNode);
} catch (final RejectedExecutionException e) {
- LOG.error("Validation rescheduling rejected for {}",
serviceNode, e);
- final Collection<ValidationResult> errors =
validationState.getValidationErrors();
- final String message = "Enabling %s rejected:
Validation Status [%s] Errors %s".formatted(serviceNode, validationStatus,
errors);
- future.completeExceptionally(new
IllegalStateException(message));
+ LOG.debug("Validation rescheduling rejected for {}",
serviceNode, e);
+ future.completeExceptionally(new
IllegalStateException("Enabling %s rejected: Validation Status [%s] Errors
%s".formatted(serviceNode, validationStatus, errors)));
}
// Enable command rescheduled or rejected
return;
@@ -788,7 +786,6 @@ public class StandardControllerServiceNode extends
AbstractComponentNode impleme
return future;
}
-
private void invokeDisable(ConfigurationContext configContext) {
final ControllerService controllerService =
getControllerServiceImplementation();
try (final NarCloseable ignored =
NarCloseable.withComponentNarLoader(getExtensionManager(),
controllerService.getClass(), getIdentifier())) {
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/asset/AssetComponentManager.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/asset/AssetComponentManager.java
index 35ff55a716..6950509df6 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/asset/AssetComponentManager.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/asset/AssetComponentManager.java
@@ -17,6 +17,8 @@
package org.apache.nifi.asset;
+import java.util.Collection;
+
/**
* Manages restarting components which reference a given Asset through a
Parameter.
*/
@@ -36,4 +38,12 @@ public interface AssetComponentManager {
*/
void restartReferencingComponents(Asset asset);
+ /**
+ * Synchronously restarts any components referencing any of the given
assets.
+ * Components are only restarted once even if they reference multiple
assets.
+ *
+ * @param assets the assets
+ */
+ void restartReferencingComponents(Collection<Asset> assets);
+
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/asset/StandardAssetComponentManager.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/asset/StandardAssetComponentManager.java
index 78b197cfb0..9954c947c1 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/asset/StandardAssetComponentManager.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/asset/StandardAssetComponentManager.java
@@ -34,6 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
+import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -103,6 +104,63 @@ public class StandardAssetComponentManager implements
AssetComponentManager {
stoppedStatelessProcessGroups.forEach(ProcessGroup::startProcessing);
}
+ @Override
+ public void restartReferencingComponents(final Collection<Asset> assets) {
+ if (assets == null || assets.isEmpty()) {
+ return;
+ }
+
+ final ParameterContextManager parameterContextManager =
flowManager.getParameterContextManager();
+
+ // Collect unique components across all assets
+ final Set<ProcessorNode> allProcessorsReferencingParameters = new
HashSet<>();
+ final Set<ControllerServiceNode>
allControllerServicesReferencingParameters = new HashSet<>();
+ final Set<ProcessGroup> allAffectedStatelessGroups = new HashSet<>();
+
+ for (final Asset asset : assets) {
+ final ParameterContext parameterContext =
parameterContextManager.getParameterContext(asset.getParameterContextIdentifier());
+ if (parameterContext == null) {
+ logger.warn("Parameter context [{}] not found for asset [{}]",
asset.getParameterContextIdentifier(), asset.getName());
+ continue;
+ }
+
+ // Determine which parameters reference the given asset
+ final Set<Parameter> parametersReferencingAsset =
getParametersReferencingAsset(parameterContext, asset);
+ if (parametersReferencingAsset.isEmpty()) {
+ logger.info("Asset [{}] is not referenced by any parameters in
ParameterContext [{}]", asset.getName(), parameterContext.getIdentifier());
+ continue;
+ }
+
+ // Collect processors, controller services, and stateless PGs that
reference a parameter that references the asset
+
allProcessorsReferencingParameters.addAll(getProcessorsReferencingParameters(parameterContext,
parametersReferencingAsset));
+
allControllerServicesReferencingParameters.addAll(getControllerServicesReferencingParameters(parameterContext,
parametersReferencingAsset));
+
allAffectedStatelessGroups.addAll(getAffectedStatelessGroups(parameterContext));
+ }
+
+ logger.info("Found {} unique stateless groups, {} unique processors,
and {} unique controller services referencing {} assets",
+ allAffectedStatelessGroups.size(),
allProcessorsReferencingParameters.size(),
allControllerServicesReferencingParameters.size(), assets.size());
+
+ if (allProcessorsReferencingParameters.isEmpty() &&
allControllerServicesReferencingParameters.isEmpty() &&
allAffectedStatelessGroups.isEmpty()) {
+ logger.info("No components found referencing any of the {}
assets", assets.size());
+ return;
+ }
+
+ // Stop/disable the impacted components
+ final Set<ProcessGroup> stoppedStatelessProcessGroups = new
HashSet<>();
+ allAffectedStatelessGroups.forEach(processGroup ->
stopProcessGroup(processGroup, stoppedStatelessProcessGroups));
+
+ final Set<ProcessorNode> stoppedProcessors = new HashSet<>();
+ allProcessorsReferencingParameters.forEach(processorNode ->
stopProcessors(processorNode, stoppedProcessors));
+
+ final Set<ControllerServiceNode> disabledControllerServices = new
HashSet<>();
+
allControllerServicesReferencingParameters.forEach(controllerServiceNode ->
disableControllerService(controllerServiceNode, disabledControllerServices,
stoppedProcessors));
+
+ // Start/enable the components that were previously stopped/disabled
+ enableControllerServices(disabledControllerServices);
+ stoppedProcessors.forEach(this::startProcessor);
+ stoppedStatelessProcessGroups.forEach(ProcessGroup::startProcessing);
+ }
+
private void startProcessor(final ProcessorNode processorNode) {
try {
final Future<Void> future =
processorNode.getProcessGroup().startProcessor(processorNode, false);
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/asset/StandardAssetSynchronizer.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/asset/StandardAssetSynchronizer.java
index 891edbabcf..d9f1cd44d3 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/asset/StandardAssetSynchronizer.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/asset/StandardAssetSynchronizer.java
@@ -38,7 +38,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.time.Instant;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
@@ -56,6 +58,7 @@ public class StandardAssetSynchronizer implements
AssetSynchronizer {
private static final Duration SYNC_ASSET_RETRY_DURATION =
Duration.ofSeconds(30);
private final AssetManager assetManager;
+ private final AssetComponentManager assetComponentManager;
private final FlowManager flowManager;
private final ClusterCoordinator clusterCoordinator;
private final WebClientService webClientService;
@@ -64,8 +67,10 @@ public class StandardAssetSynchronizer implements
AssetSynchronizer {
public StandardAssetSynchronizer(final FlowController flowController,
final ClusterCoordinator
clusterCoordinator,
final WebClientService webClientService,
- final NiFiProperties properties) {
+ final NiFiProperties properties,
+ final AssetComponentManager
assetComponentManager) {
this.assetManager = flowController.getAssetManager();
+ this.assetComponentManager = assetComponentManager;
this.flowManager = flowController.getFlowManager();
this.clusterCoordinator = clusterCoordinator;
this.webClientService = webClientService;
@@ -102,16 +107,30 @@ public class StandardAssetSynchronizer implements
AssetSynchronizer {
final Set<ParameterContext> parameterContexts =
parameterContextManager.getParameterContexts();
logger.info("Found {} parameter contexts for synchronizing assets",
parameterContexts.size());
+ final List<Asset> allSyncedAssets = new ArrayList<>();
for (final ParameterContext parameterContext : parameterContexts) {
try {
- synchronize(assetsRestApiClient, parameterContext);
+ final List<Asset> syncedAssets =
synchronize(assetsRestApiClient, parameterContext);
+ allSyncedAssets.addAll(syncedAssets);
} catch (final Exception e) {
logger.error("Failed to synchronize assets for parameter
context [{}]", parameterContext.getIdentifier(), e);
}
}
+
+ // Restart components that reference the synchronized assets
+ if (!allSyncedAssets.isEmpty()) {
+ try {
+ logger.info("Restarting components referencing {} synchronized
assets", allSyncedAssets.size());
+
assetComponentManager.restartReferencingComponents(allSyncedAssets);
+ } catch (final Exception e) {
+ logger.error("Failed to restart components referencing
synchronized assets", e);
+ }
+ }
}
- private void synchronize(final AssetsRestApiClient assetsRestApiClient,
final ParameterContext parameterContext) {
+ private List<Asset> synchronize(final AssetsRestApiClient
assetsRestApiClient, final ParameterContext parameterContext) {
+ final List<Asset> syncedAssets = new ArrayList<>();
+
final Map<String, Asset> existingAssets =
parameterContext.getParameters().values().stream()
.map(Parameter::getReferencedAssets)
.flatMap(Collection::stream)
@@ -119,7 +138,7 @@ public class StandardAssetSynchronizer implements
AssetSynchronizer {
if (existingAssets.isEmpty()) {
logger.info("Parameter context [{}] does not contain any assets to
synchronize", parameterContext.getIdentifier());
- return;
+ return syncedAssets;
}
logger.info("Parameter context [{}] has {} assets on the current
node", parameterContext.getIdentifier(), existingAssets.size());
@@ -127,13 +146,13 @@ public class StandardAssetSynchronizer implements
AssetSynchronizer {
final AssetsEntity coordinatorAssetsEntity =
listAssetsWithRetry(assetsRestApiClient, parameterContext.getIdentifier());
if (coordinatorAssetsEntity == null) {
logger.error("Timeout listing assets from cluster coordinator for
parameter context [{}]", parameterContext.getIdentifier());
- return;
+ return syncedAssets;
}
final Collection<AssetEntity> coordinatorAssets =
coordinatorAssetsEntity.getAssets();
if (coordinatorAssets == null || coordinatorAssets.isEmpty()) {
logger.info("Parameter context [{}] did not return any assets from
the cluster coordinator", parameterContext.getIdentifier());
- return;
+ return syncedAssets;
}
logger.info("Parameter context [{}] returned {} assets from the
cluster coordinator", parameterContext.getIdentifier(),
coordinatorAssets.size());
@@ -142,30 +161,38 @@ public class StandardAssetSynchronizer implements
AssetSynchronizer {
final AssetDTO coordinatorAsset =
coordinatorAssetEntity.getAsset();
final Asset matchingAsset =
existingAssets.get(coordinatorAsset.getId());
try {
- synchronize(assetsRestApiClient, parameterContext,
coordinatorAsset, matchingAsset);
+ final Asset syncedAsset = synchronize(assetsRestApiClient,
parameterContext, coordinatorAsset, matchingAsset);
+ if (syncedAsset != null) {
+ logger.info("Successfully synchronized asset
[id={},name={}] for parameter context [{}]",
+ syncedAsset.getIdentifier(),
syncedAsset.getName(), parameterContext.getIdentifier());
+ syncedAssets.add(syncedAsset);
+ }
} catch (final Exception e) {
logger.error("Failed to synchronize asset [id={},name={}] for
parameter context [{}]",
coordinatorAsset.getId(), coordinatorAsset.getName(),
parameterContext.getIdentifier(), e);
}
}
+
+ return syncedAssets;
}
- private void synchronize(final AssetsRestApiClient assetsRestApiClient,
final ParameterContext parameterContext, final AssetDTO coordinatorAsset, final
Asset matchingAsset) {
+ private Asset synchronize(final AssetsRestApiClient assetsRestApiClient,
final ParameterContext parameterContext, final AssetDTO coordinatorAsset, final
Asset matchingAsset) {
final String paramContextId = parameterContext.getIdentifier();
final String assetId = coordinatorAsset.getId();
final String assetName = coordinatorAsset.getName();
if (matchingAsset == null || !matchingAsset.getFile().exists()) {
logger.info("Synchronizing missing asset [id={},name={}] for
parameter context [{}]", assetId, assetName, paramContextId);
- synchronizeAssetWithRetry(assetsRestApiClient, paramContextId,
coordinatorAsset);
+ return synchronizeAssetWithRetry(assetsRestApiClient,
paramContextId, coordinatorAsset);
} else {
final String coordinatorAssetDigest = coordinatorAsset.getDigest();
final String matchingAssetDigest =
matchingAsset.getDigest().orElse(null);
if (!coordinatorAssetDigest.equals(matchingAssetDigest)) {
logger.info("Synchronizing asset [id={},name={}] with updated
digest [{}] for parameter context [{}]",
assetId, assetName, coordinatorAssetDigest,
paramContextId);
- synchronizeAssetWithRetry(assetsRestApiClient, paramContextId,
coordinatorAsset);
+ return synchronizeAssetWithRetry(assetsRestApiClient,
paramContextId, coordinatorAsset);
} else {
logger.info("Coordinator asset [id={},name={}] found for
parameter context [{}]: retrieval not required", assetId, assetName,
paramContextId);
+ return null;
}
}
}
@@ -193,19 +220,20 @@ public class StandardAssetSynchronizer implements
AssetSynchronizer {
}
}
- private void synchronizeAssetWithRetry(final AssetsRestApiClient
assetsRestApiClient, final String parameterContextId, final AssetDTO
coordinatorAsset) {
+ private Asset synchronizeAssetWithRetry(final AssetsRestApiClient
assetsRestApiClient, final String parameterContextId, final AssetDTO
coordinatorAsset) {
final Instant expirationTime =
Instant.ofEpochMilli(System.currentTimeMillis() +
SYNC_ASSET_RETRY_DURATION.toMillis());
while (System.currentTimeMillis() < expirationTime.toEpochMilli()) {
final Asset syncedAsset = synchronizeAsset(assetsRestApiClient,
parameterContextId, coordinatorAsset);
if (syncedAsset != null) {
logger.info("Synchronizing asset complete
[id={},name={},file={},digest={}]",
syncedAsset.getIdentifier(), syncedAsset.getName(),
syncedAsset.getFile().getAbsolutePath(),
syncedAsset.getDigest().orElse("[Missing]"));
- return;
+ return syncedAsset;
}
logger.info("Unable to synchronize asset [id={},name={}] for
parameter context [{}]: retrying until [{}]",
coordinatorAsset.getId(), coordinatorAsset.getName(),
parameterContextId, expirationTime);
sleep(Duration.ofSeconds(5));
}
+ return null;
}
private Asset synchronizeAsset(final AssetsRestApiClient
assetsRestApiClient, final String parameterContextId, final AssetDTO
coordinatorAsset) {
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/framework/configuration/FlowControllerConfiguration.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/framework/configuration/FlowControllerConfiguration.java
index 8e3bac3cbe..51071e5c3b 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/framework/configuration/FlowControllerConfiguration.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/framework/configuration/FlowControllerConfiguration.java
@@ -464,7 +464,7 @@ public class FlowControllerConfiguration {
*/
@Bean
public AssetSynchronizer assetSynchronizer() throws Exception {
- return new StandardAssetSynchronizer(flowController(),
clusterCoordinator, webClientService(), properties);
+ return new StandardAssetSynchronizer(flowController(),
clusterCoordinator, webClientService(), properties, affectedComponentManager());
}
/**
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions-services/src/main/java/org/apache/nifi/cs/tests/system/ModifyClasspathService.java
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions-services/src/main/java/org/apache/nifi/cs/tests/system/ModifyClasspathService.java
new file mode 100644
index 0000000000..94a25b540f
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions-services/src/main/java/org/apache/nifi/cs/tests/system/ModifyClasspathService.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cs.tests.system;
+
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.resource.ResourceCardinality;
+import org.apache.nifi.components.resource.ResourceType;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+
+import java.io.File;
+import java.util.List;
+
+/**
+ * A simple controller service that has a property with
dynamicallyModifiesClasspath(true).
+ * Used for testing asset synchronization with services that modify the
classpath.
+ */
+@RequiresInstanceClassLoading
+public class ModifyClasspathService extends AbstractControllerService {
+
+ public static final PropertyDescriptor RESOURCES = new
PropertyDescriptor.Builder()
+ .name("Resources")
+ .description("Resources to add to the classpath")
+ .required(false)
+ .dynamicallyModifiesClasspath(true)
+ .identifiesExternalResource(ResourceCardinality.MULTIPLE,
ResourceType.FILE, ResourceType.DIRECTORY, ResourceType.URL)
+ .build();
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return List.of(RESOURCES);
+ }
+
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) {
+ final File resourceFile =
context.getProperty(RESOURCES).asResource().asFile();
+ if (!resourceFile.exists()) {
+ throw new IllegalStateException("Resource file does not exist");
+ }
+ }
+}
+
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index a4da9272ec..99b5498bfe 100644
---
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -22,6 +22,7 @@ org.apache.nifi.cs.tests.system.StandardCountService
org.apache.nifi.cs.tests.system.StandardSleepService
org.apache.nifi.cs.tests.system.MigratePropertiesCountService
org.apache.nifi.cs.tests.system.MigrationService
+org.apache.nifi.cs.tests.system.ModifyClasspathService
org.apache.nifi.cs.tests.system.MockCSVReader
org.apache.nifi.cs.tests.system.MockCSVWriter
org.apache.nifi.cs.tests.system.VerifyLocalClusterStateService
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/parameters/ClusteredParameterContextIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/parameters/ClusteredParameterContextIT.java
index 95e8e8c33d..371c95dc9e 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/parameters/ClusteredParameterContextIT.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/parameters/ClusteredParameterContextIT.java
@@ -22,10 +22,14 @@ import org.apache.nifi.util.file.FileUtils;
import org.apache.nifi.web.api.entity.AssetEntity;
import org.apache.nifi.web.api.entity.AssetsEntity;
import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.ControllerServiceEntity;
+import org.apache.nifi.web.api.entity.ControllerServicesEntity;
import org.apache.nifi.web.api.entity.ParameterContextEntity;
import org.apache.nifi.web.api.entity.ParameterContextUpdateRequestEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
@@ -41,6 +45,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
* Repeats all tests in ParameterContextIT but in a clustered mode
*/
public class ClusteredParameterContextIT extends ParameterContextIT {
+ private static final Logger logger =
LoggerFactory.getLogger(ClusteredParameterContextIT.class);
+
@Override
public NiFiInstanceFactory getInstanceFactory() {
return createTwoNodeInstanceFactory();
@@ -223,4 +229,89 @@ public class ClusteredParameterContextIT extends
ParameterContextIT {
getClientUtil().stopProcessor(generateFlowFile);
getClientUtil().waitForStoppedProcessor(generateFlowFile.getId());
}
+
+ @Test
+ public void testSynchronizeAssetsWithModifyClasspathService() throws
NiFiClientException, IOException, InterruptedException {
+ waitForAllNodesConnected();
+
+ // Create a parameter context with one parameter
+ final Map<String, String> paramValues = Map.of("classpathResource",
"");
+ final ParameterContextEntity paramContext =
getClientUtil().createParameterContext("testSynchronizeAssetsWithModifyClasspathService",
paramValues);
+
+ // Set the Parameter Context on the root Process Group
+ setParameterContext("root", paramContext);
+
+ // Create an asset and update the parameter to reference the asset
+ final File assetFile = new
File("src/test/resources/sample-assets/helloworld.txt");
+ final AssetEntity asset = createAsset(paramContext.getId(), assetFile);
+
+ final ParameterContextUpdateRequestEntity referenceAssetUpdateRequest
= getClientUtil().updateParameterAssetReferences(
+ paramContext, Map.of("classpathResource",
List.of(asset.getAsset().getId())));
+
getClientUtil().waitForParameterContextRequestToComplete(paramContext.getId(),
referenceAssetUpdateRequest.getRequest().getRequestId());
+
+ // Stop node2 so we can set up the flow on node1 first
+ logger.info("Disconnecting node 2");
+ disconnectNode(2);
+ getNiFiInstance().getNodeInstance(2).stop();
+
+ // Delete node2's assets directory to ensure it's empty
+ final File node2Dir =
getNiFiInstance().getNodeInstance(2).getInstanceDirectory();
+ final File node2AssetsDir = new File(node2Dir, "assets");
+ if (node2AssetsDir.exists()) {
+ FileUtils.deleteFilesInDir(node2AssetsDir, (dir, name) -> true,
null, true, true);
+ assertTrue(node2AssetsDir.delete());
+ }
+ assertFalse(node2AssetsDir.exists());
+
+ // Create an instance of ModifyClasspathService and use the parameter
as the value of the Resources property
+ final ControllerServiceEntity modifyClasspathService =
getClientUtil().createControllerService("ModifyClasspathService");
+
getClientUtil().updateControllerServiceProperties(modifyClasspathService,
Map.of("Resources", "#{classpathResource}"));
+
+ // Enable the service
+ final ControllerServiceEntity serviceToEnable =
getNifiClient().getControllerServicesClient().getControllerService(modifyClasspathService.getId());
+ getClientUtil().enableControllerService(serviceToEnable);
+
+ // Wait for the service to be enabled and verify its state
+ getClientUtil().waitForControllerServiceState("root", "ENABLED",
List.of(modifyClasspathService.getId()));
+ assertControllerServiceEnabled(modifyClasspathService.getId());
+
+ logger.info("Starting node 2...");
+
+ // Start node2
+ getNiFiInstance().getNodeInstance(2).start(true);
+ //reconnectNode(2);
+ waitForAllNodesConnected();
+
+ // Verify node2 asset directories are recreated
+ assertTrue(node2AssetsDir.exists());
+ final File node2ContextDir = new File(node2AssetsDir,
paramContext.getId());
+ assertTrue(node2ContextDir.exists());
+
+ final File[] node2AssetFiles = node2ContextDir.listFiles();
+ assertNotNull(node2AssetFiles);
+ assertEquals(1, node2AssetFiles.length);
+
+ logger.info("Verifying controller service is still ENABLED...");
+
+ // Verify the controller service is still ENABLED after node2 joins
+ getClientUtil().waitForControllerServiceState("root", "ENABLED",
List.of(modifyClasspathService.getId()));
+ assertControllerServiceEnabled(modifyClasspathService.getId());
+
+ logger.info("Starting clean up...");
+
+ // Clean up - disable the service
+ final ControllerServiceEntity serviceToDisable =
getNifiClient().getControllerServicesClient().getControllerService(modifyClasspathService.getId());
+ getClientUtil().disableControllerService(serviceToDisable);
+ getClientUtil().waitForControllerServiceState("root", "DISABLED",
List.of(modifyClasspathService.getId()));
+ }
+
+ private void assertControllerServiceEnabled(final String serviceId) throws
NiFiClientException, IOException {
+ final ControllerServicesEntity servicesEntity =
getNifiClient().getFlowClient().getControllerServices("root");
+ final ControllerServiceEntity service =
servicesEntity.getControllerServices().stream()
+ .filter(s -> s.getId().equals(serviceId))
+ .findFirst()
+ .orElse(null);
+ assertNotNull(service, "Controller service not found: " + serviceId);
+ assertEquals("ENABLED", service.getComponent().getState(), "Controller
service should be ENABLED");
+ }
}