This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch 
3646-update-processors-in-processors-enrichmer-jvm
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit 86978d6e41f70fdbc6529580ec7803b3bab0e82d
Author: Philipp Zehnder <[email protected]>
AuthorDate: Fri Jun 6 21:58:44 2025 +0200

    refactor: Update processors to implement IStreamPipesDataProcessor and 
refactor method signatures
---
 .../jvm/processor/math/MathOpProcessor.java        | 51 +++++++++++++++-------
 .../math/staticmathop/StaticMathOpProcessor.java   | 36 ++++++++++-----
 .../trigonometry/TrigonometryProcessor.java        | 34 ++++++++++-----
 .../valuechange/ValueChangeProcessor.java          | 37 +++++++++-------
 4 files changed, 104 insertions(+), 54 deletions(-)

diff --git 
a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/math/MathOpProcessor.java
 
b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/math/MathOpProcessor.java
index c81b503711..91ada2eef1 100644
--- 
a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/math/MathOpProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/math/MathOpProcessor.java
@@ -67,28 +67,43 @@ public class MathOpProcessor implements 
IStreamPipesDataProcessor {
             .withLocales(Locales.EN)
             .category(DataProcessorType.ALGORITHM)
             .requiredStream(StreamRequirementsBuilder
-                .create()
-                .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
-                    Labels.withId(LEFT_OPERAND),
-                    PropertyScope.NONE)
-                .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
-                    Labels.withId(RIGHT_OPERAND),
-                    PropertyScope.NONE)
-                .build())
+                                .create()
+                                .requiredPropertyWithUnaryMapping(
+                                    EpRequirements.numberReq(),
+                                    Labels.withId(LEFT_OPERAND),
+                                    PropertyScope.NONE
+                                )
+                                .requiredPropertyWithUnaryMapping(
+                                    EpRequirements.numberReq(),
+                                    Labels.withId(RIGHT_OPERAND),
+                                    PropertyScope.NONE
+                                )
+                                .build())
             .outputStrategy(
                 OutputStrategies.append(
                     EpProperties.numberEp(Labels.empty(), RESULT_FIELD, 
SO.NUMBER)))
-            .requiredSingleValueSelection(Labels.withId(OPERATION), 
Options.from("+", "-", "/",
-                "*", "%"))
+            .requiredSingleValueSelection(
+                Labels.withId(OPERATION), Options.from(
+                    "+", "-", "/",
+                    "*", "%"
+                )
+            )
             .build()
     );
   }
 
   @Override
