This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch support-output-strategies-in-pipeline-code
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to
refs/heads/support-output-strategies-in-pipeline-code by this push:
new 08666dcb97 Support output strategy in pipeline templates
08666dcb97 is described below
commit 08666dcb977a1dc7898c8e1f7adcf29f3ea7ae71
Author: Dominik Riemer <[email protected]>
AuthorDate: Tue Oct 29 09:58:40 2024 +0100
Support output strategy in pipeline templates
---
.../management/compact/PersistPipelineHandler.java | 1 +
.../pipeline/compact/CompactPipelineElement.java | 3 +-
...pelineElement.java => OutputConfiguration.java} | 8 +-
.../model/pipeline/compact/UserDefinedOutput.java | 7 +-
.../generation/CompactPipelineConverter.java | 36 ++++++-
.../DataProcessorPipelineElementGenerator.java | 8 +-
.../generation/OutputStrategyGenerator.java | 108 +++++++++++++++++++++
.../compact/CompactPipelineTemplateManagement.java | 3 +-
.../instances/PersistDataLakePipelineTemplate.java | 3 +-
ui/src/app/core-ui/help/help.component.html | 24 +++--
ui/src/app/core-ui/help/help.component.scss | 10 ++
ui/src/app/core-ui/help/help.component.ts | 4 +
12 files changed, 193 insertions(+), 22 deletions(-)
diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/compact/PersistPipelineHandler.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/compact/PersistPipelineHandler.java
index dd0afb7ef7..8b53e019f6 100644
--- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/compact/PersistPipelineHandler.java
+++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/compact/PersistPipelineHandler.java
@@ -94,6 +94,7 @@ public class PersistPipelineHandler {
DATA_LAKE_CONNECTOR_ID,
adapterDescription.getCorrespondingDataStreamElementId(),
null,
+ null,
null
));
return pipelineElements;
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipelineElement.java b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipelineElement.java
index b145c2aad3..f027e60e05 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipelineElement.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipelineElement.java
@@ -28,5 +28,6 @@ public record CompactPipelineElement(String type,
String ref,
String id,
List<String> connectedTo,
- List<Map<String, Object>> configuration) {
+ List<Map<String, Object>> configuration,
+ OutputConfiguration output) {
}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipelineElement.java b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/OutputConfiguration.java
similarity index 75%
copy from streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipelineElement.java
copy to streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/OutputConfiguration.java
index b145c2aad3..f9d88fa97d 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipelineElement.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/OutputConfiguration.java
@@ -21,12 +21,8 @@ package org.apache.streampipes.model.pipeline.compact;
import com.fasterxml.jackson.annotation.JsonInclude;
import java.util.List;
-import java.util.Map;
@JsonInclude(JsonInclude.Include.NON_NULL)
-public record CompactPipelineElement(String type,
- String ref,
- String id,
- List<String> connectedTo,
- List<Map<String, Object>> configuration) {
+public record OutputConfiguration(List<String> keep,
+ List<UserDefinedOutput> userDefined) {
}
diff --git a/ui/src/app/core-ui/help/help.component.scss b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/UserDefinedOutput.java
similarity index 78%
copy from ui/src/app/core-ui/help/help.component.scss
copy to streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/UserDefinedOutput.java
index ecf2724830..22b9b924a1 100644
--- a/ui/src/app/core-ui/help/help.component.scss
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/UserDefinedOutput.java
@@ -16,4 +16,9 @@
*
*/
-@import '../../../scss/sp/sp-dialog.scss';
+package org.apache.streampipes.model.pipeline.compact;
+
+public record UserDefinedOutput(String fieldName,
+ String runtimeType,
+ String semanticType) {
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/CompactPipelineConverter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/CompactPipelineConverter.java
index bebc3b9286..35b85741d8 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/CompactPipelineConverter.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/CompactPipelineConverter.java
@@ -19,8 +19,15 @@
package org.apache.streampipes.manager.pipeline.compact.generation;
import org.apache.streampipes.manager.template.CompactConfigGenerator;
+import org.apache.streampipes.model.output.CustomOutputStrategy;
+import org.apache.streampipes.model.output.OutputStrategy;
+import org.apache.streampipes.model.output.UserDefinedOutputStrategy;
import org.apache.streampipes.model.pipeline.Pipeline;
import org.apache.streampipes.model.pipeline.compact.CompactPipelineElement;
+import org.apache.streampipes.model.pipeline.compact.OutputConfiguration;
+import org.apache.streampipes.model.pipeline.compact.UserDefinedOutput;
+import org.apache.streampipes.model.schema.EventProperty;
+import org.apache.streampipes.model.schema.EventPropertyPrimitive;
import org.apache.streampipes.model.staticproperty.StaticProperty;
import java.util.ArrayList;
@@ -38,6 +45,7 @@ public class CompactPipelineConverter {
stream.getDom(),
stream.getElementId(),
null,
+ null,
null))
.forEach(pipelineElements::add);
@@ -47,7 +55,8 @@ public class CompactPipelineConverter {
processor.getDom(),
processor.getAppId(),
processor.getConnectedTo(),
- getConfig(processor.getStaticProperties())))
+ getConfig(processor.getStaticProperties()),
+ getOutput(processor.getOutputStrategies().get(0))))
.forEach(pipelineElements::add);
pipeline.getActions().stream()
@@ -56,7 +65,8 @@ public class CompactPipelineConverter {
sink.getDom(),
sink.getAppId(),
sink.getConnectedTo(),
- getConfig(sink.getStaticProperties())))
+ getConfig(sink.getStaticProperties()),
+ null))
.forEach(pipelineElements::add);
return pipelineElements;
@@ -66,11 +76,12 @@ public class CompactPipelineConverter {
String ref,
String elementId,
List<String> connectedTo,
- List<Map<String, Object>> config) {
+ List<Map<String, Object>> config,
+ OutputConfiguration outputConfiguration) {
var connections = connectedTo != null ? connectedTo.stream()
.map(this::replaceId)
.toList() : null;
- return new CompactPipelineElement(type, replaceId(ref), elementId, connections, config);
+ return new CompactPipelineElement(type, replaceId(ref), elementId, connections, config, outputConfiguration);
}
public List<Map<String, Object>> getConfig(List<StaticProperty> staticProperties) {
@@ -79,6 +90,23 @@ public class CompactPipelineConverter {
return configs;
}
+ public OutputConfiguration getOutput(OutputStrategy outputStrategy) {
+ if (outputStrategy instanceof CustomOutputStrategy) {
+ return new OutputConfiguration(((CustomOutputStrategy) outputStrategy).getSelectedPropertyKeys(), null);
+ } else if (outputStrategy instanceof UserDefinedOutputStrategy) {
+ return new OutputConfiguration(null, toCustomConfig(((UserDefinedOutputStrategy) outputStrategy).getEventProperties()));
+ } else {
+ return null;
+ }
+ }
+
+ private List<UserDefinedOutput> toCustomConfig(List<EventProperty> eventProperties) {
+ return eventProperties.stream().map(ep -> new UserDefinedOutput(
+ ep.getRuntimeName(),
+ ((EventPropertyPrimitive) ep).getRuntimeType(),
+ ep.getSemanticType())).toList();
+ }
+
private String replaceId(String id) {
return id.replaceAll(InvocablePipelineElementGenerator.ID_PREFIX, "");
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataProcessorPipelineElementGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataProcessorPipelineElementGenerator.java
index c736293c20..89f7fd10dd 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataProcessorPipelineElementGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataProcessorPipelineElementGenerator.java
@@ -35,7 +35,13 @@ public class DataProcessorPipelineElementGenerator {
CompactPipelineElement pipelineElement) {
basicGenerator.apply(processor, pipelineElement);
var template = basicGenerator.makeTemplate(processor, pipelineElement);
- return new DataProcessorTemplateHandler(template, processor, false)
+ var element = new DataProcessorTemplateHandler(template, processor, false)
.applyTemplateOnPipelineElement();
+
+ if (pipelineElement.output() != null) {
+ var outputStrategyGenerator = new OutputStrategyGenerator(pipelineElement.output());
+ element.getOutputStrategies().forEach(o -> o.accept(outputStrategyGenerator));
+ }
+ return element;
}
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/OutputStrategyGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/OutputStrategyGenerator.java
new file mode 100644
index 0000000000..2cb4cd6067
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/OutputStrategyGenerator.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.pipeline.compact.generation;
+
+import org.apache.streampipes.model.output.AppendOutputStrategy;
+import org.apache.streampipes.model.output.CustomOutputStrategy;
+import org.apache.streampipes.model.output.CustomTransformOutputStrategy;
+import org.apache.streampipes.model.output.FixedOutputStrategy;
+import org.apache.streampipes.model.output.KeepOutputStrategy;
+import org.apache.streampipes.model.output.ListOutputStrategy;
+import org.apache.streampipes.model.output.OutputStrategyVisitor;
+import org.apache.streampipes.model.output.TransformOutputStrategy;
+import org.apache.streampipes.model.output.UserDefinedOutputStrategy;
+import org.apache.streampipes.model.pipeline.compact.OutputConfiguration;
+import org.apache.streampipes.model.pipeline.compact.UserDefinedOutput;
+import org.apache.streampipes.model.schema.EventProperty;
+import org.apache.streampipes.model.schema.EventPropertyPrimitive;
+
+import java.util.List;
+
+public class OutputStrategyGenerator implements OutputStrategyVisitor {
+
+ private final OutputConfiguration config;
+
+ public OutputStrategyGenerator(OutputConfiguration outputConfiguration) {
+ this.config = outputConfiguration;
+ }
+
+ @Override
+ public void visit(AppendOutputStrategy appendOutputStrategy) {
+
+ }
+
+ @Override
+ public void visit(CustomOutputStrategy customOutputStrategy) {
+ var keepConfig = config.keep();
+ if (keepConfig != null && !keepConfig.isEmpty()) {
+ customOutputStrategy.setSelectedPropertyKeys(keepConfig);
+ }
+ }
+
+ @Override
+ public void visit(CustomTransformOutputStrategy customTransformOutputStrategy) {
+
+ }
+
+ @Override
+ public void visit(FixedOutputStrategy fixedOutputStrategy) {
+
+ }
+
+ @Override
+ public void visit(KeepOutputStrategy keepOutputStrategy) {
+
+ }
+
+ @Override
+ public void visit(ListOutputStrategy listOutputStrategy) {
+
+ }
+
+ @Override
+ public void visit(TransformOutputStrategy transformOutputStrategy) {
+
+ }
+
+ @Override
+ public void visit(UserDefinedOutputStrategy userDefinedOutputStrategy) {
+ var userDefinedConfig = config.userDefined();
+ if (userDefinedConfig != null && !userDefinedConfig.isEmpty()) {
+ userDefinedOutputStrategy.setEventProperties(
+ toEp(userDefinedConfig)
+ );
+ }
+ }
+
+ private List<EventProperty> toEp(List<UserDefinedOutput> userDefinedOutput) {
+ return userDefinedOutput
+ .stream()
+ .map(this::toPrimitive)
+ .toList();
+ }
+
+ private EventProperty toPrimitive(UserDefinedOutput u) {
+ var ep = new EventPropertyPrimitive();
+ ep.setRuntimeName(u.fieldName());
+ ep.setSemanticType(u.semanticType());
+ ep.setRuntimeType(u.runtimeType());
+
+ return ep;
+ }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/compact/CompactPipelineTemplateManagement.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/compact/CompactPipelineTemplateManagement.java
index 0bf308cfc4..1238a0aeda 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/compact/CompactPipelineTemplateManagement.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/compact/CompactPipelineTemplateManagement.java
@@ -57,7 +57,8 @@ public class CompactPipelineTemplateManagement {
key,
stream.getElementId(),
List.of(),
- List.of()
+ List.of(),
+ null
));
});
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/instances/PersistDataLakePipelineTemplate.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/instances/PersistDataLakePipelineTemplate.java
index caa6e7bf94..00c768f4a7 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/instances/PersistDataLakePipelineTemplate.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/instances/PersistDataLakePipelineTemplate.java
@@ -52,7 +52,8 @@ public class PersistDataLakePipelineTemplate implements DefaultPipelineTemplateP
List.of(
Map.of("schema_update", "Update schema"),
Map.of("ignore_duplicates", false)
- )
+ ),
+ null
)
)
);
diff --git a/ui/src/app/core-ui/help/help.component.html b/ui/src/app/core-ui/help/help.component.html
index ecc82fc6a8..fcdfbe7332 100644
--- a/ui/src/app/core-ui/help/help.component.html
+++ b/ui/src/app/core-ui/help/help.component.html
@@ -17,18 +17,28 @@
-->
<div class="sp-dialog-container">
- <div class="sp-dialog-content p-15">
- <h4>{{ pipelineElement.name }}</h4>
- <p>
- {{ pipelineElement.description }}
- </p>
-
+ <div class="sp-dialog-content p-15" fxLayout="column">
+ <div fxLayout="row" fxFlex="100">
+ <div fxLayout="column" fxFlex>
+ <h4>{{ pipelineElement.name }}</h4>
+ <small>
+ {{ pipelineElement.description }}
+ </small>
+ </div>
+ <div class="element-id" fxLayoutAlign="end start">
+ <span>ID</span> <b>{{
+ isDataStream
+ ? pipelineElement.elementId
+ : pipelineElement.appId
+ }}</b>
+ </div>
+ </div>
<mat-tab-group
color="accent"
[selectedIndex]="selectedTabIndex"
(selectedIndexChange)="selectedTabIndex = $event"
>
- <mat-tab *ngFor="let tab of tabs" label="{{ tab }}"> </mat-tab>
+ <mat-tab *ngFor="let tab of tabs" label="{{ tab }}"></mat-tab>
</mat-tab-group>
<sp-pipeline-element-runtime-info
diff --git a/ui/src/app/core-ui/help/help.component.scss b/ui/src/app/core-ui/help/help.component.scss
index ecf2724830..c077f68f35 100644
--- a/ui/src/app/core-ui/help/help.component.scss
+++ b/ui/src/app/core-ui/help/help.component.scss
@@ -17,3 +17,13 @@
*/
@import '../../../scss/sp/sp-dialog.scss';
+
+.element-id {
+ border-radius: 5px;
+ margin-right: 10px;
+ margin-top: 5px;
+ margin-bottom: 5px;
+ font-size: small;
+ display: inline-block;
+ padding: 5px;
+}
diff --git a/ui/src/app/core-ui/help/help.component.ts b/ui/src/app/core-ui/help/help.component.ts
index 517a2c927d..fcfc5b8d2d 100644
--- a/ui/src/app/core-ui/help/help.component.ts
+++ b/ui/src/app/core-ui/help/help.component.ts
@@ -34,12 +34,14 @@ export class HelpComponent implements OnInit {
@Input()
pipelineElement: PipelineElementUnion;
+ isDataStream: boolean;
constructor(private dialogRef: DialogRef<HelpComponent>) {}
ngOnInit() {
if (this.pipelineElement instanceof SpDataStream) {
this.tabs = this.availableTabs;
+ this.isDataStream = true;
} else {
this.tabs.push(this.availableTabs[1]);
this.selectedTabIndex = 1;
@@ -51,4 +53,6 @@ export class HelpComponent implements OnInit {
this.dialogRef.close();
});
}
+
+ protected readonly SpDataStream = SpDataStream;
}