This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new dd68567334 feat(#3266): Add API to programmatically create pipelines
(#3267)
dd68567334 is described below
commit dd685673344fd19926f75d6232df097992ab7be1
Author: Dominik Riemer <[email protected]>
AuthorDate: Fri Oct 4 16:03:21 2024 +0200
feat(#3266): Add API to programmatically create pipelines (#3267)
* feat(#3266): Add API to programmatically create pipelines
* Fix checkstyle
* Fix validation
---
.../management/AdapterUpdateManagement.java | 2 +-
.../ExtendedPipelineElementValidationInfo.java} | 24 ++++-
.../model/pipeline/PipelineModification.java | 17 +---
.../PipelineModificationResult.java} | 12 ++-
.../PipelineVerificationResult.java} | 10 ++-
.../compact/CompactPipeline.java} | 14 ++-
.../compact/CompactPipelineElement.java} | 10 ++-
.../compact/CreateOptions.java} | 7 +-
.../DefaultStaticPropertyVisitor.java | 10 +++
.../matching/PipelineModificationGenerator.java | 7 +-
.../matching/PipelineVerificationHandlerV2.java | 32 +++++--
.../v2/pipeline/CheckCompletedVisitor.java | 70 +++++++++++++--
.../compact/CompactPipelineManagement.java | 51 +++++++++++
.../generation/CompactPipelineGenerator.java | 11 ++-
.../DataProcessorPipelineElementGenerator.java | 41 +++++++++
.../DataSinkPipelineElementGenerator.java | 40 +++++++++
.../DataStreamPipelineElementGenerator.java | 14 ++-
.../InvocablePipelineElementGenerator.java | 51 +++++++++++
.../PipelineElementConfigurationStep.java | 74 +++++++++++++++
.../manager/preview/PipelinePreview.java | 1 +
.../rest/impl/CompactPipelineResource.java | 100 +++++++++++++++++++++
21 files changed, 540 insertions(+), 58 deletions(-)
diff --git
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
index 9aae62888d..b64b661495 100644
---
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
+++
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
@@ -84,7 +84,7 @@ public class AdapterUpdateManagement {
try {
var modificationMessage = new
PipelineVerificationHandlerV2(pipeline).verifyPipeline();
var updateInfo = makeUpdateInfo(modificationMessage, pipeline);
- var modifiedPipeline = new
PipelineVerificationHandlerV2(pipeline).makeModifiedPipeline();
+ var modifiedPipeline = new
PipelineVerificationHandlerV2(pipeline).makeModifiedPipeline().pipeline();
var canAutoMigrate = canAutoMigrate(modificationMessage);
if (!canAutoMigrate) {
modifiedPipeline.setHealthStatus(PipelineHealthStatus.REQUIRES_ATTENTION);
diff --git
a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/ExtendedPipelineElementValidationInfo.java
similarity index 51%
copy from
streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
copy to
streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/ExtendedPipelineElementValidationInfo.java
index 9285ae1878..9289b3de12 100644
---
a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
+++
b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/ExtendedPipelineElementValidationInfo.java
@@ -16,8 +16,26 @@
*
*/
-package org.apache.streampipes.model.connect.adapter.compact;
+package org.apache.streampipes.model.pipeline;
-import java.util.Map;
+public class ExtendedPipelineElementValidationInfo extends
PipelineElementValidationInfo {
-public record CompactConfiguration(Map<String, Object> values) {}
+ private final String pipelineElementName;
+ private final String pipelineElementId;
+
+ public ExtendedPipelineElementValidationInfo(String pipelineElementName,
+ String pipelineElementId,
+ PipelineElementValidationInfo
info) {
+ super(info.getLevel(), info.getMessage());
+ this.pipelineElementName = pipelineElementName;
+ this.pipelineElementId = pipelineElementId;
+ }
+
+ public String getPipelineElementName() {
+ return pipelineElementName;
+ }
+
+ public String getPipelineElementId() {
+ return pipelineElementId;
+ }
+}
diff --git
a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineModification.java
b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineModification.java
index fc2ce9c6d3..585598890a 100644
---
a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineModification.java
+++
b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineModification.java
@@ -37,19 +37,8 @@ public class PipelineModification {
private List<SpDataStream> inputStreams;
private SpDataStream outputStream;
- public PipelineModification(String domId, String elementId,
- List<StaticProperty> staticProperties) {
- super();
- this.domId = domId;
- this.elementId = elementId;
- this.staticProperties = staticProperties;
- this.inputStreams = new ArrayList<>();
- this.outputStrategies = new ArrayList<>();
- this.validationInfos = new ArrayList<>();
- }
-
public PipelineModification() {
-
+ validationInfos = new ArrayList<>();
}
public String getDomId() {
@@ -92,10 +81,6 @@ public class PipelineModification {
this.inputStreams = inputStreams;
}
- public void addInputStream(SpDataStream inputStream) {
- this.inputStreams.add(inputStream);
- }
-
public boolean isPipelineElementValid() {
return pipelineElementValid;
}
diff --git
a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineModificationResult.java
similarity index 68%
copy from
streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
copy to
streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineModificationResult.java
index 9285ae1878..9e6a1d8c74 100644
---
a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
+++
b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineModificationResult.java
@@ -16,8 +16,14 @@
*
*/
-package org.apache.streampipes.model.connect.adapter.compact;
+package org.apache.streampipes.model.pipeline;
-import java.util.Map;
+import java.util.List;
-public record CompactConfiguration(Map<String, Object> values) {}
+public record PipelineModificationResult(Pipeline pipeline,
+
List<ExtendedPipelineElementValidationInfo> validationInfos) {
+
+ public boolean allPipelineElementsValid() {
+ return validationInfos.stream().noneMatch(v -> v.getLevel() ==
ValidationInfoLevel.ERROR);
+ }
+}
diff --git
a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineVerificationResult.java
similarity index 70%
copy from
streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
copy to
streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineVerificationResult.java
index 9285ae1878..7e730b2832 100644
---
a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
+++
b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineVerificationResult.java
@@ -16,8 +16,12 @@
*
*/
-package org.apache.streampipes.model.connect.adapter.compact;
+package org.apache.streampipes.model.pipeline;
-import java.util.Map;
+import org.apache.streampipes.model.base.NamedStreamPipesEntity;
-public record CompactConfiguration(Map<String, Object> values) {}
+import java.util.List;
+
+public record
PipelineVerificationResult(List<ExtendedPipelineElementValidationInfo>
validationInfos,
+ List<NamedStreamPipesEntity>
modifiedPipelineElements) {
+}
diff --git
a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipeline.java
similarity index 71%
copy from
streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
copy to
streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipeline.java
index 9285ae1878..59948aeb03 100644
---
a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
+++
b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipeline.java
@@ -16,8 +16,16 @@
*
*/
-package org.apache.streampipes.model.connect.adapter.compact;
+package org.apache.streampipes.model.pipeline.compact;
-import java.util.Map;
+import org.apache.streampipes.model.connect.adapter.compact.CreateOptions;
-public record CompactConfiguration(Map<String, Object> values) {}
+import java.util.List;
+
+public record CompactPipeline(
+ String id,
+ String name,
+ String description,
+ List<CompactPipelineElement> pipelineElements,
+ CreateOptions createOptions
+) {}
diff --git
a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipelineElement.java
similarity index 69%
copy from
streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
copy to
streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipelineElement.java
index 9285ae1878..0f976c8145 100644
---
a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
+++
b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipelineElement.java
@@ -16,8 +16,14 @@
*
*/
-package org.apache.streampipes.model.connect.adapter.compact;
+package org.apache.streampipes.model.pipeline.compact;
+import java.util.List;
import java.util.Map;
-public record CompactConfiguration(Map<String, Object> values) {}
+public record CompactPipelineElement(String type,
+ String ref,
+ String id,
+ List<String> connectedTo,
+ List<Map<String, Object>> configuration) {
+}
diff --git
a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CreateOptions.java
similarity index 84%
copy from
streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
copy to
streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CreateOptions.java
index 9285ae1878..bcf4952573 100644
---
a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
+++
b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CreateOptions.java
@@ -16,8 +16,7 @@
*
*/
-package org.apache.streampipes.model.connect.adapter.compact;
+package org.apache.streampipes.model.pipeline.compact;
-import java.util.Map;
-
-public record CompactConfiguration(Map<String, Object> values) {}
+public record CreateOptions(boolean start) {
+}
diff --git
a/streampipes-model/src/main/java/org/apache/streampipes/model/staticproperty/DefaultStaticPropertyVisitor.java
b/streampipes-model/src/main/java/org/apache/streampipes/model/staticproperty/DefaultStaticPropertyVisitor.java
index 5e898c36e8..7a9400b17a 100644
---
a/streampipes-model/src/main/java/org/apache/streampipes/model/staticproperty/DefaultStaticPropertyVisitor.java
+++
b/streampipes-model/src/main/java/org/apache/streampipes/model/staticproperty/DefaultStaticPropertyVisitor.java
@@ -20,6 +20,16 @@ package org.apache.streampipes.model.staticproperty;
public abstract class DefaultStaticPropertyVisitor implements
StaticPropertyVisitor {
+ protected boolean ignoreValidation;
+
+ public DefaultStaticPropertyVisitor(boolean ignoreValidation) {
+ this.ignoreValidation = ignoreValidation;
+ }
+
+ public DefaultStaticPropertyVisitor() {
+ this(false);
+ }
+
@Override
public void visit(CollectionStaticProperty collectionStaticProperty) {
collectionStaticProperty.getStaticPropertyTemplate().accept(this);
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java
index 362833f842..823d7c7714 100644
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java
@@ -34,6 +34,7 @@ import
org.apache.streampipes.model.message.PipelineEdgeValidation;
import org.apache.streampipes.model.message.PipelineModificationMessage;
import org.apache.streampipes.model.pipeline.PipelineElementValidationInfo;
import org.apache.streampipes.model.pipeline.PipelineModification;
+import org.apache.streampipes.model.pipeline.ValidationInfoLevel;
import java.util.ArrayList;
import java.util.Collections;
@@ -99,7 +100,7 @@ public class PipelineModificationGenerator {
modification.setElementId(t.getElementId());
try {
pipelineValidator.apply(source, t, targets, validationInfos);
- buildModification(modification, t, t.getInputStreams(), true);
+ buildModification(modification, t, t.getInputStreams(),
!hasValidationError(modification));
edgeValidations.put(makeKey(source, t),
PipelineEdgeValidation.complete(source.getDom(), t.getDom()));
} catch (SpValidationException e) {
e.getErrorLog().forEach(log ->
validationInfos.add(PipelineElementValidationInfo.error(log.toString())));
@@ -114,6 +115,10 @@ public class PipelineModificationGenerator {
});
}
+ private boolean hasValidationError(PipelineModification modification) {
+ return modification.getValidationInfos().stream().anyMatch(v ->
v.getLevel() == ValidationInfoLevel.ERROR);
+ }
+
private String makeKey(NamedStreamPipesEntity source,
InvocableStreamPipesEntity t) {
return source.getDom() + "-" + t.getDom();
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java
index a00a5c43b0..3275f95b98 100644
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java
@@ -27,8 +27,11 @@ import
org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.graph.DataSinkInvocation;
import org.apache.streampipes.model.grounding.EventGrounding;
import org.apache.streampipes.model.message.PipelineModificationMessage;
+import
org.apache.streampipes.model.pipeline.ExtendedPipelineElementValidationInfo;
import org.apache.streampipes.model.pipeline.Pipeline;
import org.apache.streampipes.model.pipeline.PipelineModification;
+import org.apache.streampipes.model.pipeline.PipelineModificationResult;
+import org.apache.streampipes.model.pipeline.PipelineVerificationResult;
import java.util.ArrayList;
import java.util.Collections;
@@ -49,11 +52,12 @@ public class PipelineVerificationHandlerV2 {
return new PipelineModificationGenerator(graph,
steps).buildPipelineModificationMessage();
}
- public Pipeline makeModifiedPipeline() {
- var allElements = verifyAndBuildGraphs(false);
+ public PipelineModificationResult makeModifiedPipeline() {
+ var result = verifyAndBuildGraphs(false);
+ var allElements = result.modifiedPipelineElements();
pipeline.setSepas(filterAndConvert(allElements,
DataProcessorInvocation.class));
pipeline.setActions(filterAndConvert(allElements,
DataSinkInvocation.class));
- return pipeline;
+ return new PipelineModificationResult(pipeline, result.validationInfos());
}
private <T extends InvocableStreamPipesEntity> List<T>
filterAndConvert(List<NamedStreamPipesEntity> elements,
@@ -65,10 +69,11 @@ public class PipelineVerificationHandlerV2 {
.toList();
}
- public List<NamedStreamPipesEntity> verifyAndBuildGraphs(boolean
ignoreUnconfigured) {
+ public PipelineVerificationResult verifyAndBuildGraphs(boolean
ignoreUnconfigured) {
var pipelineModifications = verifyPipeline().getPipelineModifications();
var allElements = new AllElementsProvider(pipeline).getAllElements();
- var result = new ArrayList<NamedStreamPipesEntity>();
+ var validationInfos = new
ArrayList<ExtendedPipelineElementValidationInfo>();
+ var modifiedPipelineElements = new ArrayList<NamedStreamPipesEntity>();
allElements.forEach(pipelineElement -> {
var modificationOpt = getModification(pipelineElement.getDom(),
pipelineModifications);
if (modificationOpt.isPresent()) {
@@ -79,15 +84,26 @@ public class PipelineVerificationHandlerV2 {
applyModificationsForDataProcessor((DataProcessorInvocation)
pipelineElement, modification);
}
}
+ validationInfos.addAll(
+ modification.getValidationInfos()
+ .stream()
+ .map(v -> new ExtendedPipelineElementValidationInfo(
+ pipelineElement.getName(),
+ pipelineElement.getDom(),
+ v
+ )
+ )
+ .toList()
+ );
if (!ignoreUnconfigured || modification.isPipelineElementValid()) {
- result.add(pipelineElement);
+ modifiedPipelineElements.add(pipelineElement);
}
} else {
- result.add(pipelineElement);
+ modifiedPipelineElements.add(pipelineElement);
}
});
- return result;
+ return new PipelineVerificationResult(validationInfos,
modifiedPipelineElements);
}
private void applyModificationsForDataProcessor(DataProcessorInvocation
pipelineElement,
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/CheckCompletedVisitor.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/CheckCompletedVisitor.java
index 1083396fcb..fadbd19ecb 100644
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/CheckCompletedVisitor.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/CheckCompletedVisitor.java
@@ -29,23 +29,33 @@ import
org.apache.streampipes.model.staticproperty.MappingPropertyNary;
import org.apache.streampipes.model.staticproperty.MappingPropertyUnary;
import org.apache.streampipes.model.staticproperty.MatchingStaticProperty;
import org.apache.streampipes.model.staticproperty.OneOfStaticProperty;
+import org.apache.streampipes.model.staticproperty.Option;
import
org.apache.streampipes.model.staticproperty.RuntimeResolvableGroupStaticProperty;
import
org.apache.streampipes.model.staticproperty.RuntimeResolvableTreeInputStaticProperty;
import org.apache.streampipes.model.staticproperty.SecretStaticProperty;
import org.apache.streampipes.model.staticproperty.SlideToggleStaticProperty;
+import org.apache.streampipes.model.staticproperty.StaticProperty;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import java.util.stream.Collectors;
public class CheckCompletedVisitor extends DefaultStaticPropertyVisitor {
- private List<PipelineElementValidationInfo> validationInfos;
+ private final List<PipelineElementValidationInfo> validationInfos;
public CheckCompletedVisitor() {
this.validationInfos = new ArrayList<>();
}
+ public CheckCompletedVisitor(boolean ignoreValidation) {
+ this();
+ this.ignoreValidation = ignoreValidation;
+ }
+
@Override
public void visit(AnyStaticProperty property) {
@@ -53,21 +63,22 @@ public class CheckCompletedVisitor extends
DefaultStaticPropertyVisitor {
@Override
public void visit(CodeInputStaticProperty codeInputStaticProperty) {
+ validateNull(codeInputStaticProperty, codeInputStaticProperty.getValue());
}
@Override
public void visit(ColorPickerStaticProperty colorPickerStaticProperty) {
-
+ validateNull(colorPickerStaticProperty,
colorPickerStaticProperty.getSelectedColor());
}
@Override
public void visit(FileStaticProperty fileStaticProperty) {
-
+ validateNull(fileStaticProperty, fileStaticProperty.getLocationPath());
}
@Override
public void visit(FreeTextStaticProperty freeTextStaticProperty) {
-
+ validateNull(freeTextStaticProperty, freeTextStaticProperty.getValue());
}
@Override
@@ -110,7 +121,7 @@ public class CheckCompletedVisitor extends
DefaultStaticPropertyVisitor {
}
}
} else {
- if (!mappingPropertyUnary.getMapsFromOptions().isEmpty()){
+ if (!mappingPropertyUnary.getMapsFromOptions().isEmpty()) {
String firstSelector =
mappingPropertyUnary.getMapsFromOptions().get(0);
mappingPropertyUnary.setSelectedProperty(firstSelector);
}
@@ -124,12 +135,19 @@ public class CheckCompletedVisitor extends
DefaultStaticPropertyVisitor {
@Override
public void visit(OneOfStaticProperty oneOfStaticProperty) {
-
+ if (!ignoreValidation &&
oneOfStaticProperty.getOptions().stream().noneMatch(Option::isSelected)) {
+ validationInfos.add(PipelineElementValidationInfo.error(
+ String.format(
+ "Configuration \"%s\" must have one selected option, but no
option was selected.",
+ oneOfStaticProperty.getInternalName()
+ )
+ ));
+ }
}
@Override
public void visit(SecretStaticProperty secretStaticProperty) {
-
+ validateNull(secretStaticProperty, secretStaticProperty.getValue());
}
@Override
@@ -139,18 +157,43 @@ public class CheckCompletedVisitor extends
DefaultStaticPropertyVisitor {
@Override
public void visit(RuntimeResolvableTreeInputStaticProperty
treeInputStaticProperty) {
-
+ if (!ignoreValidation
+ && !treeInputStaticProperty.isOptional()
+ && treeInputStaticProperty.getSelectedNodesInternalNames().isEmpty()) {
+ addMissingConfiguration(treeInputStaticProperty);
+ }
}
@Override
public void visit(RuntimeResolvableGroupStaticProperty groupStaticProperty) {
+ }
+ @Override
+ public void visit(StaticPropertyAlternatives staticPropertyAlternatives) {
+ if (!ignoreValidation && !staticPropertyAlternatives.isOptional()
+ &&
staticPropertyAlternatives.getAlternatives().stream().noneMatch(StaticPropertyAlternative::getSelected))
{
+ validationInfos.add(PipelineElementValidationInfo.error(
+ String.format(
+ "No alternative of configuration \"%s\" was selected, but at
least one alternative must be chosen",
+ staticPropertyAlternatives.getInternalName()
+ )
+ ));
+ }
+ var visitor = new CheckCompletedVisitor(true);
+ staticPropertyAlternatives.getAlternatives().forEach(alternative ->
alternative.accept(visitor));
+ validationInfos.addAll(visitor.getValidationInfos());
}
public List<PipelineElementValidationInfo> getValidationInfos() {
return this.validationInfos;
}
+ private void validateNull(StaticProperty sp, Object value) {
+ if (!ignoreValidation && !sp.isOptional() && Objects.isNull(value)) {
+ addMissingConfiguration(sp);
+ }
+ }
+
private boolean existsSelection(MappingPropertyUnary mappingProperty) {
return !(mappingProperty.getSelectedProperty() == null ||
mappingProperty.getSelectedProperty().isEmpty());
}
@@ -158,4 +201,15 @@ public class CheckCompletedVisitor extends
DefaultStaticPropertyVisitor {
private boolean existsSelection(MappingPropertyNary mappingProperty) {
return !(mappingProperty.getSelectedProperties() == null ||
mappingProperty.getSelectedProperties().isEmpty());
}
+
+ private void addMissingConfiguration(StaticProperty sp) { // records an error for a required property that has no value
+ validationInfos.add(
+ PipelineElementValidationInfo.error(
+ String.format(
+ "Configuration option \"%s\" has no value although it is marked
as required",
+ sp.getInternalName()
+ )
+ )
+ );
+ }
}
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/CompactPipelineManagement.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/CompactPipelineManagement.java
new file mode 100644
index 0000000000..1a0a1cd44e
--- /dev/null
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/CompactPipelineManagement.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.pipeline.compact;
+
+import org.apache.streampipes.manager.matching.PipelineVerificationHandlerV2;
+import
org.apache.streampipes.manager.pipeline.compact.generation.PipelineElementConfigurationStep;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineModificationResult;
+import org.apache.streampipes.model.pipeline.compact.CompactPipeline;
+import org.apache.streampipes.storage.api.IPipelineElementDescriptionStorage;
+
+public class CompactPipelineManagement {
+
+ private final IPipelineElementDescriptionStorage storage;
+
+ public CompactPipelineManagement(IPipelineElementDescriptionStorage storage)
{
+ this.storage = storage;
+ }
+
+ public PipelineModificationResult makePipeline(CompactPipeline
compactPipeline) throws Exception {
+ var pipeline = new Pipeline();
+ applyPipelineBasics(compactPipeline, pipeline);
+
+ new PipelineElementConfigurationStep(storage).apply(pipeline,
compactPipeline);
+
+ return new PipelineVerificationHandlerV2(pipeline).makeModifiedPipeline();
+ }
+
+ private void applyPipelineBasics(CompactPipeline compactPipeline,
+ Pipeline pipeline) {
+ pipeline.setElementId(compactPipeline.id());
+ pipeline.setName(compactPipeline.name());
+ pipeline.setDescription(compactPipeline.description());
+ }
+}
diff --git
a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/CompactPipelineGenerator.java
similarity index 70%
copy from
streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
copy to
streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/CompactPipelineGenerator.java
index 9285ae1878..18b8efe9e1 100644
---
a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/CompactPipelineGenerator.java
@@ -16,8 +16,13 @@
*
*/
-package org.apache.streampipes.model.connect.adapter.compact;
+package org.apache.streampipes.manager.pipeline.compact.generation;
-import java.util.Map;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.compact.CompactPipeline;
-public record CompactConfiguration(Map<String, Object> values) {}
+public interface CompactPipelineGenerator {
+
+ void apply(Pipeline pipeline,
+ CompactPipeline compactPipeline) throws Exception;
+}
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataProcessorPipelineElementGenerator.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataProcessorPipelineElementGenerator.java
new file mode 100644
index 0000000000..c736293c20
--- /dev/null
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataProcessorPipelineElementGenerator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.pipeline.compact.generation;
+
+import org.apache.streampipes.manager.template.DataProcessorTemplateHandler;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.pipeline.compact.CompactPipelineElement;
+
+public class DataProcessorPipelineElementGenerator {
+
+ private final InvocablePipelineElementGenerator<DataProcessorInvocation>
basicGenerator;
+
+ public DataProcessorPipelineElementGenerator(
+ InvocablePipelineElementGenerator<DataProcessorInvocation>
basicGenerator) {
+ this.basicGenerator = basicGenerator;
+ }
+
+ public DataProcessorInvocation generate(DataProcessorInvocation processor,
+ CompactPipelineElement
pipelineElement) {
+ basicGenerator.apply(processor, pipelineElement);
+ var template = basicGenerator.makeTemplate(processor, pipelineElement);
+ return new DataProcessorTemplateHandler(template, processor, false)
+ .applyTemplateOnPipelineElement();
+ }
+}
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataSinkPipelineElementGenerator.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataSinkPipelineElementGenerator.java
new file mode 100644
index 0000000000..2c1f940252
--- /dev/null
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataSinkPipelineElementGenerator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.pipeline.compact.generation;
+
+import org.apache.streampipes.manager.template.DataSinkTemplateHandler;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.pipeline.compact.CompactPipelineElement;
+
+/**
+ * Produces a configured {@link DataSinkInvocation} from a {@link CompactPipelineElement}.
+ *
+ * <p>Preparation of the invocation and creation of the template are done by the
+ * supplied {@link InvocablePipelineElementGenerator}; the template is then applied
+ * through a {@link DataSinkTemplateHandler}.
+ */
+public class DataSinkPipelineElementGenerator {
+
+  private final InvocablePipelineElementGenerator<DataSinkInvocation> basicGenerator;
+
+  /**
+   * @param basicGenerator generator used to prepare the invocation and to build its template
+   */
+  public DataSinkPipelineElementGenerator(
+      InvocablePipelineElementGenerator<DataSinkInvocation> basicGenerator) {
+    this.basicGenerator = basicGenerator;
+  }
+
+  /**
+   * Prepares the sink and applies the configuration of the compact element.
+   *
+   * @param sink            the invocation to configure; it is passed to the basic generator first
+   * @param pipelineElement the compact description the sink is generated from
+   * @return the result of applying the generated template on the sink
+   */
+  public DataSinkInvocation generate(DataSinkInvocation sink,
+                                     CompactPipelineElement pipelineElement) {
+    // Order matters: prepare the invocation first, then derive and apply the template.
+    basicGenerator.apply(sink, pipelineElement);
+    var sinkTemplate = basicGenerator.makeTemplate(sink, pipelineElement);
+    // NOTE(review): the semantics of the 'false' argument are defined by
+    // DataSinkTemplateHandler and are not visible here - confirm before changing it.
+    var templateHandler = new DataSinkTemplateHandler(sinkTemplate, sink, false);
+    return templateHandler.applyTemplateOnPipelineElement();
+  }
+}
diff --git
a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataStreamPipelineElementGenerator.java
similarity index 64%
rename from
streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
rename to
streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataStreamPipelineElementGenerator.java
index 9285ae1878..9d466062c9 100644
---
a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataStreamPipelineElementGenerator.java
@@ -16,8 +16,16 @@
*
*/
-package org.apache.streampipes.model.connect.adapter.compact;
+package org.apache.streampipes.manager.pipeline.compact.generation;
-import java.util.Map;
+import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.model.pipeline.compact.CompactPipelineElement;
-public record CompactConfiguration(Map<String, Object> values) {}
+/**
+ * Prepares an {@link SpDataStream} for use as an element of a generated pipeline.
+ */
+public class DataStreamPipelineElementGenerator {
+
+  /** Prefix put in front of the compact element's reference to form the DOM id. */
+  private static final String DOM_ID_PREFIX = "jsplumb_";
+
+  /**
+   * Sets the DOM id of the stream to the prefixed reference of the compact element.
+   *
+   * @param stream          the stream to modify (the same instance is returned)
+   * @param pipelineElement the compact element providing the reference
+   * @return the given stream
+   */
+  public SpDataStream generate(SpDataStream stream,
+                               CompactPipelineElement pipelineElement) {
+    var domId = DOM_ID_PREFIX + pipelineElement.ref();
+    stream.setDom(domId);
+    return stream;
+  }
+}
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/InvocablePipelineElementGenerator.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/InvocablePipelineElementGenerator.java
new file mode 100644
index 0000000000..0f8d135830
--- /dev/null
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/InvocablePipelineElementGenerator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.pipeline.compact.generation;
+
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.pipeline.compact.CompactPipelineElement;
+import org.apache.streampipes.model.template.PipelineElementTemplate;
+import org.apache.streampipes.model.util.Cloner;
+
+import java.util.ArrayList;
+
+/**
+ * Shared generation logic for invocable pipeline elements.
+ *
+ * <p>It assigns the DOM id and the connections of an element from a
+ * {@link CompactPipelineElement} and builds the {@link PipelineElementTemplate}
+ * that carries the element's configuration.
+ *
+ * @param <T> the concrete invocation type
+ */
+public class InvocablePipelineElementGenerator<T extends InvocableStreamPipesEntity> {
+
+  /** Prefix put in front of every compact reference to form a DOM id. */
+  private static final String ID_PREFIX = "jsplumb_";
+
+  /**
+   * Applies the reference, the connections and a copy of the stream requirements to the element.
+   *
+   * @param element                the invocation to modify in place
+   * @param compactPipelineElement the compact description providing {@code ref} and
+   *                               {@code connectedTo}; a missing {@code connectedTo}
+   *                               is treated as "no connections"
+   */
+  public void apply(T element,
+                    CompactPipelineElement compactPipelineElement) {
+    element.setDom(ID_PREFIX + compactPipelineElement.ref());
+
+    // A missing connection list must not fail with a NullPointerException here;
+    // it is handled like an empty list, mirroring the null handling of the configuration.
+    var connectedTo = compactPipelineElement.connectedTo();
+    if (connectedTo == null) {
+      element.setConnectedTo(new ArrayList<>());
+    } else {
+      element.setConnectedTo(connectedTo.stream().map(c -> ID_PREFIX + c).toList());
+    }
+
+    // Replace the stream requirements by the result of Cloner#streams.
+    element.setStreamRequirements(new Cloner().streams(element.getStreamRequirements()));
+  }
+
+  /**
+   * Builds the template holding the configuration of the compact element.
+   *
+   * @param element                the invocation whose app id becomes the template's base app id
+   * @param compactPipelineElement the compact description providing the configuration
+   * @return a new template; its configs are an empty list when no configuration is given
+   */
+  protected PipelineElementTemplate makeTemplate(T element,
+                                                 CompactPipelineElement compactPipelineElement) {
+    var configs = compactPipelineElement.configuration();
+    if (configs == null) {
+      configs = new ArrayList<>();
+    }
+    var template = new PipelineElementTemplate();
+    template.setTemplateConfigs(configs);
+    template.setBasePipelineElementAppId(element.getAppId());
+
+    return template;
+  }
+}
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/PipelineElementConfigurationStep.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/PipelineElementConfigurationStep.java
new file mode 100644
index 0000000000..ec5539414c
--- /dev/null
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/PipelineElementConfigurationStep.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.pipeline.compact.generation;
+
+import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.compact.CompactPipeline;
+import org.apache.streampipes.model.pipeline.compact.CompactPipelineElement;
+import org.apache.streampipes.storage.api.IPipelineElementDescriptionStorage;
+
+/**
+ * Generation step that resolves every element of a {@link CompactPipeline} against the
+ * pipeline element storage and adds the configured result to the {@link Pipeline}.
+ */
+public class PipelineElementConfigurationStep implements CompactPipelineGenerator {
+
+  private static final String STREAM_TYPE = "stream";
+  private static final String PROCESSOR_TYPE = "processor";
+  private static final String SINK_TYPE = "sink";
+
+  private final IPipelineElementDescriptionStorage storage;
+
+  /**
+   * @param storage storage used to look up streams, processors and sinks by the compact element id
+   */
+  public PipelineElementConfigurationStep(IPipelineElementDescriptionStorage storage) {
+    this.storage = storage;
+  }
+
+  /**
+   * Adds one stream, processor or sink to the pipeline per compact pipeline element.
+   * The element type is compared case-insensitively.
+   *
+   * @param pipeline        the pipeline whose streams, processors and sinks are extended
+   * @param compactPipeline the compact pipeline providing the elements
+   * @throws IllegalArgumentException if an element has a missing or unsupported type, or
+   *                                  if the storage lookup for an element returns nothing
+   */
+  @Override
+  public void apply(Pipeline pipeline,
+                    CompactPipeline compactPipeline) throws Exception {
+    for (var pe : compactPipeline.pipelineElements()) {
+      // Constant-first comparison keeps a missing type from raising a NullPointerException.
+      var type = pe.type();
+      if (STREAM_TYPE.equalsIgnoreCase(type)) {
+        pipeline.getStreams().add(makeStream(pe));
+      } else if (PROCESSOR_TYPE.equalsIgnoreCase(type)) {
+        pipeline.getSepas().add(makeProcessor(pe));
+      } else if (SINK_TYPE.equalsIgnoreCase(type)) {
+        pipeline.getActions().add(makeSink(pe));
+      } else {
+        // Dropping the element silently would leave the pipeline without it
+        // and without any hint for the caller.
+        throw new IllegalArgumentException(
+            "Unsupported pipeline element type '" + type + "' for element '" + pe.ref() + "'");
+      }
+    }
+  }
+
+  /**
+   * Looks up the data stream with the element's id and prepares it for the pipeline.
+   *
+   * @throws IllegalArgumentException if the storage returns no stream for the id
+   */
+  public SpDataStream makeStream(CompactPipelineElement pipelineElement) {
+    var element = requireFound(
+        storage.getDataStreamById(pipelineElement.id()), "data stream", pipelineElement.id());
+    return new DataStreamPipelineElementGenerator().generate(element, pipelineElement);
+  }
+
+  /**
+   * Looks up the data processor with the element's id (used as app id) and
+   * generates a configured invocation from it.
+   *
+   * @throws IllegalArgumentException if the storage returns no processor for the id
+   */
+  public DataProcessorInvocation makeProcessor(CompactPipelineElement pipelineElement) {
+    var element = requireFound(
+        storage.getDataProcessorByAppId(pipelineElement.id()), "data processor", pipelineElement.id());
+    var invocation = new DataProcessorInvocation(element);
+    return new DataProcessorPipelineElementGenerator(new InvocablePipelineElementGenerator<>())
+        .generate(invocation, pipelineElement);
+  }
+
+  /**
+   * Looks up the data sink with the element's id (used as app id) and
+   * generates a configured invocation from it.
+   *
+   * @throws IllegalArgumentException if the storage returns no sink for the id
+   */
+  public DataSinkInvocation makeSink(CompactPipelineElement pipelineElement) {
+    var element = requireFound(
+        storage.getDataSinkByAppId(pipelineElement.id()), "data sink", pipelineElement.id());
+    var invocation = new DataSinkInvocation(element);
+    return new DataSinkPipelineElementGenerator(new InvocablePipelineElementGenerator<>())
+        .generate(invocation, pipelineElement);
+  }
+
+  /** Returns the given description or fails with a descriptive message when it is null. */
+  private static <E> E requireFound(E description, String kind, String id) {
+    if (description == null) {
+      throw new IllegalArgumentException("No " + kind + " found for id '" + id + "'");
+    }
+    return description;
+  }
+
+}
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
index 0e6b22bdc6..3ec765196f 100644
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
@@ -53,6 +53,7 @@ public class PipelinePreview {
List<NamedStreamPipesEntity> pipelineElements = new ArrayList<>(
new PipelineVerificationHandlerV2(pipeline)
.verifyAndBuildGraphs(true)
+ .modifiedPipelineElements()
);
rewriteElementIds(pipelineElements, elementIdMappings);
diff --git
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/CompactPipelineResource.java
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/CompactPipelineResource.java
new file mode 100644
index 0000000000..2147dde19e
--- /dev/null
+++
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/CompactPipelineResource.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.rest.impl;
+
+import org.apache.streampipes.manager.pipeline.PipelineManager;
+import
org.apache.streampipes.manager.pipeline.compact.CompactPipelineManagement;
+import org.apache.streampipes.model.message.Notification;
+import org.apache.streampipes.model.message.NotificationType;
+import org.apache.streampipes.model.message.Notifications;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+import org.apache.streampipes.model.pipeline.compact.CompactPipeline;
+import
org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource;
+import org.apache.streampipes.rest.security.AuthConstants;
+
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.security.access.prepost.PreAuthorize;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * REST resource to create pipelines from their compact representation.
+ */
+@RestController
+@RequestMapping("/api/v2/compact-pipelines")
+public class CompactPipelineResource extends AbstractAuthGuardedRestResource {
+
+  private final CompactPipelineManagement compactPipelineManagement;
+
+  public CompactPipelineResource() {
+    this.compactPipelineManagement = new CompactPipelineManagement(
+        getPipelineElementStorage()
+    );
+  }
+
+  /**
+   * Generates a pipeline from the compact description, stores it and optionally starts it.
+   *
+   * @param compactPipeline the compact pipeline sent as JSON or YAML
+   * @return the operation status when the pipeline was started, a success message containing
+   *         the pipeline id when it was only stored, a generic error message when starting
+   *         failed, or status 400 with the validation infos when generation was not valid
+   * @throws Exception if generating or storing the pipeline fails
+   */
+  @PostMapping(
+      consumes = {
+          MediaType.APPLICATION_JSON_VALUE,
+          "application/yaml",
+          "application/yml"
+      }
+  )
+  @PreAuthorize(AuthConstants.HAS_WRITE_PIPELINE_PRIVILEGE)
+  public ResponseEntity<?> addPipelineCompact(
+      @RequestBody CompactPipeline compactPipeline
+  ) throws Exception {
+
+    var pipelineGenerationResult = compactPipelineManagement.makePipeline(compactPipeline);
+    if (!pipelineGenerationResult.allPipelineElementsValid()) {
+      return ResponseEntity.status(400).body(pipelineGenerationResult.validationInfos());
+    }
+
+    String pipelineId = PipelineManager.addPipeline(
+        getAuthenticatedUserSid(),
+        pipelineGenerationResult.pipeline()
+    );
+
+    // The create options may be absent from the request body. Treat that as "do not start"
+    // instead of failing with a NullPointerException after the pipeline was already stored.
+    var createOptions = compactPipeline.createOptions();
+    if (createOptions != null && createOptions.start()) {
+      try {
+        PipelineOperationStatus status = PipelineManager.startPipeline(pipelineId);
+        return ok(status);
+      } catch (Exception e) {
+        // NOTE(review): the pipeline stays stored at this point and the cause is dropped
+        // without being logged (no logger is in scope in this file) - consider logging 'e'.
+        return statusMessage(Notifications.error(NotificationType.UNKNOWN_ERROR));
+      }
+    }
+
+    var message = Notifications.success("Pipeline stored");
+    message.addNotification(new Notification("id", pipelineId));
+    return ok(message);
+  }
+
+  /**
+   * Placeholder for updating an existing pipeline from its compact representation.
+   *
+   * @param elementId       the id of the pipeline to update
+   * @param compactPipeline the new compact description
+   * @return status 501 (Not Implemented), since no update logic exists yet
+   */
+  @PutMapping(
+      path = "{id}",
+      consumes = {
+          MediaType.APPLICATION_JSON_VALUE,
+          "application/yaml",
+          "application/yml"
+      }
+  )
+  @PreAuthorize(AuthConstants.HAS_WRITE_PIPELINE_PRIVILEGE)
+  public ResponseEntity<?> updatePipelineCompact(
+      @PathVariable("id") String elementId,
+      @RequestBody CompactPipeline compactPipeline
+  ) throws Exception {
+
+    // Returning null would not tell the client that nothing was updated;
+    // report the missing implementation explicitly.
+    return ResponseEntity.status(501).build();
+  }
+}