This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch support-output-strategies-in-pipeline-code
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/support-output-strategies-in-pipeline-code by this push:
     new 08666dcb97 Support output strategy in pipeline templates
08666dcb97 is described below

commit 08666dcb977a1dc7898c8e1f7adcf29f3ea7ae71
Author: Dominik Riemer <[email protected]>
AuthorDate: Tue Oct 29 09:58:40 2024 +0100

    Support output strategy in pipeline templates
---
 .../management/compact/PersistPipelineHandler.java |   1 +
 .../pipeline/compact/CompactPipelineElement.java   |   3 +-
 ...pelineElement.java => OutputConfiguration.java} |   8 +-
 .../model/pipeline/compact/UserDefinedOutput.java  |   7 +-
 .../generation/CompactPipelineConverter.java       |  36 ++++++-
 .../DataProcessorPipelineElementGenerator.java     |   8 +-
 .../generation/OutputStrategyGenerator.java        | 108 +++++++++++++++++++++
 .../compact/CompactPipelineTemplateManagement.java |   3 +-
 .../instances/PersistDataLakePipelineTemplate.java |   3 +-
 ui/src/app/core-ui/help/help.component.html        |  24 +++--
 ui/src/app/core-ui/help/help.component.scss        |  10 ++
 ui/src/app/core-ui/help/help.component.ts          |   4 +
 12 files changed, 193 insertions(+), 22 deletions(-)

diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/compact/PersistPipelineHandler.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/compact/PersistPipelineHandler.java
index dd0afb7ef7..8b53e019f6 100644
--- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/compact/PersistPipelineHandler.java
+++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/compact/PersistPipelineHandler.java
@@ -94,6 +94,7 @@ public class PersistPipelineHandler {
         DATA_LAKE_CONNECTOR_ID,
         adapterDescription.getCorrespondingDataStreamElementId(),
         null,
+        null,
         null
     ));
     return pipelineElements;
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipelineElement.java b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipelineElement.java
index b145c2aad3..f027e60e05 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipelineElement.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipelineElement.java
@@ -28,5 +28,6 @@ public record CompactPipelineElement(String type,
                                      String ref,
                                      String id,
                                      List<String> connectedTo,
-                                     List<Map<String, Object>> configuration) {
+                                     List<Map<String, Object>> configuration,
+                                     OutputConfiguration output) {
 }
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipelineElement.java b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/OutputConfiguration.java
similarity index 75%
copy from streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipelineElement.java
copy to streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/OutputConfiguration.java
index b145c2aad3..f9d88fa97d 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipelineElement.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/OutputConfiguration.java
@@ -21,12 +21,8 @@ package org.apache.streampipes.model.pipeline.compact;
 import com.fasterxml.jackson.annotation.JsonInclude;
 
 import java.util.List;
-import java.util.Map;
 
 @JsonInclude(JsonInclude.Include.NON_NULL)
