This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 02b60f7692 fix: Assign new elementIds in pipeline preview (#3211)
02b60f7692 is described below
commit 02b60f7692d7e3aa5a0c8a80a00e22d56b8a4b6a
Author: Dominik Riemer <[email protected]>
AuthorDate: Tue Sep 3 17:37:23 2024 +0200
fix: Assign new elementIds in pipeline preview (#3211)
---
.../model/preview/PipelinePreviewModel.java | 14 ++++++-----
.../matching/PipelineModificationGenerator.java | 8 +++---
.../matching/PipelineVerificationHandlerV2.java | 4 ++-
.../pipeline/AbstractPipelineValidationStep.java | 10 +-------
...Validator.java => IPipelineValidationStep.java} | 20 ++++-----------
.../v2/pipeline/PipelineValidationSteps.java | 2 +-
.../matching/v2/pipeline/PipelineValidator.java | 11 ++++----
.../manager/preview/PipelinePreview.java | 29 +++++++++++++++++++---
.../src/lib/model/gen/streampipes-model.ts | 10 ++++----
.../pipeline-assembly-drawing-area.component.ts | 4 ++-
.../pipeline-element-preview.component.ts | 8 ++++--
.../dropped-pipeline-element.component.html | 6 ++---
ui/src/app/services/live-preview.service.ts | 8 ++++--
13 files changed, 78 insertions(+), 56 deletions(-)
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/preview/PipelinePreviewModel.java b/streampipes-model/src/main/java/org/apache/streampipes/model/preview/PipelinePreviewModel.java
index b63db03d75..021c6d428f 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/preview/PipelinePreviewModel.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/preview/PipelinePreviewModel.java
@@ -19,16 +19,18 @@ package org.apache.streampipes.model.preview;
import org.apache.streampipes.model.shared.annotation.TsModel;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
@TsModel
public class PipelinePreviewModel {
private String previewId;
- private List<String> supportedPipelineElementDomIds;
+ private Map<String, String> elementIdMappings;
public PipelinePreviewModel() {
+ this.elementIdMappings = new HashMap<>();
}
public String getPreviewId() {
@@ -39,11 +41,11 @@ public class PipelinePreviewModel {
this.previewId = previewId;
}
- public List<String> getSupportedPipelineElementDomIds() {
- return supportedPipelineElementDomIds;
+ public Map<String, String> getElementIdMappings() {
+ return elementIdMappings;
}
- public void setSupportedPipelineElementDomIds(List<String> supportedPipelineElementDomIds) {
- this.supportedPipelineElementDomIds = supportedPipelineElementDomIds;
+ public void setElementIdMappings(Map<String, String> elementIdMappings) {
+ this.elementIdMappings = elementIdMappings;
}
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java
index cbfa7ecfaa..362833f842 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java
@@ -20,6 +20,7 @@ package org.apache.streampipes.manager.matching;
import org.apache.streampipes.manager.data.PipelineGraph;
import org.apache.streampipes.manager.data.PipelineGraphHelpers;
+import org.apache.streampipes.manager.matching.v2.pipeline.IPipelineValidationStep;
import org.apache.streampipes.manager.matching.v2.pipeline.PipelineValidator;
import org.apache.streampipes.manager.matching.v2.pipeline.SpValidationException;
import org.apache.streampipes.model.SpDataStream;
@@ -49,10 +50,11 @@ public class PipelineModificationGenerator {
private final Map<String, PipelineEdgeValidation> edgeValidations;
private final PipelineValidator pipelineValidator;
- public PipelineModificationGenerator(PipelineGraph pipelineGraph) {
+ public PipelineModificationGenerator(PipelineGraph pipelineGraph,
+ List<IPipelineValidationStep> steps) {
this.pipelineGraph = pipelineGraph;
this.pipelineModifications = new HashMap<>();
- this.pipelineValidator = new PipelineValidator();
+ this.pipelineValidator = new PipelineValidator(steps);
this.edgeValidations = new HashMap<>();
}
@@ -147,6 +149,6 @@ public class PipelineModificationGenerator {
return matchingResultMessages
.stream()
.map(m -> new Notification(m.getTitle(), m.toString()))
- .collect(Collectors.toList());
+ .toList();
}
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java
index ac3bed9550..a00a5c43b0 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java
@@ -19,6 +19,7 @@ package org.apache.streampipes.manager.matching;
import org.apache.streampipes.manager.data.PipelineGraph;
import org.apache.streampipes.manager.data.PipelineGraphBuilder;
+import org.apache.streampipes.manager.matching.v2.pipeline.PipelineValidationSteps;
import org.apache.streampipes.manager.recommender.AllElementsProvider;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.base.NamedStreamPipesEntity;
@@ -44,7 +45,8 @@ public class PipelineVerificationHandlerV2 {
public PipelineModificationMessage verifyPipeline() {
PipelineGraph graph = new PipelineGraphBuilder(pipeline).buildGraph();
- return new PipelineModificationGenerator(graph).buildPipelineModificationMessage();
+ var steps = new PipelineValidationSteps().collect();
+ return new PipelineModificationGenerator(graph, steps).buildPipelineModificationMessage();
}
public Pipeline makeModifiedPipeline() {
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/AbstractPipelineValidationStep.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/AbstractPipelineValidationStep.java
index 9043d495ba..7af64b589c 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/AbstractPipelineValidationStep.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/AbstractPipelineValidationStep.java
@@ -19,26 +19,18 @@
package org.apache.streampipes.manager.matching.v2.pipeline;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.base.NamedStreamPipesEntity;
import org.apache.streampipes.model.client.matching.MatchingResultMessage;
-import org.apache.streampipes.model.pipeline.PipelineElementValidationInfo;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-public abstract class AbstractPipelineValidationStep {
+public abstract class AbstractPipelineValidationStep implements IPipelineValidationStep {
protected final Map<String, Integer> visitorHistory = new HashMap<>();
- public abstract void apply(NamedStreamPipesEntity source,
- InvocableStreamPipesEntity target,
- Set<InvocableStreamPipesEntity> allTargets,
- List<PipelineElementValidationInfo> validationInfos) throws SpValidationException;
-
public List<MatchingResultMessage> getNewErrorLog() {
return new ArrayList<>();
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/PipelineValidator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/IPipelineValidationStep.java
similarity index 65%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/PipelineValidator.java
copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/IPipelineValidationStep.java
index 6648082f86..ae8ed493d9 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/PipelineValidator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/IPipelineValidationStep.java
@@ -25,20 +25,10 @@ import org.apache.streampipes.model.pipeline.PipelineElementValidationInfo;
import java.util.List;
import java.util.Set;
-public class PipelineValidator {
+public interface IPipelineValidationStep {
- private final List<AbstractPipelineValidationStep> steps;
-
- public PipelineValidator() {
- this.steps = new PipelineValidationSteps().collect();
- }
-
- public void apply(NamedStreamPipesEntity source,
- InvocableStreamPipesEntity target,
- Set<InvocableStreamPipesEntity> allTargets,
- List<PipelineElementValidationInfo> validationInfos) throws SpValidationException {
- for (AbstractPipelineValidationStep step : steps) {
- step.apply(source, target, allTargets, validationInfos);
- }
- }
+ void apply(NamedStreamPipesEntity source,
+ InvocableStreamPipesEntity target,
+ Set<InvocableStreamPipesEntity> allTargets,
+ List<PipelineElementValidationInfo> validationInfos) throws SpValidationException;
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/PipelineValidationSteps.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/PipelineValidationSteps.java
index 4ead31cbab..0e78372e46 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/PipelineValidationSteps.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/PipelineValidationSteps.java
@@ -23,7 +23,7 @@ import java.util.List;
public class PipelineValidationSteps {
- public List<AbstractPipelineValidationStep> collect() {
+ public List<IPipelineValidationStep> collect() {
return Arrays.asList(
new PrepareStep(),
new ApplyGroundingStep(),
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/PipelineValidator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/PipelineValidator.java
index 6648082f86..439330c535 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/PipelineValidator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/PipelineValidator.java
@@ -25,19 +25,20 @@ import org.apache.streampipes.model.pipeline.PipelineElementValidationInfo;
import java.util.List;
import java.util.Set;
-public class PipelineValidator {
+public class PipelineValidator implements IPipelineValidationStep {
- private final List<AbstractPipelineValidationStep> steps;
+ private final List<IPipelineValidationStep> steps;
- public PipelineValidator() {
- this.steps = new PipelineValidationSteps().collect();
+ public PipelineValidator(List<IPipelineValidationStep> steps) {
+ this.steps = steps;
}
+ @Override
public void apply(NamedStreamPipesEntity source,
InvocableStreamPipesEntity target,
Set<InvocableStreamPipesEntity> allTargets,
List<PipelineElementValidationInfo> validationInfos) throws SpValidationException {
- for (AbstractPipelineValidationStep step : steps) {
+ for (var step : steps) {
step.apply(source, target, allTargets, validationInfos);
}
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
index 6795d6f67c..0e6b22bdc6 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
@@ -30,10 +30,13 @@ import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.pipeline.Pipeline;
import org.apache.streampipes.model.preview.PipelinePreviewModel;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -45,18 +48,20 @@ public class PipelinePreview {
public PipelinePreviewModel initiatePreview(Pipeline pipeline) {
String previewId = generatePreviewId();
+ var elementIdMappings = new HashMap<String, String>();
pipeline.setActions(new ArrayList<>());
List<NamedStreamPipesEntity> pipelineElements = new ArrayList<>(
new PipelineVerificationHandlerV2(pipeline)
.verifyAndBuildGraphs(true)
);
+ rewriteElementIds(pipelineElements, elementIdMappings);
invokeGraphs(filter(pipelineElements));
storeGraphs(previewId, pipelineElements);
LOG.info("Preview pipeline {} started", previewId);
- return makePreviewModel(previewId, pipelineElements);
+ return makePreviewModel(previewId, elementIdMappings);
}
public void deletePreview(String previewId) {
@@ -84,6 +89,24 @@ public class PipelinePreview {
));
}
+ private void rewriteElementIds(List<NamedStreamPipesEntity> pipelineElements,
+ Map<String, String> elementIdMappings) {
+ pipelineElements
+ .forEach(pe -> {
+ if (pe instanceof DataProcessorInvocation) {
+ var originalElementId = pe.getElementId();
+ var newElementId = (String.format(
+ "%s:%s",
+ StringUtils.substringBeforeLast(pe.getElementId(), ":"),
+ RandomStringUtils.randomAlphanumeric(5)));
+ pe.setElementId(newElementId);
+ elementIdMappings.put(originalElementId, newElementId);
+ } else {
+ elementIdMappings.put(pe.getElementId(), pe.getElementId());
+ }
+ });
+ }
+
private String findSelectedEndpoint(InvocableStreamPipesEntity g) throws NoServiceEndpointsAvailableException {
return new ExtensionsServiceEndpointGenerator()
.getEndpointResourceUrl(
@@ -124,10 +147,10 @@ public class PipelinePreview {
}
private PipelinePreviewModel makePreviewModel(String previewId,
- List<NamedStreamPipesEntity> graphs) {
+ Map<String, String> elementIdMappings) {
PipelinePreviewModel previewModel = new PipelinePreviewModel();
previewModel.setPreviewId(previewId);
- previewModel.setSupportedPipelineElementDomIds(collectElementIds(graphs));
+ previewModel.setElementIdMappings(elementIdMappings);
return previewModel;
}
diff --git a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
index 6edbaea91e..e03d42804b 100644
--- a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
+++ b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
@@ -20,7 +20,7 @@
/* tslint:disable */
/* eslint-disable */
// @ts-nocheck
-// Generated using typescript-generator version 3.2.1263 on 2024-08-28 16:22:24.
+// Generated using typescript-generator version 3.2.1263 on 2024-09-03 16:16:50.
export class NamedStreamPipesEntity implements Storable {
'@class':
@@ -2867,8 +2867,8 @@ export class PipelineOperationStatus {
}
export class PipelinePreviewModel {
+ elementIdMappings: { [index: string]: string };
previewId: string;
- supportedPipelineElementDomIds: string[];
static fromData(
data: PipelinePreviewModel,
@@ -2878,10 +2878,10 @@ export class PipelinePreviewModel {
return data;
}
const instance = target || new PipelinePreviewModel();
+ instance.elementIdMappings = __getCopyObjectFn(__identity<string>())(
+ data.elementIdMappings,
+ );
instance.previewId = data.previewId;
- instance.supportedPipelineElementDomIds = __getCopyArrayFn(
- __identity<string>(),
- )(data.supportedPipelineElementDomIds);
return instance;
}
}
diff --git a/ui/src/app/editor/components/pipeline-assembly/pipeline-assembly-drawing-area/pipeline-assembly-drawing-area.component.ts b/ui/src/app/editor/components/pipeline-assembly/pipeline-assembly-drawing-area/pipeline-assembly-drawing-area.component.ts
index c461f67e10..8a0d66bc51 100644
--- a/ui/src/app/editor/components/pipeline-assembly/pipeline-assembly-drawing-area/pipeline-assembly-drawing-area.component.ts
+++ b/ui/src/app/editor/components/pipeline-assembly/pipeline-assembly-drawing-area/pipeline-assembly-drawing-area.component.ts
@@ -160,7 +160,9 @@ export class PipelineAssemblyDrawingAreaComponent implements OnInit {
const data = this.livePreviewService.convert(
res as HttpDownloadProgressEvent,
);
- this.livePreviewService.eventSub.next(data);
+ if (data) {
+ this.livePreviewService.eventSub.next(data);
+ }
});
});
} else {
diff --git a/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.ts b/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.ts
index 1e0d949ff8..821f5b1a7b 100644
--- a/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.ts
+++ b/ui/src/app/editor/components/pipeline-element-preview/pipeline-element-preview.component.ts
@@ -20,6 +20,7 @@ import { Component, Input, OnDestroy, OnInit } from '@angular/core';
import { Subscription } from 'rxjs';
import { KeyValue } from '@angular/common';
import { LivePreviewService } from '../../../services/live-preview.service';
+import { PipelinePreviewModel } from '@streampipes/platform-services';
@Component({
selector: 'sp-pipeline-element-preview',
@@ -28,7 +29,7 @@ import { LivePreviewService } from '../../../services/live-preview.service';
})
export class PipelineElementPreviewComponent implements OnInit, OnDestroy {
@Input()
- previewId: string;
+ pipelinePreview: PipelinePreviewModel;
@Input()
elementId: string;
@@ -53,7 +54,10 @@ export class PipelineElementPreviewComponent implements OnInit, OnDestroy {
getLatestRuntimeInfo() {
this.previewSub = this.livePreviewService.eventSub.subscribe(event => {
if (event) {
- this.runtimeData = event[this.elementId];
+ this.runtimeData =
+ event[
+ this.pipelinePreview.elementIdMappings[this.elementId]
+ ];
} else {
this.runtimeDataError = true;
}
diff --git a/ui/src/app/editor/components/pipeline/dropped-pipeline-element/dropped-pipeline-element.component.html b/ui/src/app/editor/components/pipeline/dropped-pipeline-element/dropped-pipeline-element.component.html
index 6cf404f42d..e32ce03c46 100644
--- a/ui/src/app/editor/components/pipeline/dropped-pipeline-element/dropped-pipeline-element.component.html
+++ b/ui/src/app/editor/components/pipeline/dropped-pipeline-element/dropped-pipeline-element.component.html
@@ -96,11 +96,11 @@
<sp-pipeline-element-preview
*ngIf="
previewModeActive &&
- pipelinePreview.supportedPipelineElementDomIds.indexOf(
+ pipelinePreview.elementIdMappings[
pipelineElementConfig.payload.elementId
- ) > -1
+ ] !== undefined
"
- [previewId]="pipelinePreview.previewId"
+ [pipelinePreview]="pipelinePreview"
[elementId]="pipelineElementConfig.payload.elementId"
>
</sp-pipeline-element-preview>
diff --git a/ui/src/app/services/live-preview.service.ts b/ui/src/app/services/live-preview.service.ts
index 8f6ccb39fd..53c0558fad 100644
--- a/ui/src/app/services/live-preview.service.ts
+++ b/ui/src/app/services/live-preview.service.ts
@@ -28,7 +28,11 @@ export class LivePreviewService {
convert(event: HttpDownloadProgressEvent) {
const { partialText } = event;
- const chunks = partialText.split('\n');
- return JSON.parse(chunks[chunks.length - 2]);
+ if (partialText) {
+ const chunks = partialText.split('\n');
+ return JSON.parse(chunks[chunks.length - 2]);
+ } else {
+ return undefined;
+ }
}
}