-  public void onPipelineStarted(IDataProcessorParameters params, 
SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) {
-    this.leftOperand = params.extractor().mappingPropertyValue(LEFT_OPERAND);
-    this.rightOperand = params.extractor().mappingPropertyValue(RIGHT_OPERAND);
-    String operation = params.extractor().selectedSingleValue(OPERATION, 
String.class);
+  public void onPipelineStarted(
+      IDataProcessorParameters params,
+      SpOutputCollector collector,
+      EventProcessorRuntimeContext runtimeContext
+  ) {
+    this.leftOperand = params.extractor()
+                             .mappingPropertyValue(LEFT_OPERAND);
+    this.rightOperand = params.extractor()
+                              .mappingPropertyValue(RIGHT_OPERAND);
+    String operation = params.extractor()
+                             .selectedSingleValue(OPERATION, String.class);
 
     switch (operation) {
       case "+":
@@ -113,8 +128,12 @@ public class MathOpProcessor implements 
IStreamPipesDataProcessor {
 
   @Override
   public void onEvent(Event event, SpOutputCollector spOutputCollector) throws 
SpRuntimeException {
-    Double leftValue = 
event.getFieldBySelector(this.leftOperand).getAsPrimitive().getAsDouble();
-    Double rightValue = 
event.getFieldBySelector(this.rightOperand).getAsPrimitive().getAsDouble();
+    Double leftValue = event.getFieldBySelector(this.leftOperand)
+                            .getAsPrimitive()
+                            .getAsDouble();
+    Double rightValue = event.getFieldBySelector(this.rightOperand)
+                             .getAsPrimitive()
+                             .getAsDouble();
     Double result = this.arithmeticOperation.operate(leftValue, rightValue);
 
     event.addField(RESULT_FIELD, result);
diff --git 
a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/math/staticmathop/StaticMathOpProcessor.java
 
b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/math/staticmathop/StaticMathOpProcessor.java
index 085a7b3b8e..fa4efc5e01 100644
--- 
a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/math/staticmathop/StaticMathOpProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/math/staticmathop/StaticMathOpProcessor.java
@@ -64,25 +64,36 @@ public class StaticMathOpProcessor implements 
IStreamPipesDataProcessor {
             .withLocales(Locales.EN)
             .category(DataProcessorType.ALGORITHM)
             .requiredStream(StreamRequirementsBuilder
-                .create()
-                .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
-                    Labels.withId(LEFT_OPERAND),
-                    PropertyScope.NONE)
-                .build())
+                                .create()
+                                .requiredPropertyWithUnaryMapping(
+                                    EpRequirements.numberReq(),
+                                    Labels.withId(LEFT_OPERAND),
+                                    PropertyScope.NONE
+                                )
+                                .build())
             .requiredFloatParameter(Labels.withId(RIGHT_OPERAND_VALUE))
             .outputStrategy(
                 OutputStrategies.keep())
-            .requiredSingleValueSelection(Labels.withId(OPERATION),
-                Options.from("+", "-", "/", "*", "%"))
+            .requiredSingleValueSelection(
+                Labels.withId(OPERATION),
+                Options.from("+", "-", "/", "*", "%")
+            )
             .build()
     );
   }
 
   @Override
-  public void onPipelineStarted(IDataProcessorParameters params, 
SpOutputCollector spOutputCollector, EventProcessorRuntimeContext 
runtimeContext) {
-    this.leftOperand = params.extractor().mappingPropertyValue(LEFT_OPERAND);
-    this.rightOperandValue = 
params.extractor().singleValueParameter(RIGHT_OPERAND_VALUE, Double.class);
-    String operation = params.extractor().selectedSingleValue(OPERATION, 
String.class);
+  public void onPipelineStarted(
+      IDataProcessorParameters params,
+      SpOutputCollector spOutputCollector,
+      EventProcessorRuntimeContext runtimeContext
+  ) {
+    this.leftOperand = params.extractor()
+                             .mappingPropertyValue(LEFT_OPERAND);
+    this.rightOperandValue = params.extractor()
+                                   .singleValueParameter(RIGHT_OPERAND_VALUE, 
Double.class);
+    String operation = params.extractor()
+                             .selectedSingleValue(OPERATION, String.class);
 
     switch (operation) {
       case "+":
@@ -105,7 +116,8 @@ public class StaticMathOpProcessor implements 
IStreamPipesDataProcessor {
   @Override
   public void onEvent(Event in, SpOutputCollector out) throws 
SpRuntimeException {
     Double leftValue = 
Double.parseDouble(String.valueOf(in.getFieldBySelector(leftOperand)
-        .getAsPrimitive().getAsDouble()));
+                                                           .getAsPrimitive()
+                                                           .getAsDouble()));
 
     Double result = arithmeticOperation.operate(leftValue, rightOperandValue);
     in.updateFieldBySelector(leftOperand, result);
diff --git 
a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/trigonometry/TrigonometryProcessor.java
 
b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/trigonometry/TrigonometryProcessor.java
index 8d9b536cb1..2b5de9c50a 100644
--- 
a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/trigonometry/TrigonometryProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/trigonometry/TrigonometryProcessor.java
@@ -58,24 +58,34 @@ public class TrigonometryProcessor implements 
IStreamPipesDataProcessor {
             .withLocales(Locales.EN)
             .category(DataProcessorType.ALGORITHM)
             .requiredStream(StreamRequirementsBuilder
-                .create()
-                .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
-                    Labels.withId(OPERAND),
-                    PropertyScope.NONE)
-                .build())
+                                .create()
+                                .requiredPropertyWithUnaryMapping(
+                                    EpRequirements.numberReq(),
+                                    Labels.withId(OPERAND),
+                                    PropertyScope.NONE
+                                )
+                                .build())
             .outputStrategy(
                 OutputStrategies.append(
                     EpProperties.numberEp(Labels.empty(), RESULT_FIELD, 
SO.NUMBER)))
-            .requiredSingleValueSelection(Labels.withId(OPERATION),
-                Options.from("sin", "cos", "tan"))
+            .requiredSingleValueSelection(
+                Labels.withId(OPERATION),
+                Options.from("sin", "cos", "tan")
+            )
             .build()
     );
   }
 
   @Override
-  public void onPipelineStarted(IDataProcessorParameters params, 
SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) {
-    this.operand = params.extractor().mappingPropertyValue(OPERAND);
-    String stringOperation = params.extractor().selectedSingleValue(OPERATION, 
String.class);
+  public void onPipelineStarted(
+      IDataProcessorParameters params,
+      SpOutputCollector collector,
+      EventProcessorRuntimeContext runtimeContext
+  ) {
+    this.operand = params.extractor()
+                         .mappingPropertyValue(OPERAND);
+    String stringOperation = params.extractor()
+                                   .selectedSingleValue(OPERATION, 
String.class);
 
     switch (stringOperation) {
       case "sin":
@@ -91,7 +101,9 @@ public class TrigonometryProcessor implements 
IStreamPipesDataProcessor {
 
   @Override
   public void onEvent(Event in, SpOutputCollector out) throws 
SpRuntimeException {
-    double value = 
in.getFieldBySelector(operand).getAsPrimitive().getAsDouble();
+    double value = in.getFieldBySelector(operand)
+                     .getAsPrimitive()
+                     .getAsDouble();
     double result;
 
     if (operation == Operation.SIN) {
diff --git 
a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/valuechange/ValueChangeProcessor.java
 
b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/valuechange/ValueChangeProcessor.java
index a6ec429321..197af7ea7b 100644
--- 
a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/valuechange/ValueChangeProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/valuechange/ValueChangeProcessor.java
@@ -62,11 +62,13 @@ public class ValueChangeProcessor implements 
IStreamPipesDataProcessor {
             .requiredFloatParameter(Labels.withId(FROM_PROPERTY_VALUE_ID))
             .requiredFloatParameter(Labels.withId(TO_PROPERTY_VALUE_ID))
             .requiredStream(StreamRequirementsBuilder
-                .create()
-                .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
-                    Labels.withId(CHANGE_VALUE_MAPPING_ID),
-                    PropertyScope.NONE)
-                .build())
+                                .create()
+                                .requiredPropertyWithUnaryMapping(
+                                    EpRequirements.numberReq(),
+                                    Labels.withId(CHANGE_VALUE_MAPPING_ID),
+                                    PropertyScope.NONE
+                                )
+                                .build())
             .outputStrategy(OutputStrategies.append(
                 EpProperties.booleanEp(Labels.withId(IS_CHANGED_ID), 
IS_CHANGED, SO.BOOLEAN)))
             .build()
@@ -74,22 +76,27 @@ public class ValueChangeProcessor implements 
IStreamPipesDataProcessor {
   }
 
   @Override
-  public void onPipelineStarted(IDataProcessorParameters params, 
SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) {
+  public void onPipelineStarted(
+      IDataProcessorParameters params,
+      SpOutputCollector collector,
+      EventProcessorRuntimeContext runtimeContext
+  ) {
     this.lastValueOfEvent = Float.MAX_VALUE;
-    this.userDefinedFrom = 
params.extractor().singleValueParameter(FROM_PROPERTY_VALUE_ID, Float.class);
-    this.userDefinedTo = 
params.extractor().singleValueParameter(TO_PROPERTY_VALUE_ID, Float.class);
-    this.mappingProperty = 
params.extractor().mappingPropertyValue(CHANGE_VALUE_MAPPING_ID);
+    this.userDefinedFrom = params.extractor()
+                                 .singleValueParameter(FROM_PROPERTY_VALUE_ID, 
Float.class);
+    this.userDefinedTo = params.extractor()
+                               .singleValueParameter(TO_PROPERTY_VALUE_ID, 
Float.class);
+    this.mappingProperty = params.extractor()
+                                 
.mappingPropertyValue(CHANGE_VALUE_MAPPING_ID);
   }
 
   @Override
   public void onEvent(Event event, SpOutputCollector spOutputCollector) throws 
SpRuntimeException {
-    float thisValue = 
event.getFieldBySelector(mappingProperty).getAsPrimitive().getAsFloat();
+    float thisValue = event.getFieldBySelector(mappingProperty)
+                           .getAsPrimitive()
+                           .getAsFloat();
     if (this.lastValueOfEvent != Float.MAX_VALUE) {
-      if (this.lastValueOfEvent == this.userDefinedFrom && thisValue == 
this.userDefinedTo) {
-        event.addField(IS_CHANGED, true);
-      } else {
-        event.addField(IS_CHANGED, false);
-      }
+      event.addField(IS_CHANGED, this.lastValueOfEvent == this.userDefinedFrom 
&& thisValue == this.userDefinedTo);
     } else {
       event.addField(IS_CHANGED, false);
     }

Reply via email to