-public record CompactPipelineElement(String type,
-                                     String ref,
-                                     String id,
-                                     List<String> connectedTo,
-                                     List<Map<String, Object>> configuration) {
+public record OutputConfiguration(List<String> keep,
+                                  List<UserDefinedOutput> userDefined) {
 }
diff --git a/ui/src/app/core-ui/help/help.component.scss b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/UserDefinedOutput.java
similarity index 78%
copy from ui/src/app/core-ui/help/help.component.scss
copy to streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/UserDefinedOutput.java
index ecf2724830..22b9b924a1 100644
--- a/ui/src/app/core-ui/help/help.component.scss
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/UserDefinedOutput.java
@@ -16,4 +16,9 @@
  *
  */
 
-@import '../../../scss/sp/sp-dialog.scss';
+package org.apache.streampipes.model.pipeline.compact;
+
+public record UserDefinedOutput(String fieldName,
+                                String runtimeType,
+                                String semanticType) {
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/CompactPipelineConverter.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/CompactPipelineConverter.java
index bebc3b9286..35b85741d8 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/CompactPipelineConverter.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/CompactPipelineConverter.java
@@ -19,8 +19,15 @@
 package org.apache.streampipes.manager.pipeline.compact.generation;
 
 import org.apache.streampipes.manager.template.CompactConfigGenerator;
+import org.apache.streampipes.model.output.CustomOutputStrategy;
+import org.apache.streampipes.model.output.OutputStrategy;
+import org.apache.streampipes.model.output.UserDefinedOutputStrategy;
 import org.apache.streampipes.model.pipeline.Pipeline;
 import org.apache.streampipes.model.pipeline.compact.CompactPipelineElement;
+import org.apache.streampipes.model.pipeline.compact.OutputConfiguration;
+import org.apache.streampipes.model.pipeline.compact.UserDefinedOutput;
+import org.apache.streampipes.model.schema.EventProperty;
+import org.apache.streampipes.model.schema.EventPropertyPrimitive;
 import org.apache.streampipes.model.staticproperty.StaticProperty;
 
 import java.util.ArrayList;
@@ -38,6 +45,7 @@ public class CompactPipelineConverter {
             stream.getDom(),
             stream.getElementId(),
             null,
+            null,
             null))
         .forEach(pipelineElements::add);
 
@@ -47,7 +55,8 @@ public class CompactPipelineConverter {
             processor.getDom(),
             processor.getAppId(),
             processor.getConnectedTo(),
-            getConfig(processor.getStaticProperties())))
+            getConfig(processor.getStaticProperties()),
+            getOutput(processor.getOutputStrategies().get(0))))
         .forEach(pipelineElements::add);
 
     pipeline.getActions().stream()
@@ -56,7 +65,8 @@ public class CompactPipelineConverter {
             sink.getDom(),
             sink.getAppId(),
             sink.getConnectedTo(),
-            getConfig(sink.getStaticProperties())))
+            getConfig(sink.getStaticProperties()),
+            null))
         .forEach(pipelineElements::add);
 
     return pipelineElements;
