This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch
4097-modify-script-signature-in-connect-transformation-scripts
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to
refs/heads/4097-modify-script-signature-in-connect-transformation-scripts by
this push:
new 3b38bf0ef3 feat: Modify script signature
3b38bf0ef3 is described below
commit 3b38bf0ef3b08b705d26b0ec2c27ee4a1a703976
Author: Dominik Riemer <[email protected]>
AuthorDate: Thu Jan 15 15:24:17 2026 +0100
feat: Modify script signature
---
.../management/AdapterEventPreviewPipeline.java | 2 +-
.../management/management/GuessManagement.java | 8 ++---
.../shared/AdapterPipelineGeneratorBase.java | 13 +-------
.../ScriptTransformationPipelineElement.java | 13 ++++----
.../api/{ScriptTransformer.java => Context.java} | 20 +------------
...ScriptTransformer.java => OutputCollector.java} | 19 +++---------
.../connect/transformer/api/ScriptTransformer.java | 10 ++++---
.../utils/TransformationEngineConversionUtils.java | 12 ++++++++
.../transformer/groovy/GroovyScriptEngine.java | 16 ++++++----
.../transformer/js/GraalJsScriptEngine.java | 16 ++++++----
.../transformer/js/PolyglotResultConverter.java | 16 ++++++++++
.../connect/adapter/AdapterPipelineGenerator.java | 8 ++---
.../adapter/model/pipeline/AdapterPipeline.java | 35 +++++++++++++++-------
.../v099/connect/TransformationScriptBuilder.java | 12 ++++----
14 files changed, 107 insertions(+), 93 deletions(-)
diff --git
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/AdapterEventPreviewPipeline.java
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/AdapterEventPreviewPipeline.java
index 5fe9375c7c..6a1e16a3c6 100644
---
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/AdapterEventPreviewPipeline.java
+++
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/AdapterEventPreviewPipeline.java
@@ -37,7 +37,7 @@ public class AdapterEventPreviewPipeline implements
IAdapterPipeline {
this.pipelineElements = new AdapterPipelineGeneratorBase()
- .makeAdapterPipelineElements(false, adapterDescription, false);
+ .makeAdapterPipelineElements(false, adapterDescription);
this.event =
adapterDescription.getTransformationConfig().getOutputs().get(0);
}
diff --git
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java
index fe5fa56b33..62adce519e 100644
---
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java
+++
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java
@@ -45,6 +45,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -108,8 +109,6 @@ public class GuessManagement {
} else {
try {
-
-
var transformationScript =
adapterDescription.getTransformationConfig();
var engine =
TransformationEngines.INSTANCE.getTransformationEngine(transformationScript.getLanguage());
var compiledScript = engine.compile(transformationScript.getScript());
@@ -117,10 +116,11 @@ public class GuessManagement {
var samples = adapterDescription.getTransformationConfig()
.getInputs();
if (!samples.isEmpty()) {
- var result = compiledScript.transform(samples.get(0));
+ List<Map<String, Object>> results = new ArrayList<>();
+ compiledScript.transform(samples.get(0), results::add, null);
adapterDescription.getTransformationConfig()
- .setOutputs(List.of(result));
+ .setOutputs(results);
} else {
throw new AdapterException("No samples available to transform");
}
diff --git
a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/AdapterPipelineGeneratorBase.java
b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/AdapterPipelineGeneratorBase.java
index e0c5d710dd..9c89c7ce8b 100644
---
a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/AdapterPipelineGeneratorBase.java
+++
b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/AdapterPipelineGeneratorBase.java
@@ -19,7 +19,6 @@
package org.apache.streampipes.connect.shared;
import
org.apache.streampipes.connect.shared.preprocessing.elements.AdapterTransformationPipelineElement;
-import
org.apache.streampipes.connect.shared.preprocessing.elements.ScriptTransformationPipelineElement;
import
org.apache.streampipes.connect.shared.preprocessing.transform.stream.EventRateTransformationRule;
import
org.apache.streampipes.connect.shared.preprocessing.transform.stream.RemoveDuplicatesTransformationRule;
import
org.apache.streampipes.connect.shared.preprocessing.transform.value.DatatypeTransformationRule;
@@ -37,20 +36,10 @@ public class AdapterPipelineGeneratorBase {
public List<IAdapterPipelineElement> makeAdapterPipelineElements(
boolean includeStateful,
- AdapterDescription adapterDescription,
- boolean includeScript
+ AdapterDescription adapterDescription
) {
var elements = new ArrayList<IAdapterPipelineElement>();
- if (includeScript) {
- elements.add(new ScriptTransformationPipelineElement(
- adapterDescription.getTransformationConfig()
- .getLanguage(),
- adapterDescription.getTransformationConfig()
- .getScript()
- ));
- }
-
List<TransformationRule> transformationRules = new ArrayList<>();
if (includeStateful) {
transformationRules.addAll(getReduceEventTransformationRule(adapterDescription));
diff --git
a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/elements/ScriptTransformationPipelineElement.java
b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/elements/ScriptTransformationPipelineElement.java
index a00806aed0..3024d38019 100644
---
a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/elements/ScriptTransformationPipelineElement.java
+++
b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/elements/ScriptTransformationPipelineElement.java
@@ -22,11 +22,12 @@ import
org.apache.streampipes.connect.transformer.api.ScriptTransformer;
import org.apache.streampipes.connect.transformer.api.TransformationEngines;
import
org.apache.streampipes.connect.transformer.api.exception.ScriptCompilationException;
import
org.apache.streampipes.connect.transformer.api.exception.ScriptExecutionException;
-import org.apache.streampipes.extensions.api.connect.IAdapterPipelineElement;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
-public class ScriptTransformationPipelineElement implements
IAdapterPipelineElement {
+public class ScriptTransformationPipelineElement {
ScriptTransformer scriptTransformer;
public ScriptTransformationPipelineElement(String language, String
transformationScript) {
@@ -38,11 +39,11 @@ public class ScriptTransformationPipelineElement implements
IAdapterPipelineElem
}
}
-
- @Override
- public Map<String, Object> process(Map<String, Object> event) {
+ public List<Map<String, Object>> process(Map<String, Object> event) {
try {
- return scriptTransformer.transform(event);
+ List<Map<String, Object>> out = new ArrayList<>();
+ scriptTransformer.transform(event, out::add, null);
+ return out;
} catch (ScriptExecutionException e) {
throw new RuntimeException(e);
}
diff --git
a/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
b/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/Context.java
similarity index 56%
copy from
streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
copy to
streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/Context.java
index d06885fb5d..4ab88aafa8 100644
---
a/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
+++
b/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/Context.java
@@ -18,23 +18,5 @@
package org.apache.streampipes.connect.transformer.api;
-import
org.apache.streampipes.connect.transformer.api.exception.ScriptExecutionException;
-
-import java.util.Map;
-
-/**
- * Transforms an input event map into an output event map.
- * Implementations are language-specific but must adhere to this contract.
- */
-public interface ScriptTransformer {
-
- /**
- * Apply the compiled template to the incoming data.
- *
- * @param input input event map keyed by runtime name
- * @return output event map keyed by runtime name
- * @throws ScriptExecutionException
- * utionException when execution fails or returns an invalid result
- */
- Map<String, Object> transform(Map<String, Object> input) throws
ScriptExecutionException;
+public interface Context {
}
diff --git
a/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
b/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/OutputCollector.java
similarity index 62%
copy from
streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
copy to
streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/OutputCollector.java
index d06885fb5d..34a5b7f8e1 100644
---
a/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
+++
b/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/OutputCollector.java
@@ -20,21 +20,10 @@ package org.apache.streampipes.connect.transformer.api;
import
org.apache.streampipes.connect.transformer.api.exception.ScriptExecutionException;
-import java.util.Map;
+@FunctionalInterface
+public interface OutputCollector<T> {
+
+ void collect(T event) throws ScriptExecutionException;
-/**
- * Transforms an input event map into an output event map.
- * Implementations are language-specific but must adhere to this contract.
- */
-public interface ScriptTransformer {
- /**
- * Apply the compiled template to the incoming data.
- *
- * @param input input event map keyed by runtime name
- * @return output event map keyed by runtime name
- * @throws ScriptExecutionException
- * utionException when execution fails or returns an invalid result
- */
- Map<String, Object> transform(Map<String, Object> input) throws
ScriptExecutionException;
}
diff --git
a/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
b/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
index d06885fb5d..97d7f3ad27 100644
---
a/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
+++
b/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
@@ -32,9 +32,11 @@ public interface ScriptTransformer {
* Apply the compiled template to the incoming data.
*
* @param input input event map keyed by runtime name
- * @return output event map keyed by runtime name
- * @throws ScriptExecutionException
- * utionException when execution fails or returns an invalid result
+ * @param out output event
+ * @param ctx reserved for later, currently null
+ * @throws ScriptExecutionException Exception when execution fails or
returns an invalid result
*/
- Map<String, Object> transform(Map<String, Object> input) throws
ScriptExecutionException;
+ void transform(Map<String, Object> input,
+ OutputCollector<Map<String, Object>> out,
+ Context ctx) throws ScriptExecutionException;
}
diff --git
a/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/utils/TransformationEngineConversionUtils.java
b/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/utils/TransformationEngineConversionUtils.java
index 066bfcb98a..dbbfd99477 100644
---
a/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/utils/TransformationEngineConversionUtils.java
+++
b/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/utils/TransformationEngineConversionUtils.java
@@ -18,6 +18,7 @@
package org.apache.streampipes.connect.transformer.api.utils;
+import org.apache.streampipes.connect.transformer.api.OutputCollector;
import
org.apache.streampipes.connect.transformer.api.exception.ScriptExecutionException;
import java.util.HashMap;
@@ -44,4 +45,15 @@ public class TransformationEngineConversionUtils {
rawMap.forEach((k, v) -> result.put(String.valueOf(k), v));
return result;
}
+
+ public static OutputCollector<Object>
convertingCollector(OutputCollector<Map<String, Object>> delegate,
+ String language) {
+ return eventObj -> {
+ Map<String, Object> map = ensureMap(
+ eventObj,
+ language
+ );
+ delegate.collect(map);
+ };
+ }
}
diff --git
a/streampipes-connect-transformer-groovy/src/main/java/org/apache/streampipes/connect/transformer/groovy/GroovyScriptEngine.java
b/streampipes-connect-transformer-groovy/src/main/java/org/apache/streampipes/connect/transformer/groovy/GroovyScriptEngine.java
index a217b8b3aa..6b20789f7c 100644
---
a/streampipes-connect-transformer-groovy/src/main/java/org/apache/streampipes/connect/transformer/groovy/GroovyScriptEngine.java
+++
b/streampipes-connect-transformer-groovy/src/main/java/org/apache/streampipes/connect/transformer/groovy/GroovyScriptEngine.java
@@ -18,6 +18,8 @@
package org.apache.streampipes.connect.transformer.groovy;
+import org.apache.streampipes.connect.transformer.api.Context;
+import org.apache.streampipes.connect.transformer.api.OutputCollector;
import org.apache.streampipes.connect.transformer.api.ScriptTransformer;
import org.apache.streampipes.connect.transformer.api.TransformationEngine;
import
org.apache.streampipes.connect.transformer.api.exception.ScriptCompilationException;
@@ -40,7 +42,7 @@ public class GroovyScriptEngine implements
TransformationEngine {
return new ScriptMetadata(
"groovy",
"Groovy",
- ""
+ "out.collect(input)"
);
}
@@ -56,17 +58,21 @@ public class GroovyScriptEngine implements
TransformationEngine {
Class<? extends Script> scriptClass = script.getClass();
- return input -> execute(scriptClass, input);
+ return (input, out, ctx) -> execute(scriptClass, input, out, ctx);
}
- private Map<String, Object> execute(Class<? extends Script> scriptClass,
Map<String, Object> input)
+ private void execute(Class<? extends Script> scriptClass,
+ Map<String, Object> input,
+ OutputCollector<Map<String, Object>> out,
+ Context ctx)
throws ScriptExecutionException {
try {
Binding binding = new Binding();
binding.setVariable("input", input);
+ binding.setVariable("out",
TransformationEngineConversionUtils.convertingCollector(out,
metadata().language()));
+ binding.setVariable("ctx", ctx);
Script scriptInstance = InvokerHelper.createScript(scriptClass, binding);
- Object result = scriptInstance.run();
- return TransformationEngineConversionUtils.ensureMap(result,
metadata().language());
+ scriptInstance.run();
} catch (Exception e) {
throw new ScriptExecutionException("Groovy template execution failed",
e);
}
diff --git
a/streampipes-connect-transformer-js/src/main/java/org/apache/streampipes/connect/transformer/js/GraalJsScriptEngine.java
b/streampipes-connect-transformer-js/src/main/java/org/apache/streampipes/connect/transformer/js/GraalJsScriptEngine.java
index 6cb9b267a7..2b0455d692 100644
---
a/streampipes-connect-transformer-js/src/main/java/org/apache/streampipes/connect/transformer/js/GraalJsScriptEngine.java
+++
b/streampipes-connect-transformer-js/src/main/java/org/apache/streampipes/connect/transformer/js/GraalJsScriptEngine.java
@@ -18,6 +18,7 @@
package org.apache.streampipes.connect.transformer.js;
+import org.apache.streampipes.connect.transformer.api.OutputCollector;
import org.apache.streampipes.connect.transformer.api.ScriptTransformer;
import org.apache.streampipes.connect.transformer.api.TransformationEngine;
import
org.apache.streampipes.connect.transformer.api.exception.ScriptCompilationException;
@@ -41,8 +42,8 @@ public class GraalJsScriptEngine implements
TransformationEngine {
"JavaScript",
"""
// returns the same event
- function transform(event) {
- return event;
+ function transform(event, out, ctx) {
+ out.collect(event);
}
"""
);
@@ -61,7 +62,7 @@ public class GraalJsScriptEngine implements
TransformationEngine {
Value transformFunction = resolveFunction(context, compiled);
- return input -> execute(transformFunction, input);
+ return (input, out, ctx) -> execute(transformFunction, input, out, ctx);
}
private Context createContext() {
@@ -89,11 +90,14 @@ public class GraalJsScriptEngine implements
TransformationEngine {
null);
}
- private Map<String, Object> execute(Value transformFunction, Map<String,
Object> input)
+ private void execute(Value transformFunction,
+ Map<String, Object> input,
+ OutputCollector<Map<String, Object>> out,
+ org.apache.streampipes.connect.transformer.api.Context
ctx)
throws ScriptExecutionException {
try {
- Value result = transformFunction.execute(input);
- return PolyglotResultConverter.ensureMap(result, metadata().language());
+ transformFunction.execute(input,
PolyglotResultConverter.convertingCollector(out, metadata().language()), ctx);
+ //return PolyglotResultConverter.ensureMap(result,
metadata().language());
} catch (PolyglotException e) {
throw new ScriptExecutionException("Graal JS script execution failed",
e);
}
diff --git
a/streampipes-connect-transformer-js/src/main/java/org/apache/streampipes/connect/transformer/js/PolyglotResultConverter.java
b/streampipes-connect-transformer-js/src/main/java/org/apache/streampipes/connect/transformer/js/PolyglotResultConverter.java
index b280ae247d..9f85ef3c41 100644
---
a/streampipes-connect-transformer-js/src/main/java/org/apache/streampipes/connect/transformer/js/PolyglotResultConverter.java
+++
b/streampipes-connect-transformer-js/src/main/java/org/apache/streampipes/connect/transformer/js/PolyglotResultConverter.java
@@ -18,6 +18,7 @@
package org.apache.streampipes.connect.transformer.js;
+import org.apache.streampipes.connect.transformer.api.OutputCollector;
import
org.apache.streampipes.connect.transformer.api.exception.ScriptExecutionException;
import
org.apache.streampipes.connect.transformer.api.utils.TransformationEngineConversionUtils;
@@ -69,4 +70,19 @@ public class PolyglotResultConverter {
throw new ScriptExecutionException(
"Template in " + language + " returned a non-object value: " +
value.toString());
}
+
+ public static OutputCollector<Object>
convertingCollector(OutputCollector<Map<String, Object>> delegate, String
language) {
+ return obj -> {
+ Map<String,Object> map;
+ if (obj instanceof Map<?, ?> m) {
+ //noinspection unchecked
+ map = (Map<String,Object>) m;
+ } else if (obj instanceof Value v) {
+ map = PolyglotResultConverter.ensureMap(v, language);
+ } else {
+ throw new IllegalArgumentException("Collected event must be an
object/map");
+ }
+ delegate.collect(map);
+ };
+ }
}
diff --git
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/AdapterPipelineGenerator.java
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/AdapterPipelineGenerator.java
index 6603bcb7b9..0309814a69 100644
---
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/AdapterPipelineGenerator.java
+++
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/AdapterPipelineGenerator.java
@@ -26,18 +26,16 @@ import
org.apache.streampipes.model.connect.adapter.AdapterDescription;
public class AdapterPipelineGenerator extends AdapterPipelineGeneratorBase {
public AdapterPipeline generatePipeline(AdapterDescription
adapterDescription) {
-
- var includeScript =
adapterDescription.getTransformationConfig().isScriptActive();
-
- var pipelineElements = makeAdapterPipelineElements(true,
adapterDescription, includeScript);
+ var pipelineElements = makeAdapterPipelineElements(true,
adapterDescription);
if (hasValidGrounding(adapterDescription)) {
return new AdapterPipeline(
pipelineElements,
+ adapterDescription.getTransformationConfig(),
getAdapterSink(adapterDescription),
adapterDescription.getEventSchema());
} else {
- return new AdapterPipeline(pipelineElements,
adapterDescription.getEventSchema());
+ return new AdapterPipeline(pipelineElements,
adapterDescription.getTransformationConfig(),
adapterDescription.getEventSchema());
}
}
diff --git
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java
index b703b94b3c..43ee703c26 100644
---
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java
+++
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java
@@ -18,12 +18,15 @@
package
org.apache.streampipes.extensions.management.connect.adapter.model.pipeline;
+import
org.apache.streampipes.connect.shared.preprocessing.elements.ScriptTransformationPipelineElement;
import org.apache.streampipes.extensions.api.connect.IAdapterPipeline;
import org.apache.streampipes.extensions.api.connect.IAdapterPipelineElement;
+import org.apache.streampipes.model.connect.TransformationConfig;
import org.apache.streampipes.model.schema.EventSchema;
import java.util.List;
import java.util.Map;
+import java.util.function.Function;
public class AdapterPipeline implements IAdapterPipeline {
@@ -31,31 +34,43 @@ public class AdapterPipeline implements IAdapterPipeline {
private IAdapterPipelineElement pipelineSink;
private final EventSchema resultingEventSchema;
+ private final Function<Map<String, Object>, List<Map<String, Object>>>
processingFn;
public AdapterPipeline(List<IAdapterPipelineElement> pipelineElements,
+ TransformationConfig transformationConfig,
EventSchema resultingEventSchema) {
this.pipelineElements = pipelineElements;
this.resultingEventSchema = resultingEventSchema;
+ if (transformationConfig.isScriptActive()) {
+ var transformation = new ScriptTransformationPipelineElement(
+ transformationConfig.getLanguage(),
+ transformationConfig.getScript()
+ );
+ processingFn = transformation::process;
+ } else {
+ processingFn = List::of;
+ }
}
public AdapterPipeline(List<IAdapterPipelineElement> pipelineElements,
+ TransformationConfig transformationConfig,
IAdapterPipelineElement pipelineSink,
EventSchema resultingEventSchema) {
- this.pipelineElements = pipelineElements;
+ this(pipelineElements, transformationConfig, resultingEventSchema);
this.pipelineSink = pipelineSink;
- this.resultingEventSchema = resultingEventSchema;
}
@Override
public void process(Map<String, Object> event) {
-
- for (IAdapterPipelineElement pipelineElement : pipelineElements) {
- event = pipelineElement.process(event);
- }
- if (pipelineSink != null) {
- pipelineSink.process(event);
- }
-
+ var scriptResult = this.processingFn.apply(event);
+ scriptResult.forEach(result -> {
+ for (IAdapterPipelineElement pipelineElement : pipelineElements) {
+ result = pipelineElement.process(result);
+ }
+ if (pipelineSink != null) {
+ pipelineSink.process(result);
+ }
+ });
}
@Override
diff --git
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v099/connect/TransformationScriptBuilder.java
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v099/connect/TransformationScriptBuilder.java
index f068ef44c6..da7369f3f9 100644
---
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v099/connect/TransformationScriptBuilder.java
+++
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v099/connect/TransformationScriptBuilder.java
@@ -21,22 +21,22 @@ package
org.apache.streampipes.service.core.migrations.v099.connect;
public class TransformationScriptBuilder {
private StringBuilder sb;
- private boolean scriptAcive;
+ private boolean scriptActive;
private TransformationScriptBuilder() {
- scriptAcive = false;
+ scriptActive = false;
}
public static TransformationScriptBuilder create() {
TransformationScriptBuilder builder = new TransformationScriptBuilder();
builder.sb = new StringBuilder();
- builder.sb.append("function transform(event) {\n");
+ builder.sb.append("function transform(event, out, ctx) {\n");
return builder;
}
public TransformationScriptBuilder appendLine(String line) {
if (!line.startsWith("//")) {
- scriptAcive = true;
+ scriptActive = true;
}
sb.append(" ").append(line).append("\n");
@@ -45,11 +45,11 @@ public class TransformationScriptBuilder {
// Used to check if any script lines were added
public boolean isScriptActive() {
- return scriptAcive;
+ return scriptActive;
}
public String build() {
- sb.append(" return event;\n");
+ sb.append(" out.collect(event);\n");
sb.append("}");
return sb.toString();
}