This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new a674723489 refactor(#3651): Update WelfordChangeDetection to implement
IStreamPipesDataProcessor interface (#3656)
a674723489 is described below
commit a6747234897c291d40fa230ee36c9a1d0696fc28
Author: Philipp Zehnder <[email protected]>
AuthorDate: Wed Jun 11 08:19:55 2025 +0200
refactor(#3651): Update WelfordChangeDetection to implement
IStreamPipesDataProcessor interface (#3656)
---
.../jvm/welford/WelfordChangeDetection.java | 72 +++++++++++-----------
1 file changed, 36 insertions(+), 36 deletions(-)
diff --git
a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordChangeDetection.java
b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordChangeDetection.java
index f44f299e05..bb86a0c0cc 100644
---
a/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordChangeDetection.java
+++
b/streampipes-extensions/streampipes-processors-change-detection-jvm/src/main/java/org/apache/streampipes/processors/changedetection/jvm/welford/WelfordChangeDetection.java
@@ -18,28 +18,28 @@
package org.apache.streampipes.processors.changedetection.jvm.welford;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.vocabulary.SO;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
import java.util.Arrays;
-public class WelfordChangeDetection extends StreamPipesDataProcessor {
+public class WelfordChangeDetection implements IStreamPipesDataProcessor {
private static final String NUMBER_MAPPING = "number-mapping";
private static final String PARAM_K = "param-k";
@@ -53,42 +53,42 @@ public class WelfordChangeDetection extends
StreamPipesDataProcessor {
private WelfordAggregate welfordAggregate;
@Override
- public DataProcessorDescription declareModel() {
- return
ProcessingElementBuilder.create("org.apache.streampipes.processors.changedetection.jvm.welford",
0)
- .category(DataProcessorType.VALUE_OBSERVER)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .withLocales(Locales.EN)
- .requiredStream(StreamRequirementsBuilder
- .create()
- .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
- Labels.withId(NUMBER_MAPPING),
- PropertyScope.NONE).build())
- .requiredFloatParameter(Labels.withId(PARAM_K), 0.0f, 0.0f, 100.0f,
0.01f)
- .requiredFloatParameter(Labels.withId(PARAM_H), 0.0f, 0.0f, 100.0f,
0.01f)
- .outputStrategy(
- OutputStrategies.append(
- Arrays.asList(
- EpProperties.numberEp(Labels.empty(),
WelfordEventFields.VAL_LOW.label, SO.NUMBER),
- EpProperties.numberEp(Labels.empty(),
WelfordEventFields.VAL_HIGH.label, SO.NUMBER),
- EpProperties.booleanEp(Labels.empty(),
WelfordEventFields.DECISION_LOW.label, SO.BOOLEAN),
- EpProperties.booleanEp(Labels.empty(),
WelfordEventFields.DECISION_HIGH.label, SO.BOOLEAN)
- )
- ))
- .build();
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ WelfordChangeDetection::new,
+
ProcessingElementBuilder.create("org.apache.streampipes.processors.changedetection.jvm.welford",
0)
+ .category(DataProcessorType.VALUE_OBSERVER)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .withLocales(Locales.EN)
+ .requiredStream(StreamRequirementsBuilder
+ .create()
+ .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
+ Labels.withId(NUMBER_MAPPING),
+ PropertyScope.NONE).build())
+ .requiredFloatParameter(Labels.withId(PARAM_K), 0.0f, 0.0f,
100.0f, 0.01f)
+ .requiredFloatParameter(Labels.withId(PARAM_H), 0.0f, 0.0f,
100.0f, 0.01f)
+ .outputStrategy(
+ OutputStrategies.append(
+ Arrays.asList(
+ EpProperties.numberEp(Labels.empty(),
WelfordEventFields.VAL_LOW.label, SO.NUMBER),
+ EpProperties.numberEp(Labels.empty(),
WelfordEventFields.VAL_HIGH.label, SO.NUMBER),
+ EpProperties.booleanEp(Labels.empty(),
WelfordEventFields.DECISION_LOW.label, SO.BOOLEAN),
+ EpProperties.booleanEp(Labels.empty(),
WelfordEventFields.DECISION_HIGH.label, SO.BOOLEAN)
+ )
+ ))
+ .build()
+ );
}
@Override
- public void onInvocation(ProcessorParams parameters, SpOutputCollector
spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
-
- ProcessingElementParameterExtractor extractor = parameters.extractor();
- this.selectedNumberMapping =
extractor.mappingPropertyValue(NUMBER_MAPPING);
- this.k = extractor.singleValueParameter(PARAM_K, Double.class);
- this.h = extractor.singleValueParameter(PARAM_H, Double.class);
+ public void onPipelineStarted(IDataProcessorParameters parameters,
SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext)
throws SpRuntimeException {
+ this.selectedNumberMapping =
parameters.extractor().mappingPropertyValue(NUMBER_MAPPING);
+ this.k = parameters.extractor().singleValueParameter(PARAM_K,
Double.class);
+ this.h = parameters.extractor().singleValueParameter(PARAM_H,
Double.class);
this.cuSumLow = 0.0;
this.cuSumHigh = 0.0;
this.welfordAggregate = new WelfordAggregate();
-
}
@Override
@@ -112,7 +112,7 @@ public class WelfordChangeDetection extends
StreamPipesDataProcessor {
}
@Override
- public void onDetach() throws SpRuntimeException {
+ public void onPipelineStopped() throws SpRuntimeException {
this.cuSumLow = 0.0;
this.cuSumHigh = 0.0;
}