This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new de17d06866 fix(#3833): Add handling for duplicate keys (#3834)
de17d06866 is described below

commit de17d06866483c9400f7721528384c9ec767d700
Author: Philipp Zehnder <[email protected]>
AuthorDate: Wed Oct 15 08:47:54 2025 +0200

    fix(#3833): Add handling for duplicate keys (#3834)
---
 .../manager/preview/PipelinePreview.java           | 71 +++++++++++++---------
 1 file changed, 43 insertions(+), 28 deletions(-)

diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
index 3ec765196f..cba69b73c5 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
@@ -73,33 +73,50 @@ public class PipelinePreview {
   }
 
   public Map<String, SpDataStream> getPipelineElementPreviewStreams(String 
previewId) throws IllegalArgumentException {
+
     return ActivePipelinePreviews
         .INSTANCE
         .getInvocationGraphs(previewId)
         .stream()
-        .filter(graph -> graph instanceof DataProcessorInvocation || graph 
instanceof SpDataStream)
+        .filter(this::isProcessorOrStream)
         .collect(Collectors.toMap(
             NamedStreamPipesEntity::getElementId,
-            graph -> {
-              if (graph instanceof DataProcessorInvocation) {
-                return ((DataProcessorInvocation) graph).getOutputStream();
-              } else {
-                return (SpDataStream) graph;
-              }
-            }
+            this::extractStreamFromElement,
+            (existing, replacement) -> existing // keep the first stream in 
case of duplicate keys
         ));
   }
 
-  private void rewriteElementIds(List<NamedStreamPipesEntity> pipelineElements,
-                                 Map<String, String> elementIdMappings) {
+  private boolean isProcessorOrStream(NamedStreamPipesEntity pe) {
+    return pe instanceof DataProcessorInvocation || pe instanceof SpDataStream;
+  }
+
+  private SpDataStream extractStreamFromElement(NamedStreamPipesEntity 
element) {
+    if (element instanceof DataProcessorInvocation) {
+      return ((DataProcessorInvocation) element).getOutputStream();
+    } else if (element instanceof SpDataStream) {
+      return (SpDataStream) element;
+    } else {
+      throw new IllegalArgumentException("Unsupported graph type: " + 
element.getClass()
+                                                                             
.getSimpleName());
+    }
+
+  }
+
+  private void rewriteElementIds(
+      List<NamedStreamPipesEntity> pipelineElements,
+      Map<String, String> elementIdMappings
+  ) {
     pipelineElements
         .forEach(pe -> {
           if (pe instanceof DataProcessorInvocation) {
             var originalElementId = pe.getElementId();
-            var newElementId = (String.format(
-                "%s:%s",
-                StringUtils.substringBeforeLast(pe.getElementId(), ":"),
-                RandomStringUtils.randomAlphanumeric(5)));
+            var newElementId = (
+                String.format(
+                    "%s:%s",
+                    StringUtils.substringBeforeLast(pe.getElementId(), ":"),
+                    RandomStringUtils.randomAlphanumeric(5)
+                )
+            );
             pe.setElementId(newElementId);
             elementIdMappings.put(originalElementId, newElementId);
           } else {
@@ -138,17 +155,22 @@ public class PipelinePreview {
     ActivePipelinePreviews.INSTANCE.removePreview(previewId);
   }
 
-  private void storeGraphs(String previewId,
-                           List<NamedStreamPipesEntity> graphs) {
+  private void storeGraphs(
+      String previewId,
+      List<NamedStreamPipesEntity> graphs
+  ) {
     ActivePipelinePreviews.INSTANCE.addActivePreview(previewId, graphs);
   }
 
   private String generatePreviewId() {
-    return UUID.randomUUID().toString();
+    return UUID.randomUUID()
+               .toString();
   }
 
-  private PipelinePreviewModel makePreviewModel(String previewId,
-                                                Map<String, String> 
elementIdMappings) {
+  private PipelinePreviewModel makePreviewModel(
+      String previewId,
+      Map<String, String> elementIdMappings
+  ) {
     PipelinePreviewModel previewModel = new PipelinePreviewModel();
     previewModel.setPreviewId(previewId);
     previewModel.setElementIdMappings(elementIdMappings);
@@ -156,18 +178,11 @@ public class PipelinePreview {
     return previewModel;
   }
 
-  private List<String> collectElementIds(List<NamedStreamPipesEntity> graphs) {
-    return graphs
-        .stream()
-        .map(NamedStreamPipesEntity::getElementId)
-        .collect(Collectors.toList());
-  }
-
   private List<InvocableStreamPipesEntity> filter(List<NamedStreamPipesEntity> 
graphs) {
     List<InvocableStreamPipesEntity> dataProcessors = new ArrayList<>();
     graphs.stream()
-        .filter(g -> g instanceof DataProcessorInvocation)
-        .forEach(p -> dataProcessors.add((DataProcessorInvocation) p));
+          .filter(g -> g instanceof DataProcessorInvocation)
+          .forEach(p -> dataProcessors.add((DataProcessorInvocation) p));
 
     return dataProcessors;
   }

Reply via email to