This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 7783d88030 Support output strategies in pipeline code (#3312)
7783d88030 is described below
commit 7783d880303ab050695ad662f7f3ad6cfb121b88
Author: Dominik Riemer <[email protected]>
AuthorDate: Thu Oct 31 21:01:57 2024 +0100
Support output strategies in pipeline code (#3312)
* Add visitor to output strategies
* Support output strategy in pipeline templates
* Update ts model
* Add license
* Fix checkstyle
---
.../management/compact/PersistPipelineHandler.java | 1 +
.../model/output/AppendOutputStrategy.java | 7 +-
.../model/output/CustomOutputStrategy.java | 7 +-
.../output/CustomTransformOutputStrategy.java | 6 +-
.../model/output/FixedOutputStrategy.java | 4 +
.../model/output/KeepOutputStrategy.java | 5 +
.../model/output/ListOutputStrategy.java | 4 +
.../streampipes/model/output/OutputStrategy.java | 2 +
...putStrategy.java => OutputStrategyVisitor.java} | 27 ++----
.../model/output/TransformOutputStrategy.java | 5 +
.../model/output/UserDefinedOutputStrategy.java | 5 +
.../pipeline/compact/CompactPipelineElement.java | 3 +-
...pelineElement.java => OutputConfiguration.java} | 8 +-
.../model/pipeline/compact/UserDefinedOutput.java | 7 +-
.../generation/CompactPipelineConverter.java | 39 +++++++-
.../DataProcessorPipelineElementGenerator.java | 8 +-
.../generation/OutputStrategyGenerator.java | 108 +++++++++++++++++++++
.../compact/CompactPipelineTemplateManagement.java | 3 +-
.../instances/PersistDataLakePipelineTemplate.java | 3 +-
.../src/lib/model/gen/streampipes-model.ts | 44 ++++++++-
.../adapter-started-dialog.component.ts | 1 +
ui/src/app/core-ui/help/help.component.html | 24 +++--
ui/src/app/core-ui/help/help.component.scss | 10 ++
ui/src/app/core-ui/help/help.component.ts | 4 +
24 files changed, 290 insertions(+), 45 deletions(-)
diff --git
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/compact/PersistPipelineHandler.java
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/compact/PersistPipelineHandler.java
index dd0afb7ef7..8b53e019f6 100644
---
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/compact/PersistPipelineHandler.java
+++
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/compact/PersistPipelineHandler.java
@@ -94,6 +94,7 @@ public class PersistPipelineHandler {
DATA_LAKE_CONNECTOR_ID,
adapterDescription.getCorrespondingDataStreamElementId(),
null,
+ null,
null
));
return pipelineElements;
diff --git
a/streampipes-model/src/main/java/org/apache/streampipes/model/output/AppendOutputStrategy.java
b/streampipes-model/src/main/java/org/apache/streampipes/model/output/AppendOutputStrategy.java
index 9d22313bc4..fa7ccca3c4 100644
---
a/streampipes-model/src/main/java/org/apache/streampipes/model/output/AppendOutputStrategy.java
+++
b/streampipes-model/src/main/java/org/apache/streampipes/model/output/AppendOutputStrategy.java
@@ -26,8 +26,6 @@ import java.util.List;
public class AppendOutputStrategy extends OutputStrategy {
- private static final long serialVersionUID = 7202888911899551012L;
-
private List<EventProperty> eventProperties;
public AppendOutputStrategy() {
@@ -35,6 +33,11 @@ public class AppendOutputStrategy extends OutputStrategy {
eventProperties = new ArrayList<>();
}
+ @Override
+ public void accept(OutputStrategyVisitor visitor) {
+ visitor.visit(this);
+ }
+
public AppendOutputStrategy(AppendOutputStrategy other) {
super(other);
this.setEventProperties(new Cloner().properties(other.getEventProperties()));
diff --git
a/streampipes-model/src/main/java/org/apache/streampipes/model/output/CustomOutputStrategy.java
b/streampipes-model/src/main/java/org/apache/streampipes/model/output/CustomOutputStrategy.java
index 4468ba14f2..fa859aeef4 100644
---
a/streampipes-model/src/main/java/org/apache/streampipes/model/output/CustomOutputStrategy.java
+++
b/streampipes-model/src/main/java/org/apache/streampipes/model/output/CustomOutputStrategy.java
@@ -23,8 +23,6 @@ import java.util.List;
public class CustomOutputStrategy extends OutputStrategy {
- private static final long serialVersionUID = -5858193127308435472L;
-
private List<String> selectedPropertyKeys;
private boolean outputRight;
@@ -77,4 +75,9 @@ public class CustomOutputStrategy extends OutputStrategy {
public void setAvailablePropertyKeys(List<String> availablePropertyKeys) {
this.availablePropertyKeys = availablePropertyKeys;
}
+
+ @Override
+ public void accept(OutputStrategyVisitor visitor) {
+ visitor.visit(this);
+ }
}
diff --git
a/streampipes-model/src/main/java/org/apache/streampipes/model/output/CustomTransformOutputStrategy.java
b/streampipes-model/src/main/java/org/apache/streampipes/model/output/CustomTransformOutputStrategy.java
index 91d1a6ae3d..426a8d29eb 100644
---
a/streampipes-model/src/main/java/org/apache/streampipes/model/output/CustomTransformOutputStrategy.java
+++
b/streampipes-model/src/main/java/org/apache/streampipes/model/output/CustomTransformOutputStrategy.java
@@ -32,7 +32,6 @@ public class CustomTransformOutputStrategy extends OutputStrategy {
this.eventProperties = new ArrayList<>();
}
-
public CustomTransformOutputStrategy(CustomTransformOutputStrategy other) {
super(other);
this.eventProperties = new Cloner().properties(other.getEventProperties());
@@ -45,4 +44,9 @@ public class CustomTransformOutputStrategy extends OutputStrategy {
public void setEventProperties(List<EventProperty> eventProperties) {
this.eventProperties = eventProperties;
}
+
+ @Override
+ public void accept(OutputStrategyVisitor visitor) {
+ visitor.visit(this);
+ }
}
diff --git
a/streampipes-model/src/main/java/org/apache/streampipes/model/output/FixedOutputStrategy.java
b/streampipes-model/src/main/java/org/apache/streampipes/model/output/FixedOutputStrategy.java
index e698348efd..1558aada49 100644
---
a/streampipes-model/src/main/java/org/apache/streampipes/model/output/FixedOutputStrategy.java
+++
b/streampipes-model/src/main/java/org/apache/streampipes/model/output/FixedOutputStrategy.java
@@ -50,5 +50,9 @@ public class FixedOutputStrategy extends OutputStrategy {
this.eventProperties = eventProperties;
}
+ @Override
+ public void accept(OutputStrategyVisitor visitor) {
+ visitor.visit(this);
+ }
}
diff --git
a/streampipes-model/src/main/java/org/apache/streampipes/model/output/KeepOutputStrategy.java
b/streampipes-model/src/main/java/org/apache/streampipes/model/output/KeepOutputStrategy.java
index d455e7b4c2..0c8b9b0e00 100644
---
a/streampipes-model/src/main/java/org/apache/streampipes/model/output/KeepOutputStrategy.java
+++
b/streampipes-model/src/main/java/org/apache/streampipes/model/output/KeepOutputStrategy.java
@@ -63,4 +63,9 @@ public class KeepOutputStrategy extends OutputStrategy {
public void setKeepBoth(boolean keepBoth) {
this.keepBoth = keepBoth;
}
+
+ @Override
+ public void accept(OutputStrategyVisitor visitor) {
+ visitor.visit(this);
+ }
}
diff --git
a/streampipes-model/src/main/java/org/apache/streampipes/model/output/ListOutputStrategy.java
b/streampipes-model/src/main/java/org/apache/streampipes/model/output/ListOutputStrategy.java
index 4040d9387c..b2e1f4ba51 100644
---
a/streampipes-model/src/main/java/org/apache/streampipes/model/output/ListOutputStrategy.java
+++
b/streampipes-model/src/main/java/org/apache/streampipes/model/output/ListOutputStrategy.java
@@ -46,5 +46,9 @@ public class ListOutputStrategy extends OutputStrategy {
this.propertyName = propertyName;
}
+ @Override
+ public void accept(OutputStrategyVisitor visitor) {
+ visitor.visit(this);
+ }
}
diff --git
a/streampipes-model/src/main/java/org/apache/streampipes/model/output/OutputStrategy.java
b/streampipes-model/src/main/java/org/apache/streampipes/model/output/OutputStrategy.java
index 05484f1679..ab19a8d89b 100644
---
a/streampipes-model/src/main/java/org/apache/streampipes/model/output/OutputStrategy.java
+++
b/streampipes-model/src/main/java/org/apache/streampipes/model/output/OutputStrategy.java
@@ -75,4 +75,6 @@ public abstract class OutputStrategy {
public void setRenameRules(List<PropertyRenameRule> renameRules) {
this.renameRules = renameRules;
}
+
+ public abstract void accept(OutputStrategyVisitor visitor);
}
diff --git
a/streampipes-model/src/main/java/org/apache/streampipes/model/output/UserDefinedOutputStrategy.java
b/streampipes-model/src/main/java/org/apache/streampipes/model/output/OutputStrategyVisitor.java
similarity index 58%
copy from
streampipes-model/src/main/java/org/apache/streampipes/model/output/UserDefinedOutputStrategy.java
copy to
streampipes-model/src/main/java/org/apache/streampipes/model/output/OutputStrategyVisitor.java
index 7f8f725eef..024e22b994 100644
---
a/streampipes-model/src/main/java/org/apache/streampipes/model/output/UserDefinedOutputStrategy.java
+++
b/streampipes-model/src/main/java/org/apache/streampipes/model/output/OutputStrategyVisitor.java
@@ -18,28 +18,21 @@
package org.apache.streampipes.model.output;
-import org.apache.streampipes.model.schema.EventProperty;
+public interface OutputStrategyVisitor {
-import java.util.List;
+ void visit(AppendOutputStrategy appendOutputStrategy);
-public class UserDefinedOutputStrategy extends OutputStrategy {
+ void visit(CustomOutputStrategy customOutputStrategy);
- private List<EventProperty> eventProperties;
+ void visit(CustomTransformOutputStrategy customTransformOutputStrategy);
- public UserDefinedOutputStrategy() {
- super();
- }
+ void visit(FixedOutputStrategy fixedOutputStrategy);
- public UserDefinedOutputStrategy(UserDefinedOutputStrategy other) {
- super(other);
- this.eventProperties = other.getEventProperties();
- }
+ void visit(KeepOutputStrategy keepOutputStrategy);
- public List<EventProperty> getEventProperties() {
- return eventProperties;
- }
+ void visit(ListOutputStrategy listOutputStrategy);
- public void setEventProperties(List<EventProperty> eventProperties) {
- this.eventProperties = eventProperties;
- }
+ void visit(TransformOutputStrategy transformOutputStrategy);
+
+ void visit(UserDefinedOutputStrategy userDefinedOutputStrategy);
}
diff --git
a/streampipes-model/src/main/java/org/apache/streampipes/model/output/TransformOutputStrategy.java
b/streampipes-model/src/main/java/org/apache/streampipes/model/output/TransformOutputStrategy.java
index 1ca951654b..7e9de992ad 100644
---
a/streampipes-model/src/main/java/org/apache/streampipes/model/output/TransformOutputStrategy.java
+++
b/streampipes-model/src/main/java/org/apache/streampipes/model/output/TransformOutputStrategy.java
@@ -43,4 +43,9 @@ public class TransformOutputStrategy extends OutputStrategy {
public void setTransformOperations(List<TransformOperation> transformOperations) {
this.transformOperations = transformOperations;
}
+
+ @Override
+ public void accept(OutputStrategyVisitor visitor) {
+ visitor.visit(this);
+ }
}
diff --git
a/streampipes-model/src/main/java/org/apache/streampipes/model/output/UserDefinedOutputStrategy.java
b/streampipes-model/src/main/java/org/apache/streampipes/model/output/UserDefinedOutputStrategy.java
index 7f8f725eef..622d1e25ec 100644
---
a/streampipes-model/src/main/java/org/apache/streampipes/model/output/UserDefinedOutputStrategy.java
+++
b/streampipes-model/src/main/java/org/apache/streampipes/model/output/UserDefinedOutputStrategy.java
@@ -42,4 +42,9 @@ public class UserDefinedOutputStrategy extends OutputStrategy {
public void setEventProperties(List<EventProperty> eventProperties) {
this.eventProperties = eventProperties;
}
+
+ @Override
+ public void accept(OutputStrategyVisitor visitor) {
+ visitor.visit(this);
+ }
}
diff --git
a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipelineElement.java
b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipelineElement.java
index b145c2aad3..f027e60e05 100644
---
a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipelineElement.java
+++
b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipelineElement.java
@@ -28,5 +28,6 @@ public record CompactPipelineElement(String type,
String ref,
String id,
List<String> connectedTo,
- List<Map<String, Object>> configuration) {
+ List<Map<String, Object>> configuration,
+ OutputConfiguration output) {
}
diff --git
a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipelineElement.java
b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/OutputConfiguration.java
similarity index 75%
copy from
streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipelineElement.java
copy to
streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/OutputConfiguration.java
index b145c2aad3..f9d88fa97d 100644
---
a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipelineElement.java
+++
b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/OutputConfiguration.java
@@ -21,12 +21,8 @@ package org.apache.streampipes.model.pipeline.compact;
import com.fasterxml.jackson.annotation.JsonInclude;
import java.util.List;
-import java.util.Map;
@JsonInclude(JsonInclude.Include.NON_NULL)
-public record CompactPipelineElement(String type,
- String ref,
- String id,
- List<String> connectedTo,
- List<Map<String, Object>> configuration) {
+public record OutputConfiguration(List<String> keep,
+ List<UserDefinedOutput> userDefined) {
}
diff --git a/ui/src/app/core-ui/help/help.component.scss
b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/UserDefinedOutput.java
similarity index 78%
copy from ui/src/app/core-ui/help/help.component.scss
copy to
streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/UserDefinedOutput.java
index ecf2724830..22b9b924a1 100644
--- a/ui/src/app/core-ui/help/help.component.scss
+++
b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/UserDefinedOutput.java
@@ -16,4 +16,9 @@
*
*/
-@import '../../../scss/sp/sp-dialog.scss';
+package org.apache.streampipes.model.pipeline.compact;
+
+public record UserDefinedOutput(String fieldName,
+ String runtimeType,
+ String semanticType) {
+}
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/CompactPipelineConverter.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/CompactPipelineConverter.java
index bebc3b9286..c3230822b3 100644
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/CompactPipelineConverter.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/CompactPipelineConverter.java
@@ -19,8 +19,15 @@
package org.apache.streampipes.manager.pipeline.compact.generation;
import org.apache.streampipes.manager.template.CompactConfigGenerator;
+import org.apache.streampipes.model.output.CustomOutputStrategy;
+import org.apache.streampipes.model.output.OutputStrategy;
+import org.apache.streampipes.model.output.UserDefinedOutputStrategy;
import org.apache.streampipes.model.pipeline.Pipeline;
import org.apache.streampipes.model.pipeline.compact.CompactPipelineElement;
+import org.apache.streampipes.model.pipeline.compact.OutputConfiguration;
+import org.apache.streampipes.model.pipeline.compact.UserDefinedOutput;
+import org.apache.streampipes.model.schema.EventProperty;
+import org.apache.streampipes.model.schema.EventPropertyPrimitive;
import org.apache.streampipes.model.staticproperty.StaticProperty;
import java.util.ArrayList;
@@ -38,6 +45,7 @@ public class CompactPipelineConverter {
stream.getDom(),
stream.getElementId(),
null,
+ null,
null))
.forEach(pipelineElements::add);
@@ -47,7 +55,8 @@ public class CompactPipelineConverter {
processor.getDom(),
processor.getAppId(),
processor.getConnectedTo(),
- getConfig(processor.getStaticProperties())))
+ getConfig(processor.getStaticProperties()),
+ getOutput(processor.getOutputStrategies().get(0))))
.forEach(pipelineElements::add);
pipeline.getActions().stream()
@@ -56,7 +65,8 @@ public class CompactPipelineConverter {
sink.getDom(),
sink.getAppId(),
sink.getConnectedTo(),
- getConfig(sink.getStaticProperties())))
+ getConfig(sink.getStaticProperties()),
+ null))
.forEach(pipelineElements::add);
return pipelineElements;
@@ -66,11 +76,12 @@ public class CompactPipelineConverter {
String ref,
String elementId,
List<String> connectedTo,
- List<Map<String, Object>> config) {
+ List<Map<String, Object>> config,
+ OutputConfiguration outputConfiguration) {
var connections = connectedTo != null ? connectedTo.stream()
.map(this::replaceId)
.toList() : null;
- return new CompactPipelineElement(type, replaceId(ref), elementId, connections, config);
+ return new CompactPipelineElement(type, replaceId(ref), elementId, connections, config, outputConfiguration);
}
public List<Map<String, Object>> getConfig(List<StaticProperty> staticProperties) {
@@ -79,6 +90,26 @@ public class CompactPipelineConverter {
return configs;
}
+ public OutputConfiguration getOutput(OutputStrategy outputStrategy) {
+ if (outputStrategy instanceof CustomOutputStrategy) {
+ return new OutputConfiguration(((CustomOutputStrategy) outputStrategy).getSelectedPropertyKeys(), null);
+ } else if (outputStrategy instanceof UserDefinedOutputStrategy) {
+ return new OutputConfiguration(
+ null,
+ toCustomConfig(((UserDefinedOutputStrategy) outputStrategy).getEventProperties())
+ );
+ } else {
+ return null;
+ }
+ }
+
+ private List<UserDefinedOutput> toCustomConfig(List<EventProperty> eventProperties) {
+ return eventProperties.stream().map(ep -> new UserDefinedOutput(
+ ep.getRuntimeName(),
+ ((EventPropertyPrimitive) ep).getRuntimeType(),
+ ep.getSemanticType())).toList();
+ }
+
private String replaceId(String id) {
return id.replaceAll(InvocablePipelineElementGenerator.ID_PREFIX, "");
}
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataProcessorPipelineElementGenerator.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataProcessorPipelineElementGenerator.java
index c736293c20..89f7fd10dd 100644
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataProcessorPipelineElementGenerator.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataProcessorPipelineElementGenerator.java
@@ -35,7 +35,13 @@ public class DataProcessorPipelineElementGenerator {
CompactPipelineElement pipelineElement) {
basicGenerator.apply(processor, pipelineElement);
var template = basicGenerator.makeTemplate(processor, pipelineElement);
- return new DataProcessorTemplateHandler(template, processor, false)
+ var element = new DataProcessorTemplateHandler(template, processor, false)
.applyTemplateOnPipelineElement();
+
+ if (pipelineElement.output() != null) {
+ var outputStrategyGenerator = new OutputStrategyGenerator(pipelineElement.output());
+ element.getOutputStrategies().forEach(o -> o.accept(outputStrategyGenerator));
+ }
+ return element;
}
}
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/OutputStrategyGenerator.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/OutputStrategyGenerator.java
new file mode 100644
index 0000000000..2cb4cd6067
--- /dev/null
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/OutputStrategyGenerator.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.pipeline.compact.generation;
+
+import org.apache.streampipes.model.output.AppendOutputStrategy;
+import org.apache.streampipes.model.output.CustomOutputStrategy;
+import org.apache.streampipes.model.output.CustomTransformOutputStrategy;
+import org.apache.streampipes.model.output.FixedOutputStrategy;
+import org.apache.streampipes.model.output.KeepOutputStrategy;
+import org.apache.streampipes.model.output.ListOutputStrategy;
+import org.apache.streampipes.model.output.OutputStrategyVisitor;
+import org.apache.streampipes.model.output.TransformOutputStrategy;
+import org.apache.streampipes.model.output.UserDefinedOutputStrategy;
+import org.apache.streampipes.model.pipeline.compact.OutputConfiguration;
+import org.apache.streampipes.model.pipeline.compact.UserDefinedOutput;
+import org.apache.streampipes.model.schema.EventProperty;
+import org.apache.streampipes.model.schema.EventPropertyPrimitive;
+
+import java.util.List;
+
+public class OutputStrategyGenerator implements OutputStrategyVisitor {
+
+ private final OutputConfiguration config;
+
+ public OutputStrategyGenerator(OutputConfiguration outputConfiguration) {
+ this.config = outputConfiguration;
+ }
+
+ @Override
+ public void visit(AppendOutputStrategy appendOutputStrategy) {
+
+ }
+
+ @Override
+ public void visit(CustomOutputStrategy customOutputStrategy) {
+ var keepConfig = config.keep();
+ if (keepConfig != null && !keepConfig.isEmpty()) {
+ customOutputStrategy.setSelectedPropertyKeys(keepConfig);
+ }
+ }
+
+ @Override
+ public void visit(CustomTransformOutputStrategy customTransformOutputStrategy) {
+
+ }
+
+ @Override
+ public void visit(FixedOutputStrategy fixedOutputStrategy) {
+
+ }
+
+ @Override
+ public void visit(KeepOutputStrategy keepOutputStrategy) {
+
+ }
+
+ @Override
+ public void visit(ListOutputStrategy listOutputStrategy) {
+
+ }
+
+ @Override
+ public void visit(TransformOutputStrategy transformOutputStrategy) {
+
+ }
+
+ @Override
+ public void visit(UserDefinedOutputStrategy userDefinedOutputStrategy) {
+ var userDefinedConfig = config.userDefined();
+ if (userDefinedConfig != null && !userDefinedConfig.isEmpty()) {
+ userDefinedOutputStrategy.setEventProperties(
+ toEp(userDefinedConfig)
+ );
+ }
+ }
+
+ private List<EventProperty> toEp(List<UserDefinedOutput> userDefinedOutput) {
+ return userDefinedOutput
+ .stream()
+ .map(this::toPrimitive)
+ .toList();
+ }
+
+ private EventProperty toPrimitive(UserDefinedOutput u) {
+ var ep = new EventPropertyPrimitive();
+ ep.setRuntimeName(u.fieldName());
+ ep.setSemanticType(u.semanticType());
+ ep.setRuntimeType(u.runtimeType());
+
+ return ep;
+ }
+}
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/compact/CompactPipelineTemplateManagement.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/compact/CompactPipelineTemplateManagement.java
index 0bf308cfc4..1238a0aeda 100644
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/compact/CompactPipelineTemplateManagement.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/compact/CompactPipelineTemplateManagement.java
@@ -57,7 +57,8 @@ public class CompactPipelineTemplateManagement {
key,
stream.getElementId(),
List.of(),
- List.of()
+ List.of(),
+ null
));
});
}
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/instances/PersistDataLakePipelineTemplate.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/instances/PersistDataLakePipelineTemplate.java
index caa6e7bf94..00c768f4a7 100644
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/instances/PersistDataLakePipelineTemplate.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/instances/PersistDataLakePipelineTemplate.java
@@ -52,7 +52,8 @@ public class PersistDataLakePipelineTemplate implements DefaultPipelineTemplateP
List.of(
Map.of("schema_update", "Update schema"),
Map.of("ignore_duplicates", false)
- )
+ ),
+ null
)
)
);
diff --git
a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
index cf07248af8..3aa23a65bf 100644
---
a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
+++
b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
@@ -20,7 +20,7 @@
/* tslint:disable */
/* eslint-disable */
// @ts-nocheck
-// Generated using typescript-generator version 3.2.1263 on 2024-10-11 10:41:46.
+// Generated using typescript-generator version 3.2.1263 on 2024-10-29 10:04:46.
export class NamedStreamPipesEntity implements Storable {
'@class':
@@ -844,6 +844,7 @@ export class CompactPipelineElement {
configuration: { [index: string]: any }[];
connectedTo: string[];
id: string;
+ output: OutputConfiguration;
ref: string;
type: string;
@@ -862,6 +863,7 @@ export class CompactPipelineElement {
data.connectedTo,
);
instance.id = data.id;
+ instance.output = OutputConfiguration.fromData(data.output);
instance.ref = data.ref;
instance.type = data.type;
return instance;
@@ -2605,6 +2607,26 @@ export class Option {
}
}
+export class OutputConfiguration {
+ keep: string[];
+ userDefined: UserDefinedOutput[];
+
+ static fromData(
+ data: OutputConfiguration,
+ target?: OutputConfiguration,
+ ): OutputConfiguration {
+ if (!data) {
+ return data;
+ }
+ const instance = target || new OutputConfiguration();
+ instance.keep = __getCopyArrayFn(__identity<string>())(data.keep);
+ instance.userDefined = __getCopyArrayFn(UserDefinedOutput.fromData)(
+ data.userDefined,
+ );
+ return instance;
+ }
+}
+
export class Pipeline implements Storable {
_id: string;
_rev: string;
@@ -4056,6 +4078,26 @@ export class UnitTransformRuleDescription extends ValueTransformationRuleDescrip
}
}
+export class UserDefinedOutput {
+ fieldName: string;
+ runtimeType: string;
+ semanticType: string;
+
+ static fromData(
+ data: UserDefinedOutput,
+ target?: UserDefinedOutput,
+ ): UserDefinedOutput {
+ if (!data) {
+ return data;
+ }
+ const instance = target || new UserDefinedOutput();
+ instance.fieldName = data.fieldName;
+ instance.runtimeType = data.runtimeType;
+ instance.semanticType = data.semanticType;
+ return instance;
+ }
+}
+
export class UserDefinedOutputStrategy extends OutputStrategy {
'@class': 'org.apache.streampipes.model.output.UserDefinedOutputStrategy';
'eventProperties': EventPropertyUnion[];
diff --git
a/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts
b/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts
index 459327fd25..17cc830821 100644
---
a/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts
+++
b/ui/src/app/connect/dialog/adapter-started/adapter-started-dialog.component.ts
@@ -278,6 +278,7 @@ export class AdapterStartedDialog implements OnInit {
configuration: undefined,
id: adapter.correspondingDataStreamElementId,
connectedTo: undefined,
+ output: undefined,
});
return template;
}
diff --git a/ui/src/app/core-ui/help/help.component.html
b/ui/src/app/core-ui/help/help.component.html
index ecc82fc6a8..fcdfbe7332 100644
--- a/ui/src/app/core-ui/help/help.component.html
+++ b/ui/src/app/core-ui/help/help.component.html
@@ -17,18 +17,28 @@
-->
<div class="sp-dialog-container">
- <div class="sp-dialog-content p-15">
- <h4>{{ pipelineElement.name }}</h4>
- <p>
- {{ pipelineElement.description }}
- </p>
-
+ <div class="sp-dialog-content p-15" fxLayout="column">
+ <div fxLayout="row" fxFlex="100">
+ <div fxLayout="column" fxFlex>
+ <h4>{{ pipelineElement.name }}</h4>
+ <small>
+ {{ pipelineElement.description }}
+ </small>
+ </div>
+ <div class="element-id" fxLayoutAlign="end start">
+ <span>ID</span> <b>{{
+ isDataStream
+ ? pipelineElement.elementId
+ : pipelineElement.appId
+ }}</b>
+ </div>
+ </div>
<mat-tab-group
color="accent"
[selectedIndex]="selectedTabIndex"
(selectedIndexChange)="selectedTabIndex = $event"
>
- <mat-tab *ngFor="let tab of tabs" label="{{ tab }}"> </mat-tab>
+ <mat-tab *ngFor="let tab of tabs" label="{{ tab }}"></mat-tab>
</mat-tab-group>
<sp-pipeline-element-runtime-info
diff --git a/ui/src/app/core-ui/help/help.component.scss
b/ui/src/app/core-ui/help/help.component.scss
index ecf2724830..c077f68f35 100644
--- a/ui/src/app/core-ui/help/help.component.scss
+++ b/ui/src/app/core-ui/help/help.component.scss
@@ -17,3 +17,13 @@
*/
@import '../../../scss/sp/sp-dialog.scss';
+
+.element-id {
+ border-radius: 5px;
+ margin-right: 10px;
+ margin-top: 5px;
+ margin-bottom: 5px;
+ font-size: small;
+ display: inline-block;
+ padding: 5px;
+}
diff --git a/ui/src/app/core-ui/help/help.component.ts
b/ui/src/app/core-ui/help/help.component.ts
index 517a2c927d..fcfc5b8d2d 100644
--- a/ui/src/app/core-ui/help/help.component.ts
+++ b/ui/src/app/core-ui/help/help.component.ts
@@ -34,12 +34,14 @@ export class HelpComponent implements OnInit {
@Input()
pipelineElement: PipelineElementUnion;
+ isDataStream: boolean;
constructor(private dialogRef: DialogRef<HelpComponent>) {}
ngOnInit() {
if (this.pipelineElement instanceof SpDataStream) {
this.tabs = this.availableTabs;
+ this.isDataStream = true;
} else {
this.tabs.push(this.availableTabs[1]);
this.selectedTabIndex = 1;
@@ -51,4 +53,6 @@ export class HelpComponent implements OnInit {
this.dialogRef.close();
});
}
+
+ protected readonly SpDataStream = SpDataStream;
}