This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 687c5022c3 feat: Modify script signature (#4098)
687c5022c3 is described below
commit 687c5022c3bbda0c22b7985f84a10153375c86bc
Author: Dominik Riemer <[email protected]>
AuthorDate: Thu Jan 15 20:36:02 2026 +0100
feat: Modify script signature (#4098)
Co-authored-by: Philipp Zehnder <[email protected]>
---
.../go-client-e2e/adapter/machine.json | 2 +-
.../management/AdapterEventPreviewPipeline.java | 2 +-
.../compact/generator/AdapterSchemaGenerator.java | 7 +++--
.../management/management/GuessManagement.java | 8 ++---
.../shared/AdapterPipelineGeneratorBase.java | 13 +-------
.../ScriptTransformationPipelineElement.java | 13 ++++----
.../api/{ScriptTransformer.java => Context.java} | 20 +------------
...ScriptTransformer.java => OutputCollector.java} | 19 +++---------
.../connect/transformer/api/ScriptTransformer.java | 10 ++++---
.../utils/TransformationEngineConversionUtils.java | 12 ++++++++
.../transformer/groovy/GroovyScriptEngine.java | 16 ++++++----
.../transformer/js/GraalJsScriptEngine.java | 15 ++++++----
.../transformer/js/PolyglotResultConverter.java | 16 ++++++++++
.../connect/adapter/AdapterPipelineGenerator.java | 8 ++---
.../adapter/model/pipeline/AdapterPipeline.java | 35 +++++++++++++++-------
.../v099/connect/TransformationScriptBuilder.java | 12 ++++----
.../v099/MigrateAdaptersToUseScriptTest.java | 4 +--
.../support/utils/connect/CompactAdapterUtils.ts | 4 ++-
.../utils/connect/ConnectEventSchemaUtils.ts | 4 +--
ui/cypress/support/utils/connect/ConnectUtils.ts | 8 ++++-
.../compactAdapterWithTransformation.spec.ts | 4 +--
ui/cypress/tests/connect/editAdapter.smoke.spec.ts | 4 +--
ui/cypress/tests/connect/fileStream.spec.ts | 2 +-
.../connect/rules/deleteTransformationRule.spec.ts | 2 +-
.../tests/connect/rules/schemaRules.smoke.spec.ts | 2 +-
ui/cypress/tests/connect/rules/valueRules.spec.ts | 2 +-
ui/cypress/tests/connect/scriptTemplate.spec.ts | 2 +-
27 files changed, 134 insertions(+), 112 deletions(-)
diff --git a/streampipes-client-e2e/go-client-e2e/adapter/machine.json
b/streampipes-client-e2e/go-client-e2e/adapter/machine.json
index d3272ef73a..57f9645dd8 100644
--- a/streampipes-client-e2e/go-client-e2e/adapter/machine.json
+++ b/streampipes-client-e2e/go-client-e2e/adapter/machine.json
@@ -220,7 +220,7 @@
"correspondingDataStreamElementId" : "sp:spdatastream:wMyUZS",
"schemaTransformationConfig" : {
"language" : "javascript",
- "script" : "// returns the same event\nfunction transform(event) {\n
return event;\n}",
+ "script" : "function transform(event, out, ctx) {\n
out.collect(event);\n}\n",
"inputs" : [ {
"mass_flow" : 0.0620350632211194,
"density" : 46.62523604047736,
diff --git
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/AdapterEventPreviewPipeline.java
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/AdapterEventPreviewPipeline.java
index 5fe9375c7c..6a1e16a3c6 100644
---
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/AdapterEventPreviewPipeline.java
+++
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/AdapterEventPreviewPipeline.java
@@ -37,7 +37,7 @@ public class AdapterEventPreviewPipeline implements
IAdapterPipeline {
this.pipelineElements = new AdapterPipelineGeneratorBase()
- .makeAdapterPipelineElements(false, adapterDescription, false);
+ .makeAdapterPipelineElements(false, adapterDescription);
this.event =
adapterDescription.getTransformationConfig().getOutputs().get(0);
}
diff --git
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/compact/generator/AdapterSchemaGenerator.java
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/compact/generator/AdapterSchemaGenerator.java
index 4333680db4..1b0d1bc822 100644
---
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/compact/generator/AdapterSchemaGenerator.java
+++
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/compact/generator/AdapterSchemaGenerator.java
@@ -91,11 +91,12 @@ public class AdapterSchemaGenerator implements
AdapterModelGenerator {
|| adapterDescription.getTransformationConfig()
.getScript()
.isEmpty()) {
+ adapterDescription.getTransformationConfig().setScriptActive(true);
adapterDescription.getTransformationConfig()
.setScript("""
- function transform(event) {
- return event;
- }
+ function transform(event, out, ctx) {
+ out.collect(event);
+ }
""");
}
diff --git
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java
index fe5fa56b33..62adce519e 100644
---
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java
+++
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/GuessManagement.java
@@ -45,6 +45,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -108,8 +109,6 @@ public class GuessManagement {
} else {
try {
-
-
var transformationScript =
adapterDescription.getTransformationConfig();
var engine =
TransformationEngines.INSTANCE.getTransformationEngine(transformationScript.getLanguage());
var compiledScript = engine.compile(transformationScript.getScript());
@@ -117,10 +116,11 @@ public class GuessManagement {
var samples = adapterDescription.getTransformationConfig()
.getInputs();
if (!samples.isEmpty()) {
- var result = compiledScript.transform(samples.get(0));
+ List<Map<String, Object>> results = new ArrayList<>();
+ compiledScript.transform(samples.get(0), results::add, null);
adapterDescription.getTransformationConfig()
- .setOutputs(List.of(result));
+ .setOutputs(results);
} else {
throw new AdapterException("No samples available to transform");
}
diff --git
a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/AdapterPipelineGeneratorBase.java
b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/AdapterPipelineGeneratorBase.java
index e0c5d710dd..9c89c7ce8b 100644
---
a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/AdapterPipelineGeneratorBase.java
+++
b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/AdapterPipelineGeneratorBase.java
@@ -19,7 +19,6 @@
package org.apache.streampipes.connect.shared;
import
org.apache.streampipes.connect.shared.preprocessing.elements.AdapterTransformationPipelineElement;
-import
org.apache.streampipes.connect.shared.preprocessing.elements.ScriptTransformationPipelineElement;
import
org.apache.streampipes.connect.shared.preprocessing.transform.stream.EventRateTransformationRule;
import
org.apache.streampipes.connect.shared.preprocessing.transform.stream.RemoveDuplicatesTransformationRule;
import
org.apache.streampipes.connect.shared.preprocessing.transform.value.DatatypeTransformationRule;
@@ -37,20 +36,10 @@ public class AdapterPipelineGeneratorBase {
public List<IAdapterPipelineElement> makeAdapterPipelineElements(
boolean includeStateful,
- AdapterDescription adapterDescription,
- boolean includeScript
+ AdapterDescription adapterDescription
) {
var elements = new ArrayList<IAdapterPipelineElement>();
- if (includeScript) {
- elements.add(new ScriptTransformationPipelineElement(
- adapterDescription.getTransformationConfig()
- .getLanguage(),
- adapterDescription.getTransformationConfig()
- .getScript()
- ));
- }
-
List<TransformationRule> transformationRules = new ArrayList<>();
if (includeStateful) {
transformationRules.addAll(getReduceEventTransformationRule(adapterDescription));
diff --git
a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/elements/ScriptTransformationPipelineElement.java
b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/elements/ScriptTransformationPipelineElement.java
index a00806aed0..3024d38019 100644
---
a/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/elements/ScriptTransformationPipelineElement.java
+++
b/streampipes-connect-shared/src/main/java/org/apache/streampipes/connect/shared/preprocessing/elements/ScriptTransformationPipelineElement.java
@@ -22,11 +22,12 @@ import
org.apache.streampipes.connect.transformer.api.ScriptTransformer;
import org.apache.streampipes.connect.transformer.api.TransformationEngines;
import
org.apache.streampipes.connect.transformer.api.exception.ScriptCompilationException;
import
org.apache.streampipes.connect.transformer.api.exception.ScriptExecutionException;
-import org.apache.streampipes.extensions.api.connect.IAdapterPipelineElement;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
-public class ScriptTransformationPipelineElement implements
IAdapterPipelineElement {
+public class ScriptTransformationPipelineElement {
ScriptTransformer scriptTransformer;
public ScriptTransformationPipelineElement(String language, String
transformationScript) {
@@ -38,11 +39,11 @@ public class ScriptTransformationPipelineElement implements
IAdapterPipelineElem
}
}
-
- @Override
- public Map<String, Object> process(Map<String, Object> event) {
+ public List<Map<String, Object>> process(Map<String, Object> event) {
try {
- return scriptTransformer.transform(event);
+ List<Map<String, Object>> out = new ArrayList<>();
+ scriptTransformer.transform(event, out::add, null);
+ return out;
} catch (ScriptExecutionException e) {
throw new RuntimeException(e);
}
diff --git
a/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
b/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/Context.java
similarity index 56%
copy from
streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
copy to
streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/Context.java
index d06885fb5d..4ab88aafa8 100644
---
a/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
+++
b/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/Context.java
@@ -18,23 +18,5 @@
package org.apache.streampipes.connect.transformer.api;
-import
org.apache.streampipes.connect.transformer.api.exception.ScriptExecutionException;
-
-import java.util.Map;
-
-/**
- * Transforms an input event map into an output event map.
- * Implementations are language-specific but must adhere to this contract.
- */
-public interface ScriptTransformer {
-
- /**
- * Apply the compiled template to the incoming data.
- *
- * @param input input event map keyed by runtime name
- * @return output event map keyed by runtime name
- * @throws ScriptExecutionException
- * utionException when execution fails or returns an invalid result
- */
- Map<String, Object> transform(Map<String, Object> input) throws
ScriptExecutionException;
+public interface Context {
}
diff --git
a/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
b/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/OutputCollector.java
similarity index 62%
copy from
streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
copy to
streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/OutputCollector.java
index d06885fb5d..34a5b7f8e1 100644
---
a/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
+++
b/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/OutputCollector.java
@@ -20,21 +20,10 @@ package org.apache.streampipes.connect.transformer.api;
import
org.apache.streampipes.connect.transformer.api.exception.ScriptExecutionException;
-import java.util.Map;
+@FunctionalInterface
+public interface OutputCollector<T> {
+
+ void collect(T event) throws ScriptExecutionException;
-/**
- * Transforms an input event map into an output event map.
- * Implementations are language-specific but must adhere to this contract.
- */
-public interface ScriptTransformer {
- /**
- * Apply the compiled template to the incoming data.
- *
- * @param input input event map keyed by runtime name
- * @return output event map keyed by runtime name
- * @throws ScriptExecutionException
- * utionException when execution fails or returns an invalid result
- */
- Map<String, Object> transform(Map<String, Object> input) throws
ScriptExecutionException;
}
diff --git
a/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
b/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
index d06885fb5d..97d7f3ad27 100644
---
a/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
+++
b/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/ScriptTransformer.java
@@ -32,9 +32,11 @@ public interface ScriptTransformer {
* Apply the compiled template to the incoming data.
*
* @param input input event map keyed by runtime name
- * @return output event map keyed by runtime name
- * @throws ScriptExecutionException
- * utionException when execution fails or returns an invalid result
+ * @param out collector that receives every output event emitted by the script
+ * @param ctx reserved for future use; callers currently pass {@code null}
+ * @throws ScriptExecutionException when execution fails or the script
emits an invalid result
*/
- Map<String, Object> transform(Map<String, Object> input) throws
ScriptExecutionException;
+ void transform(Map<String, Object> input,
+ OutputCollector<Map<String, Object>> out,
+ Context ctx) throws ScriptExecutionException;
}
diff --git
a/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/utils/TransformationEngineConversionUtils.java
b/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/utils/TransformationEngineConversionUtils.java
index 066bfcb98a..dbbfd99477 100644
---
a/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/utils/TransformationEngineConversionUtils.java
+++
b/streampipes-connect-transformer-api/src/main/java/org/apache/streampipes/connect/transformer/api/utils/TransformationEngineConversionUtils.java
@@ -18,6 +18,7 @@
package org.apache.streampipes.connect.transformer.api.utils;
+import org.apache.streampipes.connect.transformer.api.OutputCollector;
import
org.apache.streampipes.connect.transformer.api.exception.ScriptExecutionException;
import java.util.HashMap;
@@ -44,4 +45,15 @@ public class TransformationEngineConversionUtils {
rawMap.forEach((k, v) -> result.put(String.valueOf(k), v));
return result;
}
+
+ public static OutputCollector<Object>
convertingCollector(OutputCollector<Map<String, Object>> delegate,
+ String language) {
+ return eventObj -> {
+ Map<String, Object> map = ensureMap(
+ eventObj,
+ language
+ );
+ delegate.collect(map);
+ };
+ }
}
diff --git
a/streampipes-connect-transformer-groovy/src/main/java/org/apache/streampipes/connect/transformer/groovy/GroovyScriptEngine.java
b/streampipes-connect-transformer-groovy/src/main/java/org/apache/streampipes/connect/transformer/groovy/GroovyScriptEngine.java
index a217b8b3aa..6b20789f7c 100644
---
a/streampipes-connect-transformer-groovy/src/main/java/org/apache/streampipes/connect/transformer/groovy/GroovyScriptEngine.java
+++
b/streampipes-connect-transformer-groovy/src/main/java/org/apache/streampipes/connect/transformer/groovy/GroovyScriptEngine.java
@@ -18,6 +18,8 @@
package org.apache.streampipes.connect.transformer.groovy;
+import org.apache.streampipes.connect.transformer.api.Context;
+import org.apache.streampipes.connect.transformer.api.OutputCollector;
import org.apache.streampipes.connect.transformer.api.ScriptTransformer;
import org.apache.streampipes.connect.transformer.api.TransformationEngine;
import
org.apache.streampipes.connect.transformer.api.exception.ScriptCompilationException;
@@ -40,7 +42,7 @@ public class GroovyScriptEngine implements
TransformationEngine {
return new ScriptMetadata(
"groovy",
"Groovy",
- ""
+ "out.collect(input)"
);
}
@@ -56,17 +58,21 @@ public class GroovyScriptEngine implements
TransformationEngine {
Class<? extends Script> scriptClass = script.getClass();
- return input -> execute(scriptClass, input);
+ return (input, out, ctx) -> execute(scriptClass, input, out, ctx);
}
- private Map<String, Object> execute(Class<? extends Script> scriptClass,
Map<String, Object> input)
+ private void execute(Class<? extends Script> scriptClass,
+ Map<String, Object> input,
+ OutputCollector<Map<String, Object>> out,
+ Context ctx)
throws ScriptExecutionException {
try {
Binding binding = new Binding();
binding.setVariable("input", input);
+ binding.setVariable("out",
TransformationEngineConversionUtils.convertingCollector(out,
metadata().language()));
+ binding.setVariable("ctx", ctx);
Script scriptInstance = InvokerHelper.createScript(scriptClass, binding);
- Object result = scriptInstance.run();
- return TransformationEngineConversionUtils.ensureMap(result,
metadata().language());
+ scriptInstance.run();
} catch (Exception e) {
throw new ScriptExecutionException("Groovy template execution failed",
e);
}
diff --git
a/streampipes-connect-transformer-js/src/main/java/org/apache/streampipes/connect/transformer/js/GraalJsScriptEngine.java
b/streampipes-connect-transformer-js/src/main/java/org/apache/streampipes/connect/transformer/js/GraalJsScriptEngine.java
index 6cb9b267a7..a645e314d0 100644
---
a/streampipes-connect-transformer-js/src/main/java/org/apache/streampipes/connect/transformer/js/GraalJsScriptEngine.java
+++
b/streampipes-connect-transformer-js/src/main/java/org/apache/streampipes/connect/transformer/js/GraalJsScriptEngine.java
@@ -18,6 +18,7 @@
package org.apache.streampipes.connect.transformer.js;
+import org.apache.streampipes.connect.transformer.api.OutputCollector;
import org.apache.streampipes.connect.transformer.api.ScriptTransformer;
import org.apache.streampipes.connect.transformer.api.TransformationEngine;
import
org.apache.streampipes.connect.transformer.api.exception.ScriptCompilationException;
@@ -41,8 +42,8 @@ public class GraalJsScriptEngine implements
TransformationEngine {
"JavaScript",
"""
// returns the same event
- function transform(event) {
- return event;
+ function transform(event, out, ctx) {
+ out.collect(event);
}
"""
);
@@ -61,7 +62,7 @@ public class GraalJsScriptEngine implements
TransformationEngine {
Value transformFunction = resolveFunction(context, compiled);
- return input -> execute(transformFunction, input);
+ return (input, out, ctx) -> execute(transformFunction, input, out, ctx);
}
private Context createContext() {
@@ -89,11 +90,13 @@ public class GraalJsScriptEngine implements
TransformationEngine {
null);
}
- private Map<String, Object> execute(Value transformFunction, Map<String,
Object> input)
+ private void execute(Value transformFunction,
+ Map<String, Object> input,
+ OutputCollector<Map<String, Object>> out,
+ org.apache.streampipes.connect.transformer.api.Context
ctx)
throws ScriptExecutionException {
try {
- Value result = transformFunction.execute(input);
- return PolyglotResultConverter.ensureMap(result, metadata().language());
+ transformFunction.execute(input,
PolyglotResultConverter.convertingCollector(out, metadata().language()), ctx);
} catch (PolyglotException e) {
throw new ScriptExecutionException("Graal JS script execution failed",
e);
}
diff --git
a/streampipes-connect-transformer-js/src/main/java/org/apache/streampipes/connect/transformer/js/PolyglotResultConverter.java
b/streampipes-connect-transformer-js/src/main/java/org/apache/streampipes/connect/transformer/js/PolyglotResultConverter.java
index b280ae247d..9f85ef3c41 100644
---
a/streampipes-connect-transformer-js/src/main/java/org/apache/streampipes/connect/transformer/js/PolyglotResultConverter.java
+++
b/streampipes-connect-transformer-js/src/main/java/org/apache/streampipes/connect/transformer/js/PolyglotResultConverter.java
@@ -18,6 +18,7 @@
package org.apache.streampipes.connect.transformer.js;
+import org.apache.streampipes.connect.transformer.api.OutputCollector;
import
org.apache.streampipes.connect.transformer.api.exception.ScriptExecutionException;
import
org.apache.streampipes.connect.transformer.api.utils.TransformationEngineConversionUtils;
@@ -69,4 +70,19 @@ public class PolyglotResultConverter {
throw new ScriptExecutionException(
"Template in " + language + " returned a non-object value: " +
value.toString());
}
+
+ public static OutputCollector<Object>
convertingCollector(OutputCollector<Map<String, Object>> delegate, String
language) {
+ return obj -> {
+ Map<String,Object> map;
+ if (obj instanceof Map<?, ?> m) {
+ //noinspection unchecked
+ map = (Map<String,Object>) m;
+ } else if (obj instanceof Value v) {
+ map = PolyglotResultConverter.ensureMap(v, language);
+ } else {
+ throw new IllegalArgumentException("Collected event must be an
object/map");
+ }
+ delegate.collect(map);
+ };
+ }
}
diff --git
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/AdapterPipelineGenerator.java
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/AdapterPipelineGenerator.java
index 6603bcb7b9..0309814a69 100644
---
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/AdapterPipelineGenerator.java
+++
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/AdapterPipelineGenerator.java
@@ -26,18 +26,16 @@ import
org.apache.streampipes.model.connect.adapter.AdapterDescription;
public class AdapterPipelineGenerator extends AdapterPipelineGeneratorBase {
public AdapterPipeline generatePipeline(AdapterDescription
adapterDescription) {
-
- var includeScript =
adapterDescription.getTransformationConfig().isScriptActive();
-
- var pipelineElements = makeAdapterPipelineElements(true,
adapterDescription, includeScript);
+ var pipelineElements = makeAdapterPipelineElements(true,
adapterDescription);
if (hasValidGrounding(adapterDescription)) {
return new AdapterPipeline(
pipelineElements,
+ adapterDescription.getTransformationConfig(),
getAdapterSink(adapterDescription),
adapterDescription.getEventSchema());
} else {
- return new AdapterPipeline(pipelineElements,
adapterDescription.getEventSchema());
+ return new AdapterPipeline(pipelineElements,
adapterDescription.getTransformationConfig(),
adapterDescription.getEventSchema());
}
}
diff --git
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java
index b703b94b3c..43ee703c26 100644
---
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java
+++
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/model/pipeline/AdapterPipeline.java
@@ -18,12 +18,15 @@
package
org.apache.streampipes.extensions.management.connect.adapter.model.pipeline;
+import
org.apache.streampipes.connect.shared.preprocessing.elements.ScriptTransformationPipelineElement;
import org.apache.streampipes.extensions.api.connect.IAdapterPipeline;
import org.apache.streampipes.extensions.api.connect.IAdapterPipelineElement;
+import org.apache.streampipes.model.connect.TransformationConfig;
import org.apache.streampipes.model.schema.EventSchema;
import java.util.List;
import java.util.Map;
+import java.util.function.Function;
public class AdapterPipeline implements IAdapterPipeline {
@@ -31,31 +34,43 @@ public class AdapterPipeline implements IAdapterPipeline {
private IAdapterPipelineElement pipelineSink;
private final EventSchema resultingEventSchema;
+ private final Function<Map<String, Object>, List<Map<String, Object>>>
processingFn;
public AdapterPipeline(List<IAdapterPipelineElement> pipelineElements,
+ TransformationConfig transformationConfig,
EventSchema resultingEventSchema) {
this.pipelineElements = pipelineElements;
this.resultingEventSchema = resultingEventSchema;
+ if (transformationConfig.isScriptActive()) {
+ var transformation = new ScriptTransformationPipelineElement(
+ transformationConfig.getLanguage(),
+ transformationConfig.getScript()
+ );
+ processingFn = transformation::process;
+ } else {
+ processingFn = List::of;
+ }
}
public AdapterPipeline(List<IAdapterPipelineElement> pipelineElements,
+ TransformationConfig transformationConfig,
IAdapterPipelineElement pipelineSink,
EventSchema resultingEventSchema) {
- this.pipelineElements = pipelineElements;
+ this(pipelineElements, transformationConfig, resultingEventSchema);
this.pipelineSink = pipelineSink;
- this.resultingEventSchema = resultingEventSchema;
}
@Override
public void process(Map<String, Object> event) {
-
- for (IAdapterPipelineElement pipelineElement : pipelineElements) {
- event = pipelineElement.process(event);
- }
- if (pipelineSink != null) {
- pipelineSink.process(event);
- }
-
+ var scriptResult = this.processingFn.apply(event);
+ scriptResult.forEach(result -> {
+ for (IAdapterPipelineElement pipelineElement : pipelineElements) {
+ result = pipelineElement.process(result);
+ }
+ if (pipelineSink != null) {
+ pipelineSink.process(result);
+ }
+ });
}
@Override
diff --git
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v099/connect/TransformationScriptBuilder.java
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v099/connect/TransformationScriptBuilder.java
index f068ef44c6..da7369f3f9 100644
---
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v099/connect/TransformationScriptBuilder.java
+++
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v099/connect/TransformationScriptBuilder.java
@@ -21,22 +21,22 @@ package
org.apache.streampipes.service.core.migrations.v099.connect;
public class TransformationScriptBuilder {
private StringBuilder sb;
- private boolean scriptAcive;
+ private boolean scriptActive;
private TransformationScriptBuilder() {
- scriptAcive = false;
+ scriptActive = false;
}
public static TransformationScriptBuilder create() {
TransformationScriptBuilder builder = new TransformationScriptBuilder();
builder.sb = new StringBuilder();
- builder.sb.append("function transform(event) {\n");
+ builder.sb.append("function transform(event, out, ctx) {\n");
return builder;
}
public TransformationScriptBuilder appendLine(String line) {
if (!line.startsWith("//")) {
- scriptAcive = true;
+ scriptActive = true;
}
sb.append(" ").append(line).append("\n");
@@ -45,11 +45,11 @@ public class TransformationScriptBuilder {
// Used to check if any script lines were added
public boolean isScriptActive() {
- return scriptAcive;
+ return scriptActive;
}
public String build() {
- sb.append(" return event;\n");
+ sb.append(" out.collect(event);\n");
sb.append("}");
return sb.toString();
}
diff --git
a/streampipes-service-core/src/test/java/org/apache/streampipes/service/core/migrations/v099/MigrateAdaptersToUseScriptTest.java
b/streampipes-service-core/src/test/java/org/apache/streampipes/service/core/migrations/v099/MigrateAdaptersToUseScriptTest.java
index 4c963e529e..b12978ec67 100644
---
a/streampipes-service-core/src/test/java/org/apache/streampipes/service/core/migrations/v099/MigrateAdaptersToUseScriptTest.java
+++
b/streampipes-service-core/src/test/java/org/apache/streampipes/service/core/migrations/v099/MigrateAdaptersToUseScriptTest.java
@@ -276,7 +276,7 @@ class MigrateAdaptersToUseScriptTest {
"Time window should match legacy config");
// Ensure the script still contains the standard boilerplate even if this
rule is stateful
- assertTrue(resultConfig.getScript().contains("function transform(event)"),
+ assertTrue(resultConfig.getScript().contains("function transform(event,
out, ctx)"),
"Script should still be generated as a container");
assertFalse(adapter.getTransformationConfig().isScriptActive());
}
@@ -395,4 +395,4 @@ class MigrateAdaptersToUseScriptTest {
adapter.getDataStream().setEventSchema(new EventSchema());
return adapter;
}
-}
\ No newline at end of file
+}
diff --git a/ui/cypress/support/utils/connect/CompactAdapterUtils.ts
b/ui/cypress/support/utils/connect/CompactAdapterUtils.ts
index 53af565d31..6a8c09db70 100644
--- a/ui/cypress/support/utils/connect/CompactAdapterUtils.ts
+++ b/ui/cypress/support/utils/connect/CompactAdapterUtils.ts
@@ -76,7 +76,9 @@ export class CompactAdapterUtils {
)
.setName('Test')
.withScript(
- 'function transform(event) {\n' + ' return event;\n' + '}',
+ 'function transform(event, out, ctx) {\n' +
+ ' out.collect(event);\n' +
+ '}\n',
)
.addConfiguration('wait-time-ms', '1000')
.addConfiguration('selected-simulator-option', 'flowrate');
diff --git a/ui/cypress/support/utils/connect/ConnectEventSchemaUtils.ts
b/ui/cypress/support/utils/connect/ConnectEventSchemaUtils.ts
index 816a069216..1f0a44ddb0 100644
--- a/ui/cypress/support/utils/connect/ConnectEventSchemaUtils.ts
+++ b/ui/cypress/support/utils/connect/ConnectEventSchemaUtils.ts
@@ -54,9 +54,9 @@ export class ConnectEventSchemaUtils {
private static addTimestampFieldToScript() {
ConnectBtns.configureSchemaScriptEditor()
- .type('{backspace}'.repeat(17)) // 2. Delete the " return
event;\n}" part
+ .type('{backspace}'.repeat(22)) // 2. Delete the "
out.collect(event);\n}" part
.type(
- ' event.timestamp = new Date().getTime();{enter}return
event;{enter}}',
+ ' event.timestamp = new Date().getTime();{enter}return
out.collect(event);{enter}}',
);
}
diff --git a/ui/cypress/support/utils/connect/ConnectUtils.ts
b/ui/cypress/support/utils/connect/ConnectUtils.ts
index 3666fb8a1b..97f0e0635e 100644
--- a/ui/cypress/support/utils/connect/ConnectUtils.ts
+++ b/ui/cypress/support/utils/connect/ConnectUtils.ts
@@ -398,8 +398,14 @@ export class ConnectUtils {
}
public static replaceAdapterScript(script: string) {
+ // Ensure that the script is loaded
+ ConnectBtns.configureSchemaScriptEditor().should(
+ 'contain.text',
+ 'out.collect(event);',
+ );
+
ConnectBtns.configureSchemaScriptEditor()
- .type('{backspace}'.repeat(17)) // 2. Delete the " return
event;\n}" part
+ .type('{backspace}'.repeat(22)) // 2. Delete the "
out.collect(event);\n}" part
.type(script);
}
diff --git
a/ui/cypress/tests/connect/compact/compactAdapterWithTransformation.spec.ts
b/ui/cypress/tests/connect/compact/compactAdapterWithTransformation.spec.ts
index b5f2cb7996..7e25b40112 100644
--- a/ui/cypress/tests/connect/compact/compactAdapterWithTransformation.spec.ts
+++ b/ui/cypress/tests/connect/compact/compactAdapterWithTransformation.spec.ts
@@ -30,10 +30,10 @@ describe('Add Compact Adapters', () => {
const newPropertyName = 'temperature_renamed';
const compactAdapter = CompactAdapterUtils.getMachineDataSimulator()
.withScript(
- 'function transform(event) {\n' +
+ 'function transform(event, out, ctx) {\n' +
' event.temperature_renamed = event.temperature \n' +
' delete event.temperature \n' +
- ' return event;\n' +
+ ' out.collect(event);\n' +
'}',
)
.setStart()
diff --git a/ui/cypress/tests/connect/editAdapter.smoke.spec.ts
b/ui/cypress/tests/connect/editAdapter.smoke.spec.ts
index 7c0b91f423..0edec29055 100644
--- a/ui/cypress/tests/connect/editAdapter.smoke.spec.ts
+++ b/ui/cypress/tests/connect/editAdapter.smoke.spec.ts
@@ -95,11 +95,11 @@ describe('Test Edit Adapter', () => {
ConnectUtils.replaceAdapterScript(
' event.density = event.density * 2;\n' +
- ' return event;\n' +
+ ' out.collect(event);\n' +
'}',
);
ConnectBtns.configureSchemaRunScriptBtn().click();
- cy.wait(500);
+ cy.wait(1000);
ConnectBtns.configureSchemaNextBtn().click();
diff --git a/ui/cypress/tests/connect/fileStream.spec.ts
b/ui/cypress/tests/connect/fileStream.spec.ts
index a11dbf6fca..0dfbe6206d 100644
--- a/ui/cypress/tests/connect/fileStream.spec.ts
+++ b/ui/cypress/tests/connect/fileStream.spec.ts
@@ -51,7 +51,7 @@ describe('Test File Replay Adapter', () => {
ConnectUtils.setUpPreprocessingRuleTest(false);
ConnectUtils.replaceAdapterScript(
- 'event.timestamp = event.timestamp * 1000;\n return event;\n}',
+ 'event.timestamp = event.timestamp * 1000;\n
out.collect(event);\n}',
);
ConnectBtns.configureSchemaRunScriptBtn().click();
diff --git a/ui/cypress/tests/connect/rules/deleteTransformationRule.spec.ts
b/ui/cypress/tests/connect/rules/deleteTransformationRule.spec.ts
index 72b6329324..7ab7527a75 100644
--- a/ui/cypress/tests/connect/rules/deleteTransformationRule.spec.ts
+++ b/ui/cypress/tests/connect/rules/deleteTransformationRule.spec.ts
@@ -50,7 +50,7 @@ describe('Connect delete rule transformation', () => {
ConnectUtils.replaceAdapterScript(
' delete event.toRemove;\n' +
' delete event.parent.child_two;\n' +
- ' return event;\n' +
+ ' out.collect(event);\n' +
'}',
);
diff --git a/ui/cypress/tests/connect/rules/schemaRules.smoke.spec.ts
b/ui/cypress/tests/connect/rules/schemaRules.smoke.spec.ts
index 507292b1fa..5d4925ffaf 100644
--- a/ui/cypress/tests/connect/rules/schemaRules.smoke.spec.ts
+++ b/ui/cypress/tests/connect/rules/schemaRules.smoke.spec.ts
@@ -35,7 +35,7 @@ describe('Connect schema rule transformations', () => {
ConnectUtils.replaceAdapterScript(
" event['dot'] = event ['contains.dot'];\n" +
" delete event['contains.dot'];\n" +
- ' return event;\n' +
+ ' out.collect(event);\n' +
'}',
);
diff --git a/ui/cypress/tests/connect/rules/valueRules.spec.ts
b/ui/cypress/tests/connect/rules/valueRules.spec.ts
index a03fa8cf1c..48b08e45b0 100644
--- a/ui/cypress/tests/connect/rules/valueRules.spec.ts
+++ b/ui/cypress/tests/connect/rules/valueRules.spec.ts
@@ -32,7 +32,7 @@ describe('Connect value rule transformations', () => {
ConnectUtils.setUpPreprocessingRuleTest(true);
ConnectUtils.replaceAdapterScript(
- 'event.timestamp = new Date(event.timestamp).getTime();\n return
event;\n}',
+ 'event.timestamp = new Date(event.timestamp).getTime();\n
out.collect(event);\n}',
);
ConnectBtns.configureSchemaRunScriptBtn().click();
cy.wait(1000);
diff --git a/ui/cypress/tests/connect/scriptTemplate.spec.ts
b/ui/cypress/tests/connect/scriptTemplate.spec.ts
index fc04073f75..313917d2b4 100644
--- a/ui/cypress/tests/connect/scriptTemplate.spec.ts
+++ b/ui/cypress/tests/connect/scriptTemplate.spec.ts
@@ -22,7 +22,7 @@ import { ConnectBtns } from
'../../support/utils/connect/ConnectBtns';
const TEMPLATE_NAME = 'TestTemplate';
const SCRIPT_LINE = "event.b = 'b';";
const SCRIPT = ` ${SCRIPT_LINE}
-return event;
+out.collect(event);
}`;
describe('Validate Warning Pops For Configuration Changes ', () => {