This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch
3833-pipeline-live-preview-fails-when-the-same-data-stream-is-selected-twice
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to
refs/heads/3833-pipeline-live-preview-fails-when-the-same-data-stream-is-selected-twice
by this push:
new 7061a61c48 fix(#3833): Add handling for duplicate keys
7061a61c48 is described below
commit 7061a61c48c280917d9a3f9926591607c43e9bc4
Author: Philipp Zehnder <[email protected]>
AuthorDate: Tue Oct 14 14:26:54 2025 +0200
fix(#3833): Add handling for duplicate keys
---
.../manager/preview/PipelinePreview.java | 71 +++++++++++++---------
1 file changed, 43 insertions(+), 28 deletions(-)
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
index 3ec765196f..cba69b73c5 100644
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
@@ -73,33 +73,50 @@ public class PipelinePreview {
}
public Map<String, SpDataStream> getPipelineElementPreviewStreams(String
previewId) throws IllegalArgumentException {
+
return ActivePipelinePreviews
.INSTANCE
.getInvocationGraphs(previewId)
.stream()
- .filter(graph -> graph instanceof DataProcessorInvocation || graph
instanceof SpDataStream)
+ .filter(this::isProcessorOrStream)
.collect(Collectors.toMap(
NamedStreamPipesEntity::getElementId,
- graph -> {
- if (graph instanceof DataProcessorInvocation) {
- return ((DataProcessorInvocation) graph).getOutputStream();
- } else {
- return (SpDataStream) graph;
- }
- }
+ this::extractStreamFromElement,
+ (existing, replacement) -> existing // keep the first stream in
case of duplicate keys
));
}
- private void rewriteElementIds(List<NamedStreamPipesEntity> pipelineElements,
- Map<String, String> elementIdMappings) {
+ private boolean isProcessorOrStream(NamedStreamPipesEntity pe) {
+ return pe instanceof DataProcessorInvocation || pe instanceof SpDataStream;
+ }
+
+ private SpDataStream extractStreamFromElement(NamedStreamPipesEntity
element) {
+ if (element instanceof DataProcessorInvocation) {
+ return ((DataProcessorInvocation) element).getOutputStream();
+ } else if (element instanceof SpDataStream) {
+ return (SpDataStream) element;
+ } else {
+ throw new IllegalArgumentException("Unsupported graph type: " +
element.getClass()
+
.getSimpleName());
+ }
+
+ }
+
+ private void rewriteElementIds(
+ List<NamedStreamPipesEntity> pipelineElements,
+ Map<String, String> elementIdMappings
+ ) {
pipelineElements
.forEach(pe -> {
if (pe instanceof DataProcessorInvocation) {
var originalElementId = pe.getElementId();
- var newElementId = (String.format(
- "%s:%s",
- StringUtils.substringBeforeLast(pe.getElementId(), ":"),
- RandomStringUtils.randomAlphanumeric(5)));
+ var newElementId = (
+ String.format(
+ "%s:%s",
+ StringUtils.substringBeforeLast(pe.getElementId(), ":"),
+ RandomStringUtils.randomAlphanumeric(5)
+ )
+ );
pe.setElementId(newElementId);
elementIdMappings.put(originalElementId, newElementId);
} else {
@@ -138,17 +155,22 @@ public class PipelinePreview {
ActivePipelinePreviews.INSTANCE.removePreview(previewId);
}
- private void storeGraphs(String previewId,
- List<NamedStreamPipesEntity> graphs) {
+ private void storeGraphs(
+ String previewId,
+ List<NamedStreamPipesEntity> graphs
+ ) {
ActivePipelinePreviews.INSTANCE.addActivePreview(previewId, graphs);
}
private String generatePreviewId() {
- return UUID.randomUUID().toString();
+ return UUID.randomUUID()
+ .toString();
}
- private PipelinePreviewModel makePreviewModel(String previewId,
- Map<String, String>
elementIdMappings) {
+ private PipelinePreviewModel makePreviewModel(
+ String previewId,
+ Map<String, String> elementIdMappings
+ ) {
PipelinePreviewModel previewModel = new PipelinePreviewModel();
previewModel.setPreviewId(previewId);
previewModel.setElementIdMappings(elementIdMappings);
@@ -156,18 +178,11 @@ public class PipelinePreview {
return previewModel;
}
- private List<String> collectElementIds(List<NamedStreamPipesEntity> graphs) {
- return graphs
- .stream()
- .map(NamedStreamPipesEntity::getElementId)
- .collect(Collectors.toList());
- }
-
private List<InvocableStreamPipesEntity> filter(List<NamedStreamPipesEntity>
graphs) {
List<InvocableStreamPipesEntity> dataProcessors = new ArrayList<>();
graphs.stream()
- .filter(g -> g instanceof DataProcessorInvocation)
- .forEach(p -> dataProcessors.add((DataProcessorInvocation) p));
+ .filter(g -> g instanceof DataProcessorInvocation)
+ .forEach(p -> dataProcessors.add((DataProcessorInvocation) p));
return dataProcessors;
}