This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new a674723489 refactor(#3651): Update WelfordChangeDetection to implement IStreamPipesDataProcessor interface (#3656)
a674723489 is described below

commit a6747234897c291d40fa230ee36c9a1d0696fc28
Author: Philipp Zehnder <[email protected]>
AuthorDate: Wed Jun 11 08:19:55 2025 +0200

    refactor(#3651): Update WelfordChangeDetection to implement IStreamPipesDataProcessor interface (#3656)
---
 .../jvm/welford/WelfordChangeDetection.java        | 72 +++++++++++-----------
 1 file changed, 36 insertions(+), 36 deletions(-)

diff --git a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordChangeDetection.java b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordChangeDetection.java
index f44f299e05..bb86a0c0cc 100644
--- a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordChangeDetection.java
+++ b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordChangeDetection.java
@@ -18,28 +18,28 @@
 package org.apache.streampipes.processors.changedetection.jvm.welford;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import 
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.EpProperties;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
 import org.apache.streampipes.vocabulary.SO;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
 import java.util.Arrays;
 
-public class WelfordChangeDetection extends StreamPipesDataProcessor {
+public class WelfordChangeDetection implements IStreamPipesDataProcessor {
 
   private static final String NUMBER_MAPPING = "number-mapping";
   private static final String PARAM_K = "param-k";
@@ -53,42 +53,42 @@ public class WelfordChangeDetection extends StreamPipesDataProcessor {
   private WelfordAggregate welfordAggregate;
 
   @Override
-  public DataProcessorDescription declareModel() {
-    return 
ProcessingElementBuilder.create("org.apache.streampipes.processors.changedetection.jvm.welford",
 0)
-        .category(DataProcessorType.VALUE_OBSERVER)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .withLocales(Locales.EN)
-        .requiredStream(StreamRequirementsBuilder
-            .create()
-            .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
-                Labels.withId(NUMBER_MAPPING),
-                PropertyScope.NONE).build())
-        .requiredFloatParameter(Labels.withId(PARAM_K), 0.0f, 0.0f, 100.0f, 
0.01f)
-        .requiredFloatParameter(Labels.withId(PARAM_H), 0.0f, 0.0f, 100.0f, 
0.01f)
-        .outputStrategy(
-            OutputStrategies.append(
-                Arrays.asList(
-                    EpProperties.numberEp(Labels.empty(), 
WelfordEventFields.VAL_LOW.label, SO.NUMBER),
-                    EpProperties.numberEp(Labels.empty(), 
WelfordEventFields.VAL_HIGH.label, SO.NUMBER),
-                    EpProperties.booleanEp(Labels.empty(), 
WelfordEventFields.DECISION_LOW.label, SO.BOOLEAN),
-                    EpProperties.booleanEp(Labels.empty(), 
WelfordEventFields.DECISION_HIGH.label, SO.BOOLEAN)
-                )
-            ))
-        .build();
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        WelfordChangeDetection::new,
+        
ProcessingElementBuilder.create("org.apache.streampipes.processors.changedetection.jvm.welford",
 0)
+            .category(DataProcessorType.VALUE_OBSERVER)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .withLocales(Locales.EN)
+            .requiredStream(StreamRequirementsBuilder
+                .create()
+                .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
+                    Labels.withId(NUMBER_MAPPING),
+                    PropertyScope.NONE).build())
+            .requiredFloatParameter(Labels.withId(PARAM_K), 0.0f, 0.0f, 
100.0f, 0.01f)
+            .requiredFloatParameter(Labels.withId(PARAM_H), 0.0f, 0.0f, 
100.0f, 0.01f)
+            .outputStrategy(
+                OutputStrategies.append(
+                    Arrays.asList(
+                        EpProperties.numberEp(Labels.empty(), 
WelfordEventFields.VAL_LOW.label, SO.NUMBER),
+                        EpProperties.numberEp(Labels.empty(), 
WelfordEventFields.VAL_HIGH.label, SO.NUMBER),
+                        EpProperties.booleanEp(Labels.empty(), 
WelfordEventFields.DECISION_LOW.label, SO.BOOLEAN),
+                        EpProperties.booleanEp(Labels.empty(), 
WelfordEventFields.DECISION_HIGH.label, SO.BOOLEAN)
+                    )
+                ))
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(ProcessorParams parameters, SpOutputCollector 
spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
-
-    ProcessingElementParameterExtractor extractor = parameters.extractor();
-    this.selectedNumberMapping = 
extractor.mappingPropertyValue(NUMBER_MAPPING);
-    this.k = extractor.singleValueParameter(PARAM_K, Double.class);
-    this.h = extractor.singleValueParameter(PARAM_H, Double.class);
+  public void onPipelineStarted(IDataProcessorParameters parameters, 
SpOutputCollector spOutputCollector,
+                                EventProcessorRuntimeContext runtimeContext) 
throws SpRuntimeException {
+    this.selectedNumberMapping = 
parameters.extractor().mappingPropertyValue(NUMBER_MAPPING);
+    this.k = parameters.extractor().singleValueParameter(PARAM_K, 
Double.class);
+    this.h = parameters.extractor().singleValueParameter(PARAM_H, 
Double.class);
     this.cuSumLow = 0.0;
     this.cuSumHigh = 0.0;
     this.welfordAggregate = new WelfordAggregate();
-
   }
 
   @Override
@@ -112,7 +112,7 @@ public class WelfordChangeDetection extends StreamPipesDataProcessor {
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
+  public void onPipelineStopped() throws SpRuntimeException {
     this.cuSumLow = 0.0;
     this.cuSumHigh = 0.0;
   }

Reply via email to