This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch 
4097-modify-script-signature-in-connect-transformation-scripts
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to 
refs/heads/4097-modify-script-signature-in-connect-transformation-scripts by 
this push:
     new 3b38bf0ef3 feat: Modify script signature
3b38bf0ef3 is described below

commit 3b38bf0ef3b08b705d26b0ec2c27ee4a1a703976
Author: Dominik Riemer <[email protected]>
AuthorDate: Thu Jan 15 15:24:17 2026 +0100

    feat: Modify script signature
---
 .../management/AdapterEventPreviewPipeline.java    |  2 +-
 .../management/management/GuessManagement.java     |  8 ++---
 .../shared/AdapterPipelineGeneratorBase.java       | 13 +-------
 .../ScriptTransformationPipelineElement.java       | 13 ++++----
 .../api/{ScriptTransformer.java => Context.java}   | 20 +------------
 ...ScriptTransformer.java => OutputCollector.java} | 19 +++---------
 .../connect/transformer/api/ScriptTransformer.java | 10 ++++---
 .../utils/TransformationEngineConversionUtils.java | 12 ++++++++
 .../transformer/groovy/GroovyScriptEngine.java     | 16 ++++++----
 .../transformer/js/GraalJsScriptEngine.java        | 16 ++++++----
 .../transformer/js/PolyglotResultConverter.java    | 16 ++++++++++
 .../connect/adapter/AdapterPipelineGenerator.java  |  8 ++---
 .../adapter/model/pipeline/AdapterPipeline.java    | 35 +++++++++++++++-------
 .../v099/connect/TransformationScriptBuilder.java  | 12 ++++----
 14 files changed, 107 insertions(+), 93 deletions(-)

diff --git 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/AdapterEventPreviewPipeline.java
 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/AdapterEventPreviewPipeline.java
index 5fe9375c7c..6a1e16a3c6 100644
--- 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/AdapterEventPreviewPipeline.java
+++ 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/AdapterEventPreviewPipeline.java
@@ -37,7 +37,7 @@ public class AdapterEventPreviewPipeline implements 
IAdapterPipeline {
 
 
     this.pipelineElements = new AdapterPipelineGeneratorBase()
-        .makeAdapterPipelineElements(false, adapterDescription, false);
+        .makeAdapterPipelineElements(false, adapterDescription);
 
     this.event = 
adapterDescription.getTransformationConfig().getOutputs().get(0);
   }
diff --git 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java
 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java
