This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch 3646-update-processors-in-processors-enrichmer-jvm
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit d704d659addc34bb1fdee126485b0004f8b3e613
Author: Philipp Zehnder <[email protected]>
AuthorDate: Fri Jun 6 21:58:04 2025 +0200

    refactor: Update processors to implement IStreamPipesDataProcessor and refactor method signatures
---
 .../jvm/processor/jseval/JSEvalProcessor.java      | 55 +++++++------
 .../QualityControlLimitsEnrichmentProcessor.java   | 91 ++++++++++----------
 .../jvm/processor/math/MathOpProcessor.java        | 96 +++++++++++-----------
 .../math/staticmathop/StaticMathOpProcessor.java   | 61 +++++++-------
 .../sizemeasure/SizeMeasureProcessor.java          | 75 +++++++++--------
 .../trigonometry/TrigonometryProcessor.java        | 62 +++++++-------
 .../valuechange/ValueChangeProcessor.java          | 58 ++++++-------
 7 files changed, 258 insertions(+), 240 deletions(-)

diff --git 
a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/jseval/JSEvalProcessor.java
 
b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/jseval/JSEvalProcessor.java
index 78ee849115..a43d831026 100644
--- 
a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/jseval/JSEvalProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/jseval/JSEvalProcessor.java
@@ -18,21 +18,22 @@
 package org.apache.streampipes.processors.enricher.jvm.processor.jseval;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.CodeLanguage;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
 import org.graalvm.polyglot.Context;
 import org.graalvm.polyglot.Value;
@@ -41,7 +42,7 @@ import org.graalvm.polyglot.proxy.ProxyObject;
 import java.util.HashMap;
 import java.util.Map;
 