@@ -66,11 +76,12 @@ public class CompactPipelineConverter {
                                                String ref,
                                                String elementId,
                                                List<String> connectedTo,
-                                               List<Map<String, Object>> config) {
+                                               List<Map<String, Object>> config,
+                                               OutputConfiguration outputConfiguration) {
     var connections = connectedTo != null ? connectedTo.stream()
         .map(this::replaceId)
         .toList() : null;
-    return new CompactPipelineElement(type, replaceId(ref), elementId, connections, config);
+    return new CompactPipelineElement(type, replaceId(ref), elementId, connections, config, outputConfiguration);
   }
 
   public List<Map<String, Object>> getConfig(List<StaticProperty> staticProperties) {
@@ -79,6 +90,23 @@ public class CompactPipelineConverter {
     return configs;
   }
 
+  public OutputConfiguration getOutput(OutputStrategy outputStrategy) {
+    if (outputStrategy instanceof CustomOutputStrategy) {
+      return new OutputConfiguration(((CustomOutputStrategy) outputStrategy).getSelectedPropertyKeys(), null);
+    } else if (outputStrategy instanceof UserDefinedOutputStrategy) {
+      return new OutputConfiguration(null, toCustomConfig(((UserDefinedOutputStrategy) outputStrategy).getEventProperties()));
+    } else {
+      return null;
+    }
+  }
+
+  private List<UserDefinedOutput> toCustomConfig(List<EventProperty> eventProperties) {
+    return eventProperties.stream().map(ep -> new UserDefinedOutput(
+        ep.getRuntimeName(),
+        ((EventPropertyPrimitive) ep).getRuntimeType(),
+        ep.getSemanticType())).toList();
+  }
+
   private String replaceId(String id) {
     return id.replaceAll(InvocablePipelineElementGenerator.ID_PREFIX, "");
   }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataProcessorPipelineElementGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataProcessorPipelineElementGenerator.java
index c736293c20..89f7fd10dd 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataProcessorPipelineElementGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataProcessorPipelineElementGenerator.java
@@ -35,7 +35,13 @@ public class DataProcessorPipelineElementGenerator {
                                           CompactPipelineElement 
pipelineElement) {
     basicGenerator.apply(processor, pipelineElement);
     var template = basicGenerator.makeTemplate(processor, pipelineElement);
-    return new DataProcessorTemplateHandler(template, processor, false)
+    var element = new DataProcessorTemplateHandler(template, processor, false)
         .applyTemplateOnPipelineElement();
+
+    if (pipelineElement.output() != null) {
+      var outputStrategyGenerator = new OutputStrategyGenerator(pipelineElement.output());
+      element.getOutputStrategies().forEach(o -> o.accept(outputStrategyGenerator));
+    }
+    return element;
   }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/OutputStrategyGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/OutputStrategyGenerator.java
new file mode 100644
index 0000000000..2cb4cd6067
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/OutputStrategyGenerator.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.pipeline.compact.generation;
+
+import org.apache.streampipes.model.output.AppendOutputStrategy;
+import org.apache.streampipes.model.output.CustomOutputStrategy;
+import org.apache.streampipes.model.output.CustomTransformOutputStrategy;
+import org.apache.streampipes.model.output.FixedOutputStrategy;
+import org.apache.streampipes.model.output.KeepOutputStrategy;
+import org.apache.streampipes.model.output.ListOutputStrategy;
+import org.apache.streampipes.model.output.OutputStrategyVisitor;
+import org.apache.streampipes.model.output.TransformOutputStrategy;
+import org.apache.streampipes.model.output.UserDefinedOutputStrategy;
+import org.apache.streampipes.model.pipeline.compact.OutputConfiguration;
+import org.apache.streampipes.model.pipeline.compact.UserDefinedOutput;
+import org.apache.streampipes.model.schema.EventProperty;
+import org.apache.streampipes.model.schema.EventPropertyPrimitive;
+
+import java.util.List;
+
+public class OutputStrategyGenerator implements OutputStrategyVisitor {
+
+  private final OutputConfiguration config;
+
+  public OutputStrategyGenerator(OutputConfiguration outputConfiguration) {
+    this.config = outputConfiguration;
+  }
+
+  @Override
+  public void visit(AppendOutputStrategy appendOutputStrategy) {
+
+  }
+
+  @Override
+  public void visit(CustomOutputStrategy customOutputStrategy) {
+    var keepConfig = config.keep();
+    if (keepConfig != null && !keepConfig.isEmpty()) {
+      customOutputStrategy.setSelectedPropertyKeys(keepConfig);
+    }
+  }
+
+  @Override
+  public void visit(CustomTransformOutputStrategy customTransformOutputStrategy) {
+
+  }
+
+  @Override
+  public void visit(FixedOutputStrategy fixedOutputStrategy) {
+
+  }
+
+  @Override
+  public void visit(KeepOutputStrategy keepOutputStrategy) {
+
+  }
+
+  @Override
+  public void visit(ListOutputStrategy listOutputStrategy) {
+
+  }
+
+  @Override
+  public void visit(TransformOutputStrategy transformOutputStrategy) {
+
+  }
+
+  @Override
+  public void visit(UserDefinedOutputStrategy userDefinedOutputStrategy) {
+    var userDefinedConfig = config.userDefined();
+    if (userDefinedConfig != null && !userDefinedConfig.isEmpty()) {
+      userDefinedOutputStrategy.setEventProperties(
+          toEp(userDefinedConfig)
+      );
+    }
+  }
+
+  private List<EventProperty> toEp(List<UserDefinedOutput> userDefinedOutput) {
+    return userDefinedOutput
+        .stream()
+        .map(this::toPrimitive)
+        .toList();
+  }
+
+  private EventProperty toPrimitive(UserDefinedOutput u) {
+    var ep = new EventPropertyPrimitive();
+    ep.setRuntimeName(u.fieldName());
+    ep.setSemanticType(u.semanticType());
+    ep.setRuntimeType(u.runtimeType());
+
+    return ep;
+  }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/compact/CompactPipelineTemplateManagement.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/compact/CompactPipelineTemplateManagement.java
index 0bf308cfc4..1238a0aeda 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/compact/CompactPipelineTemplateManagement.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/compact/CompactPipelineTemplateManagement.java
@@ -57,7 +57,8 @@ public class CompactPipelineTemplateManagement {
             key,
             stream.getElementId(),
             List.of(),
-            List.of()
+            List.of(),
+            null
         ));
       });
     }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/instances/PersistDataLakePipelineTemplate.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/instances/PersistDataLakePipelineTemplate.java
