This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 687c5022c3 feat: Modify script signature (#4098)
687c5022c3 is described below

commit 687c5022c3bbda0c22b7985f84a10153375c86bc
Author: Dominik Riemer <[email protected]>
AuthorDate: Thu Jan 15 20:36:02 2026 +0100

    feat: Modify script signature (#4098)
    
    Co-authored-by: Philipp Zehnder <[email protected]>
---
 .../go-client-e2e/adapter/machine.json             |  2 +-
 .../management/AdapterEventPreviewPipeline.java    |  2 +-
 .../compact/generator/AdapterSchemaGenerator.java  |  7 +++--
 .../management/management/GuessManagement.java     |  8 ++---
 .../shared/AdapterPipelineGeneratorBase.java       | 13 +-------
 .../ScriptTransformationPipelineElement.java       | 13 ++++----
 .../api/{ScriptTransformer.java => Context.java}   | 20 +------------
 ...ScriptTransformer.java => OutputCollector.java} | 19 +++---------
 .../connect/transformer/api/ScriptTransformer.java | 10 ++++---
 .../utils/TransformationEngineConversionUtils.java | 12 ++++++++
 .../transformer/groovy/GroovyScriptEngine.java     | 16 ++++++----
 .../transformer/js/GraalJsScriptEngine.java        | 15 ++++++----
 .../transformer/js/PolyglotResultConverter.java    | 16 ++++++++++
 .../connect/adapter/AdapterPipelineGenerator.java  |  8 ++---
 .../adapter/model/pipeline/AdapterPipeline.java    | 35 +++++++++++++++-------
 .../v099/connect/TransformationScriptBuilder.java  | 12 ++++----
 .../v099/MigrateAdaptersToUseScriptTest.java       |  4 +--
 .../support/utils/connect/CompactAdapterUtils.ts   |  4 ++-
 .../utils/connect/ConnectEventSchemaUtils.ts       |  4 +--
 ui/cypress/support/utils/connect/ConnectUtils.ts   |  8 ++++-
 .../compactAdapterWithTransformation.spec.ts       |  4 +--
 ui/cypress/tests/connect/editAdapter.smoke.spec.ts |  4 +--
 ui/cypress/tests/connect/fileStream.spec.ts        |  2 +-
 .../connect/rules/deleteTransformationRule.spec.ts |  2 +-
 .../tests/connect/rules/schemaRules.smoke.spec.ts  |  2 +-
 ui/cypress/tests/connect/rules/valueRules.spec.ts  |  2 +-
 ui/cypress/tests/connect/scriptTemplate.spec.ts    |  2 +-
 27 files changed, 134 insertions(+), 112 deletions(-)

diff --git a/streampipes-client-e2e/go-client-e2e/adapter/machine.json 
b/streampipes-client-e2e/go-client-e2e/adapter/machine.json
index d3272ef73a..57f9645dd8 100644
--- a/streampipes-client-e2e/go-client-e2e/adapter/machine.json
+++ b/streampipes-client-e2e/go-client-e2e/adapter/machine.json
@@ -220,7 +220,7 @@
   "correspondingDataStreamElementId" : "sp:spdatastream:wMyUZS",
   "schemaTransformationConfig" : {
     "language" : "javascript",
-    "script" : "// returns the same event\nfunction transform(event) {\n  
return event;\n}",
+    "script" : "function transform(event, out, ctx) {\n  
out.collect(event);\n}\n",
     "inputs" : [ {
       "mass_flow" : 0.0620350632211194,
       "density" : 46.62523604047736,
diff --git 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/AdapterEventPreviewPipeline.java
 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/AdapterEventPreviewPipeline.java
index 5fe9375c7c..6a1e16a3c6 100644
--- 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/AdapterEventPreviewPipeline.java
+++ 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/AdapterEventPreviewPipeline.java
@@ -37,7 +37,7 @@ public class AdapterEventPreviewPipeline implements 
IAdapterPipeline {
 
 
     this.pipelineElements = new AdapterPipelineGeneratorBase()
-        .makeAdapterPipelineElements(false, adapterDescription, false);
+        .makeAdapterPipelineElements(false, adapterDescription);
 
     this.event = 
adapterDescription.getTransformationConfig().getOutputs().get(0);
   }
diff --git 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/compact/generator/AdapterSchemaGenerator.java
 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/compact/generator/AdapterSchemaGenerator.java
index 4333680db4..1b0d1bc822 100644
--- 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/compact/generator/AdapterSchemaGenerator.java
+++ 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/compact/generator/AdapterSchemaGenerator.java
@@ -91,11 +91,12 @@ public class AdapterSchemaGenerator implements 
AdapterModelGenerator {
         || adapterDescription.getTransformationConfig()
                              .getScript()
                              .isEmpty()) {
+      adapterDescription.getTransformationConfig().setScriptActive(true);
       adapterDescription.getTransformationConfig()
                         .setScript("""
-                                   function transform(event) {
-                                     return event;
-                                   }
+                                   function transform(event, out, ctx) {
+                                      out.collect(event);
+                                    }
                                    """);
 
     }
diff --git 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java
 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java
index fe5fa56b33..62adce519e 100644
--- 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java
+++ 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java
@@ -45,6 +45,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -108,8 +109,6 @@ public class GuessManagement {
     } else {
 
       try {
-
-
         var transformationScript = 
adapterDescription.getTransformationConfig();
         var engine = 
TransformationEngines.INSTANCE.getTransformationEngine(transformationScript.getLanguage());
         var compiledScript = engine.compile(transformationScript.getScript());
@@ -117,10 +116,11 @@ public class GuessManagement {
         var samples = adapterDescription.getTransformationConfig()
                                         .getInputs();
         if (!samples.isEmpty()) {
-          var result = compiledScript.transform(samples.get(0));
+          List<Map<String, Object>> results = new ArrayList<>();
+          compiledScript.transform(samples.get(0), results::add, null);
 
           adapterDescription.getTransformationConfig()
-                            .setOutputs(List.of(result));
+                            .setOutputs(results);
         } else {
           throw new AdapterException("No samples available to transform");
         }
diff --git 
a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/AdapterPipelineGeneratorBase.java
 
b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/AdapterPipelineGeneratorBase.java
index e0c5d710dd..9c89c7ce8b 100644
--- 
a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/AdapterPipelineGeneratorBase.java
+++ 
b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/AdapterPipelineGeneratorBase.java
@@ -19,7 +19,6 @@
 package org.apache.streampipes.connect.shared;
 
 import 
org.apache.streampipes.connect.shared.preprocessing.elements.AdapterTransformationPipelineElement;
-import 
org.apache.streampipes.connect.shared.preprocessing.elements.ScriptTransformationPipelineElement;
 import 
org.apache.streampipes.connect.shared.preprocessing.transform.stream.EventRateTransformationRule;
 import 
org.apache.streampipes.connect.shared.preprocessing.transform.stream.RemoveDuplicatesTransformationRule;
 import 
org.apache.streampipes.connect.shared.preprocessing.transform.value.DatatypeTransformationRule;
@@ -37,20 +36,10 @@ public class AdapterPipelineGeneratorBase {
 
   public List<IAdapterPipelineElement> makeAdapterPipelineElements(
       boolean includeStateful,
-      AdapterDescription adapterDescription,
-      boolean includeScript
+      AdapterDescription adapterDescription
   ) {
     var elements = new ArrayList<IAdapterPipelineElement>();
 
-    if (includeScript) {
-      elements.add(new ScriptTransformationPipelineElement(
-          adapterDescription.getTransformationConfig()
-                            .getLanguage(),
-          adapterDescription.getTransformationConfig()
-                            .getScript()
-      ));
-    }
-
     List<TransformationRule> transformationRules = new ArrayList<>();
     if (includeStateful) {
       
transformationRules.addAll(getReduceEventTransformationRule(adapterDescription));
diff --git 
a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/elements/ScriptTransformationPipelineElement.java
 
b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/elements/ScriptTransformationPipelineElement.java
index a00806aed0..3024d38019 100644
--- 
a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/elements/ScriptTransformationPipelineElement.java
+++ 
b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/elements/ScriptTransformationPipelineElement.java
@@ -22,11 +22,12 @@ import 
org.apache.streampipes.connect.transformer.api.ScriptTransformer;
 import org.apache.streampipes.connect.transformer.api.TransformationEngines;
 import 
org.apache.streampipes.connect.transformer.api.exception.ScriptCompilationException;
 import 
org.apache.streampipes.connect.transformer.api.exception.ScriptExecutionException;
-import org.apache.streampipes.extensions.api.connect.IAdapterPipelineElement;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
-public class ScriptTransformationPipelineElement implements 
IAdapterPipelineElement {
+public class ScriptTransformationPipelineElement {
   ScriptTransformer scriptTransformer;
 
   public ScriptTransformationPipelineElement(String language, String 
transformationScript) {
@@ -38,11 +39,11 @@ public class ScriptTransformationPipelineElement implements 
IAdapterPipelineElem
     }
   }
 
-
-  @Override
-  public Map<String, Object> process(Map<String, Object> event) {
+  public List<Map<String, Object>> process(Map<String, Object> event) {
     try {
-      return scriptTransformer.transform(event);
+      List<Map<String, Object>> out = new ArrayList<>();
+      scriptTransformer.transform(event, out::add, null);
+      return out;
     } catch (ScriptExecutionException e) {
       throw new RuntimeException(e);
     }
diff --git 
a/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
 
b/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/Context.java
similarity index 56%
copy from 
streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
copy to 
streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/Context.java
index d06885fb5d..4ab88aafa8 100644
--- 
a/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
+++ 
b/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/Context.java
@@ -18,23 +18,5 @@
 
 package org.apache.streampipes.connect.transformer.api;
 
-import 
org.apache.streampipes.connect.transformer.api.exception.ScriptExecutionException;
-
-import java.util.Map;
-
-/**
- * Transforms an input event map into an output event map.
- * Implementations are language-specific but must adhere to this contract.
- */
-public interface ScriptTransformer {
-
-  /**
-   * Apply the compiled template to the incoming data.
-   *
-   * @param input input event map keyed by runtime name
-   * @return output event map keyed by runtime name
-   * @throws ScriptExecutionException
-   * utionException when execution fails or returns an invalid result
-   */
-  Map<String, Object> transform(Map<String, Object> input) throws 
ScriptExecutionException;
+public interface Context {
 }
diff --git 
a/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
 
b/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/OutputCollector.java
similarity index 62%
copy from 
streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
copy to 
streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/OutputCollector.java
index d06885fb5d..34a5b7f8e1 100644
--- 
a/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
+++ 
b/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/OutputCollector.java
@@ -20,21 +20,10 @@ package org.apache.streampipes.connect.transformer.api;
 
 import 
org.apache.streampipes.connect.transformer.api.exception.ScriptExecutionException;
 
-import java.util.Map;
+@FunctionalInterface
+public interface OutputCollector<T> {
+
+  void collect(T event) throws ScriptExecutionException;
 
-/**
- * Transforms an input event map into an output event map.
- * Implementations are language-specific but must adhere to this contract.
- */
-public interface ScriptTransformer {
 
-  /**
-   * Apply the compiled template to the incoming data.
-   *
-   * @param input input event map keyed by runtime name
-   * @return output event map keyed by runtime name
-   * @throws ScriptExecutionException
-   * utionException when execution fails or returns an invalid result
-   */
-  Map<String, Object> transform(Map<String, Object> input) throws 
ScriptExecutionException;
 }
diff --git 
a/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
 
b/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
index d06885fb5d..97d7f3ad27 100644
--- 
a/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
+++ 
b/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
@@ -32,9 +32,11 @@ public interface ScriptTransformer {
    * Apply the compiled template to the incoming data.
    *
    * @param input input event map keyed by runtime name
-   * @return output event map keyed by runtime name
-   * @throws ScriptExecutionException
-   * utionException when execution fails or returns an invalid result
+   * @param out output event
+   * @param ctx reserved for later, currently null
+   * @throws ScriptExecutionException Exception when execution fails or 
returns an invalid result
    */
-  Map<String, Object> transform(Map<String, Object> input) throws 
ScriptExecutionException;
+  void transform(Map<String, Object> input,
+                 OutputCollector<Map<String, Object>> out,
+                 Context ctx) throws ScriptExecutionException;
 }
diff --git 
a/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/utils/TransformationEngineConversionUtils.java
 
b/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/utils/TransformationEngineConversionUtils.java
index 066bfcb98a..dbbfd99477 100644
--- 
a/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/utils/TransformationEngineConversionUtils.java
+++ 
b/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/utils/TransformationEngineConversionUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.streampipes.connect.transformer.api.utils;
 
+import org.apache.streampipes.connect.transformer.api.OutputCollector;
 import 
org.apache.streampipes.connect.transformer.api.exception.ScriptExecutionException;
 
 import java.util.HashMap;
@@ -44,4 +45,15 @@ public class TransformationEngineConversionUtils {
     rawMap.forEach((k, v) -> result.put(String.valueOf(k), v));
     return result;
   }
+
+  public static OutputCollector<Object> 
convertingCollector(OutputCollector<Map<String, Object>> delegate,
+                                                            String language) {
+    return eventObj -> {
+      Map<String, Object> map = ensureMap(
+          eventObj,
+          language
+      );
+      delegate.collect(map);
+    };
+  }
 }
diff --git 
a/streampipes-connect-transformer-groovy/src/main/java/org/apache/streampipes/connect/transformer/groovy/GroovyScriptEngine.java
 
b/streampipes-connect-transformer-groovy/src/main/java/org/apache/streampipes/connect/transformer/groovy/GroovyScriptEngine.java
index a217b8b3aa..6b20789f7c 100644
--- 
a/streampipes-connect-transformer-groovy/src/main/java/org/apache/streampipes/connect/transformer/groovy/GroovyScriptEngine.java
+++ 
b/streampipes-connect-transformer-groovy/src/main/java/org/apache/streampipes/connect/transformer/groovy/GroovyScriptEngine.java
@@ -18,6 +18,8 @@
 
 package org.apache.streampipes.connect.transformer.groovy;
 
+import org.apache.streampipes.connect.transformer.api.Context;
+import org.apache.streampipes.connect.transformer.api.OutputCollector;
 import org.apache.streampipes.connect.transformer.api.ScriptTransformer;
 import org.apache.streampipes.connect.transformer.api.TransformationEngine;
 import 
org.apache.streampipes.connect.transformer.api.exception.ScriptCompilationException;
@@ -40,7 +42,7 @@ public class GroovyScriptEngine implements 
TransformationEngine {
     return new ScriptMetadata(
         "groovy",
     "Groovy",
-    ""
+    "out.collect(input)"
     );
   }
 
@@ -56,17 +58,21 @@ public class GroovyScriptEngine implements 
TransformationEngine {
 
     Class<? extends Script> scriptClass = script.getClass();
 
-    return input -> execute(scriptClass, input);
+    return (input, out, ctx) -> execute(scriptClass, input, out, ctx);
   }
 
-  private Map<String, Object> execute(Class<? extends Script> scriptClass, 
Map<String, Object> input)
+  private void execute(Class<? extends Script> scriptClass,
+                       Map<String, Object> input,
+                       OutputCollector<Map<String, Object>> out,
+                       Context ctx)
       throws ScriptExecutionException {
     try {
       Binding binding = new Binding();
       binding.setVariable("input", input);
+      binding.setVariable("out", 
TransformationEngineConversionUtils.convertingCollector(out, 
metadata().language()));
+      binding.setVariable("ctx", ctx);
       Script scriptInstance = InvokerHelper.createScript(scriptClass, binding);
-      Object result = scriptInstance.run();
-      return TransformationEngineConversionUtils.ensureMap(result, 
metadata().language());
+      scriptInstance.run();
     } catch (Exception e) {
       throw new ScriptExecutionException("Groovy template execution failed", 
e);
     }
diff --git 
a/streampipes-connect-transformer-js/src/main/java/org/apache/streampipes/connect/transformer/js/GraalJsScriptEngine.java
 
b/streampipes-connect-transformer-js/src/main/java/org/apache/streampipes/connect/transformer/js/GraalJsScriptEngine.java
index 6cb9b267a7..a645e314d0 100644
--- 
a/streampipes-connect-transformer-js/src/main/java/org/apache/streampipes/connect/transformer/js/GraalJsScriptEngine.java
+++ 
b/streampipes-connect-transformer-js/src/main/java/org/apache/streampipes/connect/transformer/js/GraalJsScriptEngine.java
@@ -18,6 +18,7 @@
 
 package org.apache.streampipes.connect.transformer.js;
 
+import org.apache.streampipes.connect.transformer.api.OutputCollector;
 import org.apache.streampipes.connect.transformer.api.ScriptTransformer;
 import org.apache.streampipes.connect.transformer.api.TransformationEngine;
 import 
org.apache.streampipes.connect.transformer.api.exception.ScriptCompilationException;
@@ -41,8 +42,8 @@ public class GraalJsScriptEngine implements 
TransformationEngine {
         "JavaScript",
         """
             // returns the same event
-            function transform(event) {
-              return event;
+            function transform(event, out, ctx) {
+              out.collect(event);
             }
             """
     );
@@ -61,7 +62,7 @@ public class GraalJsScriptEngine implements 
TransformationEngine {
 
     Value transformFunction = resolveFunction(context, compiled);
 
-    return input -> execute(transformFunction, input);
+    return (input, out, ctx) -> execute(transformFunction, input, out, ctx);
   }
 
   private Context createContext() {
@@ -89,11 +90,13 @@ public class GraalJsScriptEngine implements 
TransformationEngine {
         null);
   }
 
-  private Map<String, Object> execute(Value transformFunction, Map<String, 
Object> input)
+  private void execute(Value transformFunction,
+                       Map<String, Object> input,
+                       OutputCollector<Map<String, Object>> out,
+                       org.apache.streampipes.connect.transformer.api.Context 
ctx)
       throws ScriptExecutionException {
     try {
-      Value result = transformFunction.execute(input);
-      return PolyglotResultConverter.ensureMap(result, metadata().language());
+      transformFunction.execute(input, 
PolyglotResultConverter.convertingCollector(out, metadata().language()), ctx);
     } catch (PolyglotException e) {
       throw new ScriptExecutionException("Graal JS script execution failed", 
e);
     }
diff --git 
a/streampipes-connect-transformer-js/src/main/java/org/apache/streampipes/connect/transformer/js/PolyglotResultConverter.java
 
b/streampipes-connect-transformer-js/src/main/java/org/apache/streampipes/connect/transformer/js/PolyglotResultConverter.java
index b280ae247d..9f85ef3c41 100644
--- 
a/streampipes-connect-transformer-js/src/main/java/org/apache/streampipes/connect/transformer/js/PolyglotResultConverter.java
+++ 
b/streampipes-connect-transformer-js/src/main/java/org/apache/streampipes/connect/transformer/js/PolyglotResultConverter.java
@@ -18,6 +18,7 @@
 
 package org.apache.streampipes.connect.transformer.js;
 
+import org.apache.streampipes.connect.transformer.api.OutputCollector;
 import 
org.apache.streampipes.connect.transformer.api.exception.ScriptExecutionException;
 import 
org.apache.streampipes.connect.transformer.api.utils.TransformationEngineConversionUtils;
 
@@ -69,4 +70,19 @@ public class PolyglotResultConverter {
     throw new ScriptExecutionException(
         "Template in " + language + " returned a non-object value: " + 
value.toString());
   }
+
+  public static OutputCollector<Object> 
convertingCollector(OutputCollector<Map<String, Object>> delegate, String 
language) {
+    return obj -> {
+      Map<String,Object> map;
+      if (obj instanceof Map<?, ?> m) {
+        //noinspection unchecked
+        map = (Map<String,Object>) m;
+      } else if (obj instanceof Value v) {
+        map = PolyglotResultConverter.ensureMap(v, language);
+      } else {
+        throw new IllegalArgumentException("Collected event must be an 
object/map");
+      }
+      delegate.collect(map);
+    };
+  }
 }
diff --git 
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/AdapterPipelineGenerator.java
 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/AdapterPipelineGenerator.java
index 6603bcb7b9..0309814a69 100644
--- 
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/AdapterPipelineGenerator.java
+++ 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/AdapterPipelineGenerator.java
@@ -26,18 +26,16 @@ import 
org.apache.streampipes.model.connect.adapter.AdapterDescription;
 public class AdapterPipelineGenerator extends AdapterPipelineGeneratorBase {
 
   public AdapterPipeline generatePipeline(AdapterDescription 
adapterDescription) {
-
-    var includeScript = 
adapterDescription.getTransformationConfig().isScriptActive();
-
-    var pipelineElements = makeAdapterPipelineElements(true, 
adapterDescription, includeScript);
+    var pipelineElements = makeAdapterPipelineElements(true, 
adapterDescription);
 
     if (hasValidGrounding(adapterDescription)) {
       return new AdapterPipeline(
           pipelineElements,
+          adapterDescription.getTransformationConfig(),
           getAdapterSink(adapterDescription),
           adapterDescription.getEventSchema());
     } else {
-      return new AdapterPipeline(pipelineElements, 
adapterDescription.getEventSchema());
+      return new AdapterPipeline(pipelineElements, 
adapterDescription.getTransformationConfig(), 
adapterDescription.getEventSchema());
     }
   }
 
diff --git 
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java
 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java
index b703b94b3c..43ee703c26 100644
--- 
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java
+++ 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java
@@ -18,12 +18,15 @@
 
 package 
org.apache.streampipes.extensions.management.connect.adapter.model.pipeline;
 
+import 
org.apache.streampipes.connect.shared.preprocessing.elements.ScriptTransformationPipelineElement;
 import org.apache.streampipes.extensions.api.connect.IAdapterPipeline;
 import org.apache.streampipes.extensions.api.connect.IAdapterPipelineElement;
+import org.apache.streampipes.model.connect.TransformationConfig;
 import org.apache.streampipes.model.schema.EventSchema;
 
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 
 public class AdapterPipeline implements IAdapterPipeline {
 
@@ -31,31 +34,43 @@ public class AdapterPipeline implements IAdapterPipeline {
   private IAdapterPipelineElement pipelineSink;
 
   private final EventSchema resultingEventSchema;
+  private final Function<Map<String, Object>, List<Map<String, Object>>> 
processingFn;
 
   public AdapterPipeline(List<IAdapterPipelineElement> pipelineElements,
+                         TransformationConfig transformationConfig,
                          EventSchema resultingEventSchema) {
     this.pipelineElements = pipelineElements;
     this.resultingEventSchema = resultingEventSchema;
+    if (transformationConfig.isScriptActive()) {
+      var transformation = new ScriptTransformationPipelineElement(
+         transformationConfig.getLanguage(),
+          transformationConfig.getScript()
+      );
+      processingFn = transformation::process;
+    } else {
+      processingFn = List::of;
+    }
   }
 
   public AdapterPipeline(List<IAdapterPipelineElement> pipelineElements,
+                         TransformationConfig transformationConfig,
                          IAdapterPipelineElement pipelineSink,
                          EventSchema resultingEventSchema) {
-    this.pipelineElements = pipelineElements;
+    this(pipelineElements, transformationConfig, resultingEventSchema);
     this.pipelineSink = pipelineSink;
-    this.resultingEventSchema = resultingEventSchema;
   }
 
   @Override
   public void process(Map<String, Object> event) {
-
-    for (IAdapterPipelineElement pipelineElement : pipelineElements) {
-      event = pipelineElement.process(event);
-    }
-    if (pipelineSink != null) {
-      pipelineSink.process(event);
-    }
-
+    var scriptResult = this.processingFn.apply(event);
+    scriptResult.forEach(result -> {
+      for (IAdapterPipelineElement pipelineElement : pipelineElements) {
+        result = pipelineElement.process(result);
+      }
+      if (pipelineSink != null) {
+        pipelineSink.process(result);
+      }
+    });
   }
 
   @Override
diff --git 
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v099/connect/TransformationScriptBuilder.java
 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v099/connect/TransformationScriptBuilder.java
index f068ef44c6..da7369f3f9 100644
--- 
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v099/connect/TransformationScriptBuilder.java
+++ 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v099/connect/TransformationScriptBuilder.java
@@ -21,22 +21,22 @@ package 
org.apache.streampipes.service.core.migrations.v099.connect;
 public class TransformationScriptBuilder {
   private StringBuilder sb;
 
-  private boolean scriptAcive;
+  private boolean scriptActive;
 
   private TransformationScriptBuilder() {
-    scriptAcive = false;
+    scriptActive = false;
   }
 
   public static TransformationScriptBuilder create() {
     TransformationScriptBuilder builder = new TransformationScriptBuilder();
     builder.sb = new StringBuilder();
-    builder.sb.append("function transform(event) {\n");
+    builder.sb.append("function transform(event, out, ctx) {\n");
     return builder;
   }
 
   public TransformationScriptBuilder appendLine(String line) {
     if (!line.startsWith("//")) {
-      scriptAcive = true;
+      scriptActive = true;
     }
 
     sb.append("  ").append(line).append("\n");
@@ -45,11 +45,11 @@ public class TransformationScriptBuilder {
 
   // Used to check if any script lines were added
   public boolean isScriptActive() {
-    return scriptAcive;
+    return scriptActive;
   }
 
   public String build() {
-    sb.append("  return event;\n");
+    sb.append("  out.collect(event);\n");
     sb.append("}");
     return sb.toString();
   }
diff --git 
a/streampipes-service-core/src/test/java/org/apache/streampipes/service/core/migrations/v099/MigrateAdaptersToUseScriptTest.java
 
b/streampipes-service-core/src/test/java/org/apache/streampipes/service/core/migrations/v099/MigrateAdaptersToUseScriptTest.java
index 4c963e529e..b12978ec67 100644
--- 
a/streampipes-service-core/src/test/java/org/apache/streampipes/service/core/migrations/v099/MigrateAdaptersToUseScriptTest.java
+++ 
b/streampipes-service-core/src/test/java/org/apache/streampipes/service/core/migrations/v099/MigrateAdaptersToUseScriptTest.java
@@ -276,7 +276,7 @@ class MigrateAdaptersToUseScriptTest {
                  "Time window should match legacy config");
 
     // Ensure the script still contains the standard boilerplate even if this 
rule is stateful
-    assertTrue(resultConfig.getScript().contains("function transform(event)"),
+    assertTrue(resultConfig.getScript().contains("function transform(event, 
out, ctx)"),
                "Script should still be generated as a container");
     assertFalse(adapter.getTransformationConfig().isScriptActive());
   }
@@ -395,4 +395,4 @@ class MigrateAdaptersToUseScriptTest {
     adapter.getDataStream().setEventSchema(new EventSchema());
     return adapter;
   }
-}
\ No newline at end of file
+}
diff --git a/ui/cypress/support/utils/connect/CompactAdapterUtils.ts b/ui/cypress/support/utils/connect/CompactAdapterUtils.ts
index 53af565d31..6a8c09db70 100644
--- a/ui/cypress/support/utils/connect/CompactAdapterUtils.ts
+++ b/ui/cypress/support/utils/connect/CompactAdapterUtils.ts
@@ -76,7 +76,9 @@ export class CompactAdapterUtils {
         )
             .setName('Test')
             .withScript(
-                'function transform(event) {\n' + '  return event;\n' + '}',
+                'function transform(event, out, ctx) {\n' +
+                    '  out.collect(event);\n' +
+                    '}\n',
             )
             .addConfiguration('wait-time-ms', '1000')
             .addConfiguration('selected-simulator-option', 'flowrate');
diff --git a/ui/cypress/support/utils/connect/ConnectEventSchemaUtils.ts b/ui/cypress/support/utils/connect/ConnectEventSchemaUtils.ts
index 816a069216..1f0a44ddb0 100644
--- a/ui/cypress/support/utils/connect/ConnectEventSchemaUtils.ts
+++ b/ui/cypress/support/utils/connect/ConnectEventSchemaUtils.ts
@@ -54,9 +54,9 @@ export class ConnectEventSchemaUtils {
 
     private static addTimestampFieldToScript() {
         ConnectBtns.configureSchemaScriptEditor()
-            .type('{backspace}'.repeat(17)) // 2. Delete the "  return event;\n}" part
+            .type('{backspace}'.repeat(22)) // 2. Delete the "  out.collect(event);\n}" part
             .type(
-                '  event.timestamp = new Date().getTime();{enter}return event;{enter}}',
+                '  event.timestamp = new Date().getTime();{enter}return out.collect(event);{enter}}',
             );
     }
 
diff --git a/ui/cypress/support/utils/connect/ConnectUtils.ts b/ui/cypress/support/utils/connect/ConnectUtils.ts
index 3666fb8a1b..97f0e0635e 100644
--- a/ui/cypress/support/utils/connect/ConnectUtils.ts
+++ b/ui/cypress/support/utils/connect/ConnectUtils.ts
@@ -398,8 +398,14 @@ export class ConnectUtils {
     }
 
     public static replaceAdapterScript(script: string) {
+        // Ensure that the script is loaded
+        ConnectBtns.configureSchemaScriptEditor().should(
+            'contain.text',
+            'out.collect(event);',
+        );
+
         ConnectBtns.configureSchemaScriptEditor()
-            .type('{backspace}'.repeat(17)) // 2. Delete the "  return event;\n}" part
+            .type('{backspace}'.repeat(22)) // 2. Delete the "  out.collect(event);\n}" part
             .type(script);
     }
 
diff --git a/ui/cypress/tests/connect/compact/compactAdapterWithTransformation.spec.ts b/ui/cypress/tests/connect/compact/compactAdapterWithTransformation.spec.ts
index b5f2cb7996..7e25b40112 100644
--- a/ui/cypress/tests/connect/compact/compactAdapterWithTransformation.spec.ts
+++ b/ui/cypress/tests/connect/compact/compactAdapterWithTransformation.spec.ts
@@ -30,10 +30,10 @@ describe('Add Compact Adapters', () => {
         const newPropertyName = 'temperature_renamed';
         const compactAdapter = CompactAdapterUtils.getMachineDataSimulator()
             .withScript(
-                'function transform(event) {\n' +
+                'function transform(event, out, ctx) {\n' +
                     '  event.temperature_renamed = event.temperature \n' +
                     '  delete event.temperature \n' +
-                    '  return event;\n' +
+                    '  out.collect(event);\n' +
                     '}',
             )
             .setStart()
diff --git a/ui/cypress/tests/connect/editAdapter.smoke.spec.ts b/ui/cypress/tests/connect/editAdapter.smoke.spec.ts
index 7c0b91f423..0edec29055 100644
--- a/ui/cypress/tests/connect/editAdapter.smoke.spec.ts
+++ b/ui/cypress/tests/connect/editAdapter.smoke.spec.ts
@@ -95,11 +95,11 @@ describe('Test Edit Adapter', () => {
 
         ConnectUtils.replaceAdapterScript(
             '  event.density = event.density * 2;\n' +
-                '  return event;\n' +
+                '  out.collect(event);\n' +
                 '}',
         );
         ConnectBtns.configureSchemaRunScriptBtn().click();
-        cy.wait(500);
+        cy.wait(1000);
 
         ConnectBtns.configureSchemaNextBtn().click();
 
diff --git a/ui/cypress/tests/connect/fileStream.spec.ts b/ui/cypress/tests/connect/fileStream.spec.ts
index a11dbf6fca..0dfbe6206d 100644
--- a/ui/cypress/tests/connect/fileStream.spec.ts
+++ b/ui/cypress/tests/connect/fileStream.spec.ts
@@ -51,7 +51,7 @@ describe('Test File Replay Adapter', () => {
             ConnectUtils.setUpPreprocessingRuleTest(false);
 
         ConnectUtils.replaceAdapterScript(
-            'event.timestamp = event.timestamp * 1000;\n return event;\n}',
+            'event.timestamp = event.timestamp * 1000;\n out.collect(event);\n}',
         );
 
         ConnectBtns.configureSchemaRunScriptBtn().click();
diff --git a/ui/cypress/tests/connect/rules/deleteTransformationRule.spec.ts b/ui/cypress/tests/connect/rules/deleteTransformationRule.spec.ts
index 72b6329324..7ab7527a75 100644
--- a/ui/cypress/tests/connect/rules/deleteTransformationRule.spec.ts
+++ b/ui/cypress/tests/connect/rules/deleteTransformationRule.spec.ts
@@ -50,7 +50,7 @@ describe('Connect delete rule transformation', () => {
         ConnectUtils.replaceAdapterScript(
             '  delete event.toRemove;\n' +
                 '  delete event.parent.child_two;\n' +
-                '  return event;\n' +
+                '  out.collect(event);\n' +
                 '}',
         );
 
diff --git a/ui/cypress/tests/connect/rules/schemaRules.smoke.spec.ts b/ui/cypress/tests/connect/rules/schemaRules.smoke.spec.ts
index 507292b1fa..5d4925ffaf 100644
--- a/ui/cypress/tests/connect/rules/schemaRules.smoke.spec.ts
+++ b/ui/cypress/tests/connect/rules/schemaRules.smoke.spec.ts
@@ -35,7 +35,7 @@ describe('Connect schema rule transformations', () => {
         ConnectUtils.replaceAdapterScript(
             "  event['dot'] = event ['contains.dot'];\n" +
                 "  delete event['contains.dot'];\n" +
-                '  return event;\n' +
+                '  out.collect(event);\n' +
                 '}',
         );
 
diff --git a/ui/cypress/tests/connect/rules/valueRules.spec.ts b/ui/cypress/tests/connect/rules/valueRules.spec.ts
index a03fa8cf1c..48b08e45b0 100644
--- a/ui/cypress/tests/connect/rules/valueRules.spec.ts
+++ b/ui/cypress/tests/connect/rules/valueRules.spec.ts
@@ -32,7 +32,7 @@ describe('Connect value rule transformations', () => {
             ConnectUtils.setUpPreprocessingRuleTest(true);
 
         ConnectUtils.replaceAdapterScript(
-            'event.timestamp = new Date(event.timestamp).getTime();\n return event;\n}',
+            'event.timestamp = new Date(event.timestamp).getTime();\n out.collect(event);\n}',
         );
         ConnectBtns.configureSchemaRunScriptBtn().click();
         cy.wait(1000);
diff --git a/ui/cypress/tests/connect/scriptTemplate.spec.ts b/ui/cypress/tests/connect/scriptTemplate.spec.ts
index fc04073f75..313917d2b4 100644
--- a/ui/cypress/tests/connect/scriptTemplate.spec.ts
+++ b/ui/cypress/tests/connect/scriptTemplate.spec.ts
@@ -22,7 +22,7 @@ import { ConnectBtns } from '../../support/utils/connect/ConnectBtns';
 const TEMPLATE_NAME = 'TestTemplate';
 const SCRIPT_LINE = "event.b = 'b';";
 const SCRIPT = `  ${SCRIPT_LINE}
-return event;
+out.collect(event);
 }`;
 
 describe('Validate Warning Pops For Configuration Changes ', () => {


Reply via email to