This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new d7ba6d871f NIFI-15551 Handle duplicate keys when collecting existing asset references during synchronization
d7ba6d871f is described below

commit d7ba6d871f27be6da9cc254cd1b1f04265e53b45
Author: Bryan Bende <[email protected]>
AuthorDate: Wed Feb 4 13:49:10 2026 -0500

    NIFI-15551 Handle duplicate keys when collecting existing asset references during synchronization
    
    This closes #10854.
    
    Signed-off-by: Pierre Villard <[email protected]>
---
 .../main/java/org/apache/nifi/asset/StandardAssetSynchronizer.java  | 2 +-
 .../nifi/tests/system/parameters/ClusteredParameterContextIT.java   | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/asset/StandardAssetSynchronizer.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/asset/StandardAssetSynchronizer.java
index d9f1cd44d3..96b4f027cf 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/asset/StandardAssetSynchronizer.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/asset/StandardAssetSynchronizer.java
@@ -134,7 +134,7 @@ public class StandardAssetSynchronizer implements 
AssetSynchronizer {
         final Map<String, Asset> existingAssets = 
parameterContext.getParameters().values().stream()
                 .map(Parameter::getReferencedAssets)
                 .flatMap(Collection::stream)
-                .collect(Collectors.toMap(Asset::getIdentifier, 
Function.identity()));
+                .collect(Collectors.toMap(Asset::getIdentifier, 
Function.identity(), (existing, replacement) -> existing));
 
         if (existingAssets.isEmpty()) {
             logger.info("Parameter context [{}] does not contain any assets to 
synchronize", parameterContext.getIdentifier());
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/parameters/ClusteredParameterContextIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/parameters/ClusteredParameterContextIT.java
index 371c95dc9e..efe9017357 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/parameters/ClusteredParameterContextIT.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/parameters/ClusteredParameterContextIT.java
@@ -56,7 +56,7 @@ public class ClusteredParameterContextIT extends 
ParameterContextIT {
     public void testSynchronizeAssets() throws NiFiClientException, 
IOException, InterruptedException {
         waitForAllNodesConnected();
 
-        final Map<String, String> paramValues = Map.of("name", "foo", 
"fileToIngest", "");
+        final Map<String, String> paramValues = Map.of("name", "foo", 
"fileToIngest", "", "anotherFileToIngest", "");
         final ParameterContextEntity paramContext = 
getClientUtil().createParameterContext("testSynchronizeAssets", paramValues);
 
         // Set the Parameter Context on the root Process Group
@@ -67,12 +67,12 @@ public class ClusteredParameterContextIT extends 
ParameterContextIT {
         getClientUtil().updateProcessorProperties(ingest, Map.of("Filename", 
"#{fileToIngest}", "Delete File", "false"));
         getClientUtil().updateProcessorSchedulingPeriod(ingest, "10 mins");
 
-        // Create an asset and update the parameter to reference the asset
+        // Create an asset and update both parameters to reference the same 
asset
         final File assetFile = new 
File("src/test/resources/sample-assets/helloworld.txt");
         final AssetEntity asset = createAsset(paramContext.getId(), assetFile);
 
         final ParameterContextUpdateRequestEntity referenceAssetUpdateRequest 
= getClientUtil().updateParameterAssetReferences(
-                paramContext, Map.of("fileToIngest", 
List.of(asset.getAsset().getId())));
+                paramContext, Map.of("fileToIngest", 
List.of(asset.getAsset().getId()), "anotherFileToIngest", 
List.of(asset.getAsset().getId())));
         
getClientUtil().waitForParameterContextRequestToComplete(paramContext.getId(), 
referenceAssetUpdateRequest.getRequest().getRequestId());
 
         final ProcessorEntity terminate = 
getClientUtil().createProcessor("TerminateFlowFile");

Reply via email to