index caa6e7bf94..00c768f4a7 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/instances/PersistDataLakePipelineTemplate.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/template/instances/PersistDataLakePipelineTemplate.java
@@ -52,7 +52,8 @@ public class PersistDataLakePipelineTemplate implements DefaultPipelineTemplateP
                 List.of(
                     Map.of("schema_update", "Update schema"),
                     Map.of("ignore_duplicates", false)
-                )
+                ),
+                null
             )
         )
     );
diff --git a/ui/src/app/core-ui/help/help.component.html b/ui/src/app/core-ui/help/help.component.html
index ecc82fc6a8..fcdfbe7332 100644
--- a/ui/src/app/core-ui/help/help.component.html
+++ b/ui/src/app/core-ui/help/help.component.html
@@ -17,18 +17,28 @@
   -->
 
 <div class="sp-dialog-container">
-    <div class="sp-dialog-content p-15">
-        <h4>{{ pipelineElement.name }}</h4>
-        <p>
-            {{ pipelineElement.description }}
-        </p>
-
+    <div class="sp-dialog-content p-15" fxLayout="column">
+        <div fxLayout="row" fxFlex="100">
+            <div fxLayout="column" fxFlex>
+                <h4>{{ pipelineElement.name }}</h4>
+                <small>
+                    {{ pipelineElement.description }}
+                </small>
+            </div>
+            <div class="element-id" fxLayoutAlign="end start">
+                <span>ID</span>&nbsp;<b>{{
+                    isDataStream
+                        ? pipelineElement.elementId
+                        : pipelineElement.appId
+                }}</b>
+            </div>
+        </div>
         <mat-tab-group
             color="accent"
             [selectedIndex]="selectedTabIndex"
             (selectedIndexChange)="selectedTabIndex = $event"
         >
-            <mat-tab *ngFor="let tab of tabs" label="{{ tab }}"> </mat-tab>
+            <mat-tab *ngFor="let tab of tabs" label="{{ tab }}"></mat-tab>
         </mat-tab-group>
 
         <sp-pipeline-element-runtime-info
diff --git a/ui/src/app/core-ui/help/help.component.scss b/ui/src/app/core-ui/help/help.component.scss
index ecf2724830..c077f68f35 100644
--- a/ui/src/app/core-ui/help/help.component.scss
+++ b/ui/src/app/core-ui/help/help.component.scss
@@ -17,3 +17,13 @@
  */
 
 @import '../../../scss/sp/sp-dialog.scss';
+
+.element-id {
+    border-radius: 5px;
+    margin-right: 10px;
+    margin-top: 5px;
+    margin-bottom: 5px;
+    font-size: small;
+    display: inline-block;
+    padding: 5px;
+}
diff --git a/ui/src/app/core-ui/help/help.component.ts b/ui/src/app/core-ui/help/help.component.ts
index 517a2c927d..fcfc5b8d2d 100644
--- a/ui/src/app/core-ui/help/help.component.ts
+++ b/ui/src/app/core-ui/help/help.component.ts
@@ -34,12 +34,14 @@ export class HelpComponent implements OnInit {
 
     @Input()
     pipelineElement: PipelineElementUnion;
+    isDataStream: boolean;
 
     constructor(private dialogRef: DialogRef<HelpComponent>) {}
 
     ngOnInit() {
         if (this.pipelineElement instanceof SpDataStream) {
             this.tabs = this.availableTabs;
+            this.isDataStream = true;
         } else {
             this.tabs.push(this.availableTabs[1]);
             this.selectedTabIndex = 1;
@@ -51,4 +53,6 @@ export class HelpComponent implements OnInit {
             this.dialogRef.close();
         });
     }
+
+    protected readonly SpDataStream = SpDataStream;
 }

Reply via email to