-public class JSEvalProcessor extends StreamPipesDataProcessor {
+public class JSEvalProcessor implements IStreamPipesDataProcessor {
 
   private static final String JS_FUNCTION = "jsFunction";
 
@@ -49,27 +50,33 @@ public class JSEvalProcessor extends 
StreamPipesDataProcessor {
   private Value function;
 
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        .create("org.apache.streampipes.processors.enricher.jvm.jseval", 0)
-        .category(DataProcessorType.SCRIPTING)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .withLocales(Locales.EN)
-        .requiredStream(StreamRequirementsBuilder
-            .create()
-            .requiredProperty(EpRequirements.anyProperty())
-            .build())
-        .requiredCodeblock(Labels.withId(JS_FUNCTION), CodeLanguage.Javascript)
-        .outputStrategy(OutputStrategies.userDefined())
-        .build();
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        JSEvalProcessor::new,
+        ProcessingElementBuilder
+            .create("org.apache.streampipes.processors.enricher.jvm.jseval", 0)
+            .category(DataProcessorType.SCRIPTING)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .withLocales(Locales.EN)
+            .requiredStream(StreamRequirementsBuilder
+                                .create()
+                                .requiredProperty(EpRequirements.anyProperty())
+                                .build())
+            .requiredCodeblock(Labels.withId(JS_FUNCTION), 
CodeLanguage.Javascript)
+            .outputStrategy(OutputStrategies.userDefined())
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(ProcessorParams parameters,
-                           SpOutputCollector spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
+  public void onPipelineStarted(
+      IDataProcessorParameters params,
+      SpOutputCollector collector,
+      EventProcessorRuntimeContext runtimeContext
+  ) throws SpRuntimeException {
     polyglot = Context.create();
-    String code = parameters.extractor().codeblockValue(JS_FUNCTION);
+    String code = params.extractor()
+                        .codeblockValue(JS_FUNCTION);
     function = polyglot.eval("js", "(" + code + ")");
   }
 
@@ -94,7 +101,9 @@ public class JSEvalProcessor extends 
StreamPipesDataProcessor {
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
-
+  public void onPipelineStopped() {
+    if (polyglot != null) {
+      polyglot.close();
+    }
   }
 }
diff --git 
a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/limitsenrichment/QualityControlLimitsEnrichmentProcessor.java
 
b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/limitsenrichment/QualityControlLimitsEnrichmentProcessor.java
index f1601c59a7..99103aa499 100644
--- 
a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/limitsenrichment/QualityControlLimitsEnrichmentProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/limitsenrichment/QualityControlLimitsEnrichmentProcessor.java
@@ -18,26 +18,25 @@
 
 package 
org.apache.streampipes.processors.enricher.jvm.processor.limitsenrichment;
 
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.EpProperties;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
 import org.apache.streampipes.vocabulary.SO;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
-
-public class QualityControlLimitsEnrichmentProcessor extends 
StreamPipesDataProcessor {
 
+public class QualityControlLimitsEnrichmentProcessor implements 
IStreamPipesDataProcessor {
 
   protected static final String UPPER_CONTROL_LIMIT_LABEL = 
"upperControlLimitInput";
   protected static final String UPPER_WARNING_LIMIT_LABEL = 
"upperWarningLimitInput";
@@ -55,49 +54,47 @@ public class QualityControlLimitsEnrichmentProcessor 
extends StreamPipesDataProc
   private double lowerControlLimitValue;
 
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        
.create("org.apache.streampipes.processors.enricher.jvm.processor.limitsenrichment",
 0)
-        .category(DataProcessorType.ENRICH)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .withLocales(Locales.EN)
-        .requiredStream(StreamRequirementsBuilder
-                            .create()
-                            .requiredProperty(EpRequirements.anyProperty())
-                            .build())
-        .requiredFloatParameter(Labels.withId(UPPER_CONTROL_LIMIT_LABEL))
-        .requiredFloatParameter(Labels.withId(UPPER_WARNING_LIMIT_LABEL))
-        .requiredFloatParameter(Labels.withId(LOWER_WARNING_LIMIT_LABEL))
-        .requiredFloatParameter(Labels.withId(LOWER_CONTROL_LIMIT_LABEL))
-        .outputStrategy(
-            OutputStrategies.append(
-                EpProperties.doubleEp(Labels.empty(), UPPER_CONTROL_LIMIT, 
SO.NUMBER),
-                EpProperties.doubleEp(Labels.empty(), UPPER_WARNING_LIMIT, 
SO.NUMBER),
-                EpProperties.doubleEp(Labels.empty(), LOWER_WARNING_LIMIT, 
SO.NUMBER),
-                EpProperties.doubleEp(Labels.empty(), LOWER_CONTROL_LIMIT, 
SO.NUMBER)
-            ))
-        .build();
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        QualityControlLimitsEnrichmentProcessor::new,
+        ProcessingElementBuilder
+            
.create("org.apache.streampipes.processors.enricher.jvm.processor.limitsenrichment",
 0)
+            .category(DataProcessorType.ENRICH)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .withLocales(Locales.EN)
+            .requiredStream(StreamRequirementsBuilder
+                                .create()
+                                .requiredProperty(EpRequirements.anyProperty())
+                                .build())
+            .requiredFloatParameter(Labels.withId(UPPER_CONTROL_LIMIT_LABEL))
+            .requiredFloatParameter(Labels.withId(UPPER_WARNING_LIMIT_LABEL))
+            .requiredFloatParameter(Labels.withId(LOWER_WARNING_LIMIT_LABEL))
+            .requiredFloatParameter(Labels.withId(LOWER_CONTROL_LIMIT_LABEL))
+            .outputStrategy(
+                OutputStrategies.append(
+                    EpProperties.doubleEp(Labels.empty(), UPPER_CONTROL_LIMIT, 
SO.NUMBER),
+                    EpProperties.doubleEp(Labels.empty(), UPPER_WARNING_LIMIT, 
SO.NUMBER),
+                    EpProperties.doubleEp(Labels.empty(), LOWER_WARNING_LIMIT, 
SO.NUMBER),
+                    EpProperties.doubleEp(Labels.empty(), LOWER_CONTROL_LIMIT, 
SO.NUMBER)
+                ))
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(
-      ProcessorParams parameters,
-      SpOutputCollector spOutputCollector,
+  public void onPipelineStarted(
+      IDataProcessorParameters params,
+      SpOutputCollector collector,
       EventProcessorRuntimeContext runtimeContext
-  ) throws SpRuntimeException {
-    this.upperControlLimitValue = parameters.extractor()
-                                            
.singleValueParameter(UPPER_CONTROL_LIMIT_LABEL, Double.class);
-    this.upperWarningLimitValue = parameters.extractor()
-                                            
.singleValueParameter(UPPER_WARNING_LIMIT_LABEL, Double.class);
-    this.lowerWarningLimitValue = parameters.extractor()
-                                            
.singleValueParameter(LOWER_WARNING_LIMIT_LABEL, Double.class);
-    this.lowerControlLimitValue = parameters.extractor()
-                                            
.singleValueParameter(LOWER_CONTROL_LIMIT_LABEL, Double.class);
-  }
-
-  @Override
-  public void onDetach() {
-
+  ) {
+    this.upperControlLimitValue = params.extractor()
+                                        
.singleValueParameter(UPPER_CONTROL_LIMIT_LABEL, Double.class);
+    this.upperWarningLimitValue = params.extractor()
+                                        
.singleValueParameter(UPPER_WARNING_LIMIT_LABEL, Double.class);
+    this.lowerWarningLimitValue = params.extractor()
+                                        
.singleValueParameter(LOWER_WARNING_LIMIT_LABEL, Double.class);
+    this.lowerControlLimitValue = params.extractor()
+                                        
.singleValueParameter(LOWER_CONTROL_LIMIT_LABEL, Double.class);
   }
 
   @Override
@@ -109,4 +106,8 @@ public class QualityControlLimitsEnrichmentProcessor 
extends StreamPipesDataProc
 
     collector.collect(event);
   }
+
+  @Override
+  public void onPipelineStopped() {
+  }
 }
diff --git 
a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/math/MathOpProcessor.java
 
b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/math/MathOpProcessor.java
index 0571496f6b..c81b503711 100644
--- 
a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/math/MathOpProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/math/MathOpProcessor.java
@@ -20,11 +20,13 @@
 package org.apache.streampipes.processors.enricher.jvm.processor.math;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import 
org.apache.streampipes.processors.enricher.jvm.processor.math.operation.Operation;
@@ -35,6 +37,7 @@ import 
org.apache.streampipes.processors.enricher.jvm.processor.math.operation.O
 import 
org.apache.streampipes.processors.enricher.jvm.processor.math.operation.OperationSubtracting;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.EpProperties;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
@@ -42,10 +45,8 @@ import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.Options;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
 import org.apache.streampipes.vocabulary.SO;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
-public class MathOpProcessor extends StreamPipesDataProcessor {
+public class MathOpProcessor implements IStreamPipesDataProcessor {
 
   protected static final String RESULT_FIELD = "calculationResult";
   protected static final String LEFT_OPERAND = "leftOperand";
@@ -57,67 +58,70 @@ public class MathOpProcessor extends 
StreamPipesDataProcessor {
   String rightOperand;
 
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        
.create("org.apache.streampipes.processors.enricher.jvm.processor.math.mathop", 
0)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .withLocales(Locales.EN)
-        .category(DataProcessorType.ALGORITHM)
-        .requiredStream(StreamRequirementsBuilder
-            .create()
-            .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
-                Labels.withId(LEFT_OPERAND),
-                PropertyScope.NONE)
-            .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
-                Labels.withId(RIGHT_OPERAND),
-                PropertyScope.NONE)
-            .build())
-        .outputStrategy(
-            OutputStrategies.append(
-                EpProperties.numberEp(Labels.empty(), RESULT_FIELD, 
SO.NUMBER)))
-        .requiredSingleValueSelection(Labels.withId(OPERATION), 
Options.from("+", "-", "/",
-            "*", "%"))
-        .build();
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        MathOpProcessor::new,
+        ProcessingElementBuilder
+            
.create("org.apache.streampipes.processors.enricher.jvm.processor.math.mathop", 
0)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .withLocales(Locales.EN)
+            .category(DataProcessorType.ALGORITHM)
+            .requiredStream(StreamRequirementsBuilder
+                .create()
+                .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
+                    Labels.withId(LEFT_OPERAND),
+                    PropertyScope.NONE)
+                .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
+                    Labels.withId(RIGHT_OPERAND),
+                    PropertyScope.NONE)
+                .build())
+            .outputStrategy(
+                OutputStrategies.append(
+                    EpProperties.numberEp(Labels.empty(), RESULT_FIELD, 
SO.NUMBER)))
+            .requiredSingleValueSelection(Labels.withId(OPERATION), 
Options.from("+", "-", "/",
+                "*", "%"))
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(ProcessorParams parameters, SpOutputCollector 
spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
-    this.leftOperand = 
parameters.extractor().mappingPropertyValue(LEFT_OPERAND);
-    this.rightOperand = 
parameters.extractor().mappingPropertyValue(RIGHT_OPERAND);
-    String operation = parameters.extractor().selectedSingleValue(OPERATION, 
String.class);
+  public void onPipelineStarted(IDataProcessorParameters params, 
SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) {
+    this.leftOperand = params.extractor().mappingPropertyValue(LEFT_OPERAND);
+    this.rightOperand = params.extractor().mappingPropertyValue(RIGHT_OPERAND);
+    String operation = params.extractor().selectedSingleValue(OPERATION, 
String.class);
 
     switch (operation) {
       case "+":
-        arithmeticOperation = new OperationAddition();
+        this.arithmeticOperation = new OperationAddition();
         break;
       case "-":
-        arithmeticOperation = new OperationSubtracting();
-        break;
-      case "*":
-        arithmeticOperation = new OperationMultiply();
+        this.arithmeticOperation = new OperationSubtracting();
         break;
       case "/":
-        arithmeticOperation = new OperationDivide();
+        this.arithmeticOperation = new OperationDivide();
+        break;
+      case "*":
+        this.arithmeticOperation = new OperationMultiply();
         break;
       case "%":
-        arithmeticOperation = new OperationModulo();
+        this.arithmeticOperation = new OperationModulo();
+        break;
+      default:
+        throw new IllegalArgumentException("Unsupported operation: " + 
operation);
     }
   }
 
   @Override
-  public void onEvent(Event in, SpOutputCollector out) throws 
SpRuntimeException {
-    Double leftValue = 
in.getFieldBySelector(leftOperand).getAsPrimitive().getAsDouble();
-    Double rightValue = 
in.getFieldBySelector(rightOperand).getAsPrimitive().getAsDouble();
-
-    Double result = arithmeticOperation.operate(leftValue, rightValue);
-    in.addField(RESULT_FIELD, result);
+  public void onEvent(Event event, SpOutputCollector spOutputCollector) throws 
SpRuntimeException {
+    Double leftValue = 
event.getFieldBySelector(this.leftOperand).getAsPrimitive().getAsDouble();
+    Double rightValue = 
event.getFieldBySelector(this.rightOperand).getAsPrimitive().getAsDouble();
+    Double result = this.arithmeticOperation.operate(leftValue, rightValue);
 
-    out.collect(in);
+    event.addField(RESULT_FIELD, result);
+    spOutputCollector.collect(event);
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
-
+  public void onPipelineStopped() {
   }
 }
diff --git 
a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/math/staticmathop/StaticMathOpProcessor.java
 
b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/math/staticmathop/StaticMathOpProcessor.java
index bdafa65682..085a7b3b8e 100644
--- 
a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/math/staticmathop/StaticMathOpProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/math/staticmathop/StaticMathOpProcessor.java
@@ -19,11 +19,13 @@
 package 
org.apache.streampipes.processors.enricher.jvm.processor.math.staticmathop;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import 
org.apache.streampipes.processors.enricher.jvm.processor.math.operation.Operation;
@@ -34,15 +36,14 @@ import 
org.apache.streampipes.processors.enricher.jvm.processor.math.operation.O
 import 
org.apache.streampipes.processors.enricher.jvm.processor.math.operation.OperationSubtracting;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.Options;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
-public class StaticMathOpProcessor extends StreamPipesDataProcessor {
+public class StaticMathOpProcessor implements IStreamPipesDataProcessor {
 
   private static final String RESULT_FIELD = "calculationResultStatic";
   private static final String LEFT_OPERAND = "leftOperand";
@@ -53,34 +54,35 @@ public class StaticMathOpProcessor extends 
StreamPipesDataProcessor {
   String leftOperand;
   double rightOperandValue;
 
-
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        
.create("org.apache.streampipes.processors.enricher.jvm.processor.math.staticmathop",
 0)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .withLocales(Locales.EN)
-        .category(DataProcessorType.ALGORITHM)
-        .requiredStream(StreamRequirementsBuilder
-            .create()
-            .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
-                Labels.withId(LEFT_OPERAND),
-                PropertyScope.NONE)
-            .build())
-        .requiredFloatParameter(Labels.withId(RIGHT_OPERAND_VALUE))
-        .outputStrategy(
-            OutputStrategies.keep())
-        .requiredSingleValueSelection(Labels.withId(OPERATION),
-            Options.from("+", "-", "/", "*", "%"))
-        .build();
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        StaticMathOpProcessor::new,
+        ProcessingElementBuilder
+            
.create("org.apache.streampipes.processors.enricher.jvm.processor.math.staticmathop",
 0)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .withLocales(Locales.EN)
+            .category(DataProcessorType.ALGORITHM)
+            .requiredStream(StreamRequirementsBuilder
+                .create()
+                .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
+                    Labels.withId(LEFT_OPERAND),
+                    PropertyScope.NONE)
+                .build())
+            .requiredFloatParameter(Labels.withId(RIGHT_OPERAND_VALUE))
+            .outputStrategy(
+                OutputStrategies.keep())
+            .requiredSingleValueSelection(Labels.withId(OPERATION),
+                Options.from("+", "-", "/", "*", "%"))
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(ProcessorParams parameters, SpOutputCollector 
spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
-    this.leftOperand = 
parameters.extractor().mappingPropertyValue(LEFT_OPERAND);
-    this.rightOperandValue = 
parameters.extractor().singleValueParameter(RIGHT_OPERAND_VALUE, Double.class);
-    String operation = parameters.extractor().selectedSingleValue(OPERATION, 
String.class);
+  public void onPipelineStarted(IDataProcessorParameters params, 
SpOutputCollector spOutputCollector, EventProcessorRuntimeContext 
runtimeContext) {
+    this.leftOperand = params.extractor().mappingPropertyValue(LEFT_OPERAND);
+    this.rightOperandValue = 
params.extractor().singleValueParameter(RIGHT_OPERAND_VALUE, Double.class);
+    String operation = params.extractor().selectedSingleValue(OPERATION, 
String.class);
 
     switch (operation) {
       case "+":
@@ -112,7 +114,6 @@ public class StaticMathOpProcessor extends 
StreamPipesDataProcessor {
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
-
+  public void onPipelineStopped() {
   }
 }
diff --git 
a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/sizemeasure/SizeMeasureProcessor.java
 
b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/sizemeasure/SizeMeasureProcessor.java
index 47a955052b..2438d47dcf 100644
--- 
a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/sizemeasure/SizeMeasureProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/sizemeasure/SizeMeasureProcessor.java
@@ -19,14 +19,17 @@
 package org.apache.streampipes.processors.enricher.jvm.processor.sizemeasure;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.EpProperties;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
@@ -34,14 +37,12 @@ import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.Options;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
 import org.apache.streampipes.sdk.helpers.Tuple2;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectOutputStream;
 
-public class SizeMeasureProcessor extends StreamPipesDataProcessor {
+public class SizeMeasureProcessor implements IStreamPipesDataProcessor {
 
   static final String SIZE_UNIT = "sizeUnit";
   static final String BYTE_SIZE = "BYTE";
@@ -57,40 +58,43 @@ public class SizeMeasureProcessor extends StreamPipesDataProcessor {
   private String sizeUnit;
 
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        .create("org.apache.streampipes.processors.enricher.jvm.sizemeasure", 0)
-        .category(DataProcessorType.STRUCTURE_ANALYTICS)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .withLocales(Locales.EN)
-        .requiredStream(StreamRequirementsBuilder
-                            .create()
-                            .requiredProperty(EpRequirements.anyProperty())
-                            .build())
-        .requiredSingleValueSelection(
-            Labels.withId(SIZE_UNIT),
-            Options.from(
-                new Tuple2<>(BYTES_OPTION, BYTE_SIZE),
-                new Tuple2<>(KILO_BYTES_OPTION, KILOBYTE_SIZE),
-                new Tuple2<>(MEGA_BYTES_OPTION, MEGABYTE_SIZE)
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        SizeMeasureProcessor::new,
+        ProcessingElementBuilder
+            .create("org.apache.streampipes.processors.enricher.jvm.sizemeasure", 0)
+            .category(DataProcessorType.STRUCTURE_ANALYTICS)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
+            .withLocales(Locales.EN)
+            .requiredStream(StreamRequirementsBuilder
+                                .create()
+                                .requiredProperty(EpRequirements.anyProperty())
+                                .build())
+            .requiredSingleValueSelection(
+                Labels.withId(SIZE_UNIT),
+                Options.from(
+                    new Tuple2<>(BYTES_OPTION, BYTE_SIZE),
+                    new Tuple2<>(KILO_BYTES_OPTION, KILOBYTE_SIZE),
+                    new Tuple2<>(MEGA_BYTES_OPTION, MEGABYTE_SIZE)
+                )
             )
-        )
-        .outputStrategy(OutputStrategies.append(EpProperties.doubleEp(
-            Labels.withId(EVENT_SIZE),
-            EVENT_SIZE,
-            "http://schema.org/contentSize"
-        )))
-        .build();
+            .outputStrategy(OutputStrategies.append(EpProperties.doubleEp(
+                Labels.withId(EVENT_SIZE),
+                EVENT_SIZE,
+                "http://schema.org/contentSize"
+            )))
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(
-      ProcessorParams parameters,
-      SpOutputCollector spOutputCollector,
+  public void onPipelineStarted(
+      IDataProcessorParameters params,
+      SpOutputCollector collector,
       EventProcessorRuntimeContext runtimeContext
-  ) throws SpRuntimeException {
-    this.sizeUnit = parameters.extractor()
-                              .selectedSingleValueInternalName(SIZE_UNIT, String.class);
+  ) {
+    this.sizeUnit = params.extractor()
+                          .selectedSingleValueInternalName(SIZE_UNIT, String.class);
   }
 
   @Override
@@ -105,13 +109,12 @@ public class SizeMeasureProcessor extends StreamPipesDataProcessor {
       event.addField(EVENT_SIZE, size);
       collector.collect(event);
     } catch (IOException e) {
-      e.printStackTrace();
+      throw new SpRuntimeException("Error calculating event size", e);
     }
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
-
+  public void onPipelineStopped() {
   }
 
   private int getSizeInBytes(Object map) throws IOException {
diff --git a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/trigonometry/TrigonometryProcessor.java b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/trigonometry/TrigonometryProcessor.java
index eb96233d70..8d9b536cb1 100644
--- a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/trigonometry/TrigonometryProcessor.java
+++ b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/trigonometry/TrigonometryProcessor.java
@@ -19,15 +19,18 @@
 package org.apache.streampipes.processors.enricher.jvm.processor.trigonometry;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.EpProperties;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
@@ -35,10 +38,8 @@ import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.Options;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
 import org.apache.streampipes.vocabulary.SO;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
-public class TrigonometryProcessor extends StreamPipesDataProcessor {
+public class TrigonometryProcessor implements IStreamPipesDataProcessor {
 
   private static final String OPERAND = "operand";
   private static final String OPERATION = "operation";
@@ -47,35 +48,34 @@ public class TrigonometryProcessor extends StreamPipesDataProcessor {
   private Operation operation;
   private String operand;
 
-
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        .create("org.apache.streampipes.processors.enricher.jvm.processor.trigonometry", 0)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .withLocales(Locales.EN)
-        .category(DataProcessorType.ALGORITHM)
-        .requiredStream(StreamRequirementsBuilder
-            .create()
-            .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
-                Labels.withId(OPERAND),
-                PropertyScope.NONE)
-            .build())
-        .outputStrategy(
-            OutputStrategies.append(
-                EpProperties.numberEp(Labels.empty(), RESULT_FIELD, SO.NUMBER)))
-        .requiredSingleValueSelection(Labels.withId(OPERATION),
-            Options.from("sin", "cos", "tan"))
-        .build();
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        TrigonometryProcessor::new,
+        ProcessingElementBuilder
+            .create("org.apache.streampipes.processors.enricher.jvm.processor.trigonometry", 0)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
+            .withLocales(Locales.EN)
+            .category(DataProcessorType.ALGORITHM)
+            .requiredStream(StreamRequirementsBuilder
+                .create()
+                .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
+                    Labels.withId(OPERAND),
+                    PropertyScope.NONE)
+                .build())
+            .outputStrategy(
+                OutputStrategies.append(
+                    EpProperties.numberEp(Labels.empty(), RESULT_FIELD, SO.NUMBER)))
+            .requiredSingleValueSelection(Labels.withId(OPERATION),
+                Options.from("sin", "cos", "tan"))
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(ProcessorParams parameters,
-                           SpOutputCollector spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
-
-    this.operand = parameters.extractor().mappingPropertyValue(OPERAND);
-    String stringOperation = parameters.extractor().selectedSingleValue(OPERATION, String.class);
+  public void onPipelineStarted(IDataProcessorParameters params, SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) {
+    this.operand = params.extractor().mappingPropertyValue(OPERAND);
+    String stringOperation = params.extractor().selectedSingleValue(OPERATION, String.class);
 
     switch (stringOperation) {
       case "sin":
@@ -86,7 +86,6 @@ public class TrigonometryProcessor extends StreamPipesDataProcessor {
         break;
       case "tan":
         operation = Operation.TAN;
-
     }
   }
 
@@ -108,7 +107,6 @@ public class TrigonometryProcessor extends StreamPipesDataProcessor {
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
-
+  public void onPipelineStopped() {
   }
 }
diff --git a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/valuechange/ValueChangeProcessor.java b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/valuechange/ValueChangeProcessor.java
index fac9005d72..a6ec429321 100644
--- a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/valuechange/ValueChangeProcessor.java
+++ b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/valuechange/ValueChangeProcessor.java
@@ -19,25 +19,26 @@
 package org.apache.streampipes.processors.enricher.jvm.processor.valuechange;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.EpProperties;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
 import org.apache.streampipes.vocabulary.SO;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
-public class ValueChangeProcessor extends StreamPipesDataProcessor {
+public class ValueChangeProcessor implements IStreamPipesDataProcessor {
   private static final String CHANGE_VALUE_MAPPING_ID = "change-value-mapping";
   private static final String FROM_PROPERTY_VALUE_ID = "from-property-value";
   private static final String TO_PROPERTY_VALUE_ID = "to-property-value";
@@ -50,32 +51,34 @@ public class ValueChangeProcessor extends StreamPipesDataProcessor {
   private float lastValueOfEvent;
 
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        .create("org.apache.streampipes.processors.enricher.jvm.valuechange", 0)
-        .category(DataProcessorType.VALUE_OBSERVER)
-        .withAssets(ExtensionAssetType.DOCUMENTATION)
-        .withLocales(Locales.EN)
-        .requiredFloatParameter(Labels.withId(FROM_PROPERTY_VALUE_ID))
-        .requiredFloatParameter(Labels.withId(TO_PROPERTY_VALUE_ID))
-        .requiredStream(StreamRequirementsBuilder
-            .create()
-            .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
-                Labels.withId(CHANGE_VALUE_MAPPING_ID),
-                PropertyScope.NONE)
-            .build())
-        .outputStrategy(OutputStrategies.append(
-            EpProperties.booleanEp(Labels.withId(IS_CHANGED_ID), IS_CHANGED, SO.BOOLEAN)))
-        .build();
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        ValueChangeProcessor::new,
+        ProcessingElementBuilder
+            .create("org.apache.streampipes.processors.enricher.jvm.valuechange", 0)
+            .category(DataProcessorType.VALUE_OBSERVER)
+            .withAssets(ExtensionAssetType.DOCUMENTATION)
+            .withLocales(Locales.EN)
+            .requiredFloatParameter(Labels.withId(FROM_PROPERTY_VALUE_ID))
+            .requiredFloatParameter(Labels.withId(TO_PROPERTY_VALUE_ID))
+            .requiredStream(StreamRequirementsBuilder
+                .create()
+                .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
+                    Labels.withId(CHANGE_VALUE_MAPPING_ID),
+                    PropertyScope.NONE)
+                .build())
+            .outputStrategy(OutputStrategies.append(
+                EpProperties.booleanEp(Labels.withId(IS_CHANGED_ID), IS_CHANGED, SO.BOOLEAN)))
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(ProcessorParams processorParams, SpOutputCollector spOutputCollector,
-                           EventProcessorRuntimeContext eventProcessorRuntimeContext) throws SpRuntimeException {
+  public void onPipelineStarted(IDataProcessorParameters params, SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) {
     this.lastValueOfEvent = Float.MAX_VALUE;
-    this.userDefinedFrom = processorParams.extractor().singleValueParameter(FROM_PROPERTY_VALUE_ID, Float.class);
-    this.userDefinedTo = processorParams.extractor().singleValueParameter(TO_PROPERTY_VALUE_ID, Float.class);
-    this.mappingProperty = processorParams.extractor().mappingPropertyValue(CHANGE_VALUE_MAPPING_ID);
+    this.userDefinedFrom = params.extractor().singleValueParameter(FROM_PROPERTY_VALUE_ID, Float.class);
+    this.userDefinedTo = params.extractor().singleValueParameter(TO_PROPERTY_VALUE_ID, Float.class);
+    this.mappingProperty = params.extractor().mappingPropertyValue(CHANGE_VALUE_MAPPING_ID);
   }
 
   @Override
@@ -96,7 +99,6 @@ public class ValueChangeProcessor extends StreamPipesDataProcessor {
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
-
+  public void onPipelineStopped() {
   }
 }


Reply via email to