index fe5fa56b33..62adce519e 100644
--- 
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java
+++ 
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java
@@ -45,6 +45,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -108,8 +109,6 @@ public class GuessManagement {
     } else {
 
       try {
-
-
         var transformationScript = 
adapterDescription.getTransformationConfig();
         var engine = 
TransformationEngines.INSTANCE.getTransformationEngine(transformationScript.getLanguage());
         var compiledScript = engine.compile(transformationScript.getScript());
@@ -117,10 +116,11 @@ public class GuessManagement {
         var samples = adapterDescription.getTransformationConfig()
                                         .getInputs();
         if (!samples.isEmpty()) {
-          var result = compiledScript.transform(samples.get(0));
+          List<Map<String, Object>> results = new ArrayList<>();
+          compiledScript.transform(samples.get(0), results::add, null);
 
           adapterDescription.getTransformationConfig()
-                            .setOutputs(List.of(result));
+                            .setOutputs(results);
         } else {
           throw new AdapterException("No samples available to transform");
         }
diff --git 
a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/AdapterPipelineGeneratorBase.java
 
b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/AdapterPipelineGeneratorBase.java
index e0c5d710dd..9c89c7ce8b 100644
--- 
a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/AdapterPipelineGeneratorBase.java
+++ 
b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/AdapterPipelineGeneratorBase.java
@@ -19,7 +19,6 @@
 package org.apache.streampipes.connect.shared;
 
 import 
org.apache.streampipes.connect.shared.preprocessing.elements.AdapterTransformationPipelineElement;
-import 
org.apache.streampipes.connect.shared.preprocessing.elements.ScriptTransformationPipelineElement;
 import 
org.apache.streampipes.connect.shared.preprocessing.transform.stream.EventRateTransformationRule;
 import 
org.apache.streampipes.connect.shared.preprocessing.transform.stream.RemoveDuplicatesTransformationRule;
 import 
org.apache.streampipes.connect.shared.preprocessing.transform.value.DatatypeTransformationRule;
@@ -37,20 +36,10 @@ public class AdapterPipelineGeneratorBase {
 
   public List<IAdapterPipelineElement> makeAdapterPipelineElements(
       boolean includeStateful,
-      AdapterDescription adapterDescription,
-      boolean includeScript
+      AdapterDescription adapterDescription
   ) {
     var elements = new ArrayList<IAdapterPipelineElement>();
 
-    if (includeScript) {
-      elements.add(new ScriptTransformationPipelineElement(
-          adapterDescription.getTransformationConfig()
-                            .getLanguage(),
-          adapterDescription.getTransformationConfig()
-                            .getScript()
-      ));
-    }
-
     List<TransformationRule> transformationRules = new ArrayList<>();
     if (includeStateful) {
       
transformationRules.addAll(getReduceEventTransformationRule(adapterDescription));
diff --git 
a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/elements/ScriptTransformationPipelineElement.java
 
b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/elements/ScriptTransformationPipelineElement.java
index a00806aed0..3024d38019 100644
--- 
a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/elements/ScriptTransformationPipelineElement.java
+++ 
b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/elements/ScriptTransformationPipelineElement.java
@@ -22,11 +22,12 @@ import 
org.apache.streampipes.connect.transformer.api.ScriptTransformer;
 import org.apache.streampipes.connect.transformer.api.TransformationEngines;
 import 
org.apache.streampipes.connect.transformer.api.exception.ScriptCompilationException;
 import 
org.apache.streampipes.connect.transformer.api.exception.ScriptExecutionException;
-import org.apache.streampipes.extensions.api.connect.IAdapterPipelineElement;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
-public class ScriptTransformationPipelineElement implements 
IAdapterPipelineElement {
+public class ScriptTransformationPipelineElement {
   ScriptTransformer scriptTransformer;
 
   public ScriptTransformationPipelineElement(String language, String 
transformationScript) {
@@ -38,11 +39,11 @@ public class ScriptTransformationPipelineElement implements 
IAdapterPipelineElem
     }
   }
 
-
-  @Override
-  public Map<String, Object> process(Map<String, Object> event) {
+  public List<Map<String, Object>> process(Map<String, Object> event) {
     try {
-      return scriptTransformer.transform(event);
+      List<Map<String, Object>> out = new ArrayList<>();
+      scriptTransformer.transform(event, out::add, null);
+      return out;
     } catch (ScriptExecutionException e) {
       throw new RuntimeException(e);
     }
diff --git 
a/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
 
b/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/Context.java
similarity index 56%
copy from 
streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
copy to 
streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/Context.java
index d06885fb5d..4ab88aafa8 100644
--- 
a/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
+++ 
b/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/Context.java
@@ -18,23 +18,5 @@
 
 package org.apache.streampipes.connect.transformer.api;
 
-import 
org.apache.streampipes.connect.transformer.api.exception.ScriptExecutionException;
-
-import java.util.Map;
-
-/**
- * Transforms an input event map into an output event map.
- * Implementations are language-specific but must adhere to this contract.
- */
-public interface ScriptTransformer {
-
-  /**
-   * Apply the compiled template to the incoming data.
-   *
-   * @param input input event map keyed by runtime name
-   * @return output event map keyed by runtime name
-   * @throws ScriptExecutionException
-   * utionException when execution fails or returns an invalid result
-   */
-  Map<String, Object> transform(Map<String, Object> input) throws 
ScriptExecutionException;
+public interface Context {
 }
diff --git 
a/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
 
b/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/OutputCollector.java
similarity index 62%
copy from 
streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
copy to 
streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/OutputCollector.java
index d06885fb5d..34a5b7f8e1 100644
--- 
a/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
+++ 
b/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/OutputCollector.java
@@ -20,21 +20,10 @@ package org.apache.streampipes.connect.transformer.api;
 
 import 
org.apache.streampipes.connect.transformer.api.exception.ScriptExecutionException;
 
-import java.util.Map;
+@FunctionalInterface
+public interface OutputCollector<T> {
+
+  void collect(T event) throws ScriptExecutionException;
 
-/**
- * Transforms an input event map into an output event map.
- * Implementations are language-specific but must adhere to this contract.
- */
-public interface ScriptTransformer {
 
-  /**
-   * Apply the compiled template to the incoming data.
-   *
-   * @param input input event map keyed by runtime name
-   * @return output event map keyed by runtime name
-   * @throws ScriptExecutionException
-   * utionException when execution fails or returns an invalid result
-   */
-  Map<String, Object> transform(Map<String, Object> input) throws 
ScriptExecutionException;
 }
diff --git 
a/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
 
b/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
index d06885fb5d..97d7f3ad27 100644
--- 
a/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
+++ 
b/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
@@ -32,9 +32,11 @@ public interface ScriptTransformer {
    * Apply the compiled template to the incoming data.
    *
    * @param input input event map keyed by runtime name
-   * @return output event map keyed by runtime name
-   * @throws ScriptExecutionException
-   * utionException when execution fails or returns an invalid result
+   * @param out collector that receives each output event emitted for the input
+   * @param ctx reserved for later, currently null
+   * @throws ScriptExecutionException when execution fails or 
returns an invalid result
    */
-  Map<String, Object> transform(Map<String, Object> input) throws 
ScriptExecutionException;
+  void transform(Map<String, Object> input,
+                 OutputCollector<Map<String, Object>> out,
+                 Context ctx) throws ScriptExecutionException;
 }
diff --git 
a/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/utils/TransformationEngineConversionUtils.java
 
b/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/utils/TransformationEngineConversionUtils.java
index 066bfcb98a..dbbfd99477 100644
--- 
a/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/utils/TransformationEngineConversionUtils.java
+++ 
b/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/utils/TransformationEngineConversionUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.streampipes.connect.transformer.api.utils;
 
+import org.apache.streampipes.connect.transformer.api.OutputCollector;
 import 
org.apache.streampipes.connect.transformer.api.exception.ScriptExecutionException;
 
 import java.util.HashMap;
@@ -44,4 +45,15 @@ public class TransformationEngineConversionUtils {
     rawMap.forEach((k, v) -> result.put(String.valueOf(k), v));
     return result;
   }
+
+  public static OutputCollector<Object> 
convertingCollector(OutputCollector<Map<String, Object>> delegate,
+                                                            String language) {
+    return eventObj -> {
+      Map<String, Object> map = ensureMap(
+          eventObj,
+          language
+      );
+      delegate.collect(map);
+    };
+  }
 }
diff --git 
a/streampipes-connect-transformer-groovy/src/main/java/org/apache/streampipes/connect/transformer/groovy/GroovyScriptEngine.java
 
b/streampipes-connect-transformer-groovy/src/main/java/org/apache/streampipes/connect/transformer/groovy/GroovyScriptEngine.java
index a217b8b3aa..6b20789f7c 100644
--- 
a/streampipes-connect-transformer-groovy/src/main/java/org/apache/streampipes/connect/transformer/groovy/GroovyScriptEngine.java
+++ 
b/streampipes-connect-transformer-groovy/src/main/java/org/apache/streampipes/connect/transformer/groovy/GroovyScriptEngine.java
@@ -18,6 +18,8 @@
 
 package org.apache.streampipes.connect.transformer.groovy;
 
+import org.apache.streampipes.connect.transformer.api.Context;
+import org.apache.streampipes.connect.transformer.api.OutputCollector;
 import org.apache.streampipes.connect.transformer.api.ScriptTransformer;
 import org.apache.streampipes.connect.transformer.api.TransformationEngine;
 import 
org.apache.streampipes.connect.transformer.api.exception.ScriptCompilationException;
@@ -40,7 +42,7 @@ public class GroovyScriptEngine implements 
TransformationEngine {
     return new ScriptMetadata(
         "groovy",
     "Groovy",
-    ""
+    "out.collect(input)"
     );
   }
 
@@ -56,17 +58,21 @@ public class GroovyScriptEngine implements 
TransformationEngine {
 
     Class<? extends Script> scriptClass = script.getClass();
 
-    return input -> execute(scriptClass, input);
+    return (input, out, ctx) -> execute(scriptClass, input, out, ctx);
   }
 
-  private Map<String, Object> execute(Class<? extends Script> scriptClass, 
Map<String, Object> input)
+  private void execute(Class<? extends Script> scriptClass,
+                       Map<String, Object> input,
+                       OutputCollector<Map<String, Object>> out,
+                       Context ctx)
       throws ScriptExecutionException {
     try {
       Binding binding = new Binding();
       binding.setVariable("input", input);
+      binding.setVariable("out", 
TransformationEngineConversionUtils.convertingCollector(out, 
metadata().language()));
+      binding.setVariable("ctx", ctx);
       Script scriptInstance = InvokerHelper.createScript(scriptClass, binding);
-      Object result = scriptInstance.run();
-      return TransformationEngineConversionUtils.ensureMap(result, 
metadata().language());
+      scriptInstance.run();
     } catch (Exception e) {
       throw new ScriptExecutionException("Groovy template execution failed", 
e);
     }
diff --git 
a/streampipes-connect-transformer-js/src/main/java/org/apache/streampipes/connect/transformer/js/GraalJsScriptEngine.java
 
b/streampipes-connect-transformer-js/src/main/java/org/apache/streampipes/connect/transformer/js/GraalJsScriptEngine.java
index 6cb9b267a7..2b0455d692 100644
--- 
a/streampipes-connect-transformer-js/src/main/java/org/apache/streampipes/connect/transformer/js/GraalJsScriptEngine.java
+++ 
b/streampipes-connect-transformer-js/src/main/java/org/apache/streampipes/connect/transformer/js/GraalJsScriptEngine.java
@@ -18,6 +18,7 @@
 
 package org.apache.streampipes.connect.transformer.js;
 
+import org.apache.streampipes.connect.transformer.api.OutputCollector;
 import org.apache.streampipes.connect.transformer.api.ScriptTransformer;
 import org.apache.streampipes.connect.transformer.api.TransformationEngine;
 import 
org.apache.streampipes.connect.transformer.api.exception.ScriptCompilationException;
@@ -41,8 +42,8 @@ public class GraalJsScriptEngine implements 
TransformationEngine {
         "JavaScript",
         """
             // returns the same event
-            function transform(event) {
-              return event;
+            function transform(event, out, ctx) {
+              out.collect(event);
             }
             """
     );
@@ -61,7 +62,7 @@ public class GraalJsScriptEngine implements 
TransformationEngine {
 
     Value transformFunction = resolveFunction(context, compiled);
 
-    return input -> execute(transformFunction, input);
+    return (input, out, ctx) -> execute(transformFunction, input, out, ctx);
   }
 
   private Context createContext() {
@@ -89,11 +90,14 @@ public class GraalJsScriptEngine implements 
TransformationEngine {
         null);
   }
 
-  private Map<String, Object> execute(Value transformFunction, Map<String, 
Object> input)
+  private void execute(Value transformFunction,
+                       Map<String, Object> input,
+                       OutputCollector<Map<String, Object>> out,
+                       org.apache.streampipes.connect.transformer.api.Context 
ctx)
       throws ScriptExecutionException {
     try {
-      Value result = transformFunction.execute(input);
-      return PolyglotResultConverter.ensureMap(result, metadata().language());
+      transformFunction.execute(input, 
PolyglotResultConverter.convertingCollector(out, metadata().language()), ctx);
+      //return PolyglotResultConverter.ensureMap(result, 
metadata().language());
     } catch (PolyglotException e) {
       throw new ScriptExecutionException("Graal JS script execution failed", 
e);
     }
diff --git 
a/streampipes-connect-transformer-js/src/main/java/org/apache/streampipes/connect/transformer/js/PolyglotResultConverter.java
 
b/streampipes-connect-transformer-js/src/main/java/org/apache/streampipes/connect/transformer/js/PolyglotResultConverter.java
index b280ae247d..9f85ef3c41 100644
--- 
a/streampipes-connect-transformer-js/src/main/java/org/apache/streampipes/connect/transformer/js/PolyglotResultConverter.java
+++ 
b/streampipes-connect-transformer-js/src/main/java/org/apache/streampipes/connect/transformer/js/PolyglotResultConverter.java
@@ -18,6 +18,7 @@
 
 package org.apache.streampipes.connect.transformer.js;
 
+import org.apache.streampipes.connect.transformer.api.OutputCollector;
 import 
org.apache.streampipes.connect.transformer.api.exception.ScriptExecutionException;
 import 
org.apache.streampipes.connect.transformer.api.utils.TransformationEngineConversionUtils;
 
@@ -69,4 +70,19 @@ public class PolyglotResultConverter {
     throw new ScriptExecutionException(
         "Template in " + language + " returned a non-object value: " + 
value.toString());
   }
+
+  public static OutputCollector<Object> 
convertingCollector(OutputCollector<Map<String, Object>> delegate, String 
language) {
+    return obj -> {
+      Map<String,Object> map;
+      if (obj instanceof Map<?, ?> m) {
+        //noinspection unchecked
+        map = (Map<String,Object>) m;
+      } else if (obj instanceof Value v) {
+        map = PolyglotResultConverter.ensureMap(v, language);
+      } else {
+        throw new IllegalArgumentException("Collected event must be an 
object/map");
+      }
+      delegate.collect(map);
+    };
+  }
 }
diff --git 
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/AdapterPipelineGenerator.java
 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/AdapterPipelineGenerator.java
index 6603bcb7b9..0309814a69 100644
--- 
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/AdapterPipelineGenerator.java
+++ 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/AdapterPipelineGenerator.java
@@ -26,18 +26,16 @@ import 
org.apache.streampipes.model.connect.adapter.AdapterDescription;
 public class AdapterPipelineGenerator extends AdapterPipelineGeneratorBase {
 
   public AdapterPipeline generatePipeline(AdapterDescription 
adapterDescription) {
-
-    var includeScript = 
adapterDescription.getTransformationConfig().isScriptActive();
-
-    var pipelineElements = makeAdapterPipelineElements(true, 
adapterDescription, includeScript);
+    var pipelineElements = makeAdapterPipelineElements(true, 
adapterDescription);
 
     if (hasValidGrounding(adapterDescription)) {
       return new AdapterPipeline(
           pipelineElements,
+          adapterDescription.getTransformationConfig(),
           getAdapterSink(adapterDescription),
           adapterDescription.getEventSchema());
     } else {
-      return new AdapterPipeline(pipelineElements, 
adapterDescription.getEventSchema());
+      return new AdapterPipeline(pipelineElements, 
adapterDescription.getTransformationConfig(), 
adapterDescription.getEventSchema());
     }
   }
 
diff --git 
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java
 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java
index b703b94b3c..43ee703c26 100644
--- 
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java
+++ 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java
@@ -18,12 +18,15 @@
 
 package 
org.apache.streampipes.extensions.management.connect.adapter.model.pipeline;
 
+import 
org.apache.streampipes.connect.shared.preprocessing.elements.ScriptTransformationPipelineElement;
 import org.apache.streampipes.extensions.api.connect.IAdapterPipeline;
 import org.apache.streampipes.extensions.api.connect.IAdapterPipelineElement;
+import org.apache.streampipes.model.connect.TransformationConfig;
 import org.apache.streampipes.model.schema.EventSchema;
 
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 
 public class AdapterPipeline implements IAdapterPipeline {
 
@@ -31,31 +34,43 @@ public class AdapterPipeline implements IAdapterPipeline {
   private IAdapterPipelineElement pipelineSink;
 
   private final EventSchema resultingEventSchema;
+  private final Function<Map<String, Object>, List<Map<String, Object>>> 
processingFn;
 
   public AdapterPipeline(List<IAdapterPipelineElement> pipelineElements,
+                         TransformationConfig transformationConfig,
                          EventSchema resultingEventSchema) {
     this.pipelineElements = pipelineElements;
     this.resultingEventSchema = resultingEventSchema;
+    if (transformationConfig.isScriptActive()) {
+      var transformation = new ScriptTransformationPipelineElement(
+         transformationConfig.getLanguage(),
+          transformationConfig.getScript()
+      );
+      processingFn = transformation::process;
+    } else {
+      processingFn = List::of;
+    }
   }
 
   public AdapterPipeline(List<IAdapterPipelineElement> pipelineElements,
+                         TransformationConfig transformationConfig,
                          IAdapterPipelineElement pipelineSink,
                          EventSchema resultingEventSchema) {
-    this.pipelineElements = pipelineElements;
+    this(pipelineElements, transformationConfig, resultingEventSchema);
     this.pipelineSink = pipelineSink;
-    this.resultingEventSchema = resultingEventSchema;
   }
 
   @Override
   public void process(Map<String, Object> event) {
-
-    for (IAdapterPipelineElement pipelineElement : pipelineElements) {
-      event = pipelineElement.process(event);
-    }
-    if (pipelineSink != null) {
-      pipelineSink.process(event);
-    }
-
+    var scriptResult = this.processingFn.apply(event);
+    scriptResult.forEach(result -> {
+      for (IAdapterPipelineElement pipelineElement : pipelineElements) {
+        result = pipelineElement.process(result);
+      }
+      if (pipelineSink != null) {
+        pipelineSink.process(result);
+      }
+    });
   }
 
   @Override
diff --git 
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v099/connect/TransformationScriptBuilder.java
 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v099/connect/TransformationScriptBuilder.java
index f068ef44c6..da7369f3f9 100644
--- 
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v099/connect/TransformationScriptBuilder.java
+++ 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v099/connect/TransformationScriptBuilder.java
@@ -21,22 +21,22 @@ package 
org.apache.streampipes.service.core.migrations.v099.connect;
 public class TransformationScriptBuilder {
   private StringBuilder sb;
 
-  private boolean scriptAcive;
+  private boolean scriptActive;
 
   private TransformationScriptBuilder() {
-    scriptAcive = false;
+    scriptActive = false;
   }
 
   public static TransformationScriptBuilder create() {
     TransformationScriptBuilder builder = new TransformationScriptBuilder();
     builder.sb = new StringBuilder();
-    builder.sb.append("function transform(event) {\n");
+    builder.sb.append("function transform(event, out, ctx) {\n");
     return builder;
   }
 
   public TransformationScriptBuilder appendLine(String line) {
     if (!line.startsWith("//")) {
-      scriptAcive = true;
+      scriptActive = true;
     }
 
     sb.append("  ").append(line).append("\n");
@@ -45,11 +45,11 @@ public class TransformationScriptBuilder {
 
   // Used to check if any script lines were added
   public boolean isScriptActive() {
-    return scriptAcive;
+    return scriptActive;
   }
 
   public String build() {
-    sb.append("  return event;\n");
+    sb.append("  out.collect(event);\n");
     sb.append("}");
     return sb.toString();
   }

Reply via email to