This is an automated email from the ASF dual-hosted git repository. zehnder pushed a commit to branch 3646-update-processors-in-processors-enrichmer-jvm in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit 86978d6e41f70fdbc6529580ec7803b3bab0e82d Author: Philipp Zehnder <[email protected]> AuthorDate: Fri Jun 6 21:58:44 2025 +0200 refactor: Update processors to implement IStreamPipesDataProcessor and refactor method signatures --- .../jvm/processor/math/MathOpProcessor.java | 51 +++++++++++++++------- .../math/staticmathop/StaticMathOpProcessor.java | 36 ++++++++++----- .../trigonometry/TrigonometryProcessor.java | 34 ++++++++++----- .../valuechange/ValueChangeProcessor.java | 37 +++++++++------- 4 files changed, 104 insertions(+), 54 deletions(-) diff --git a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/math/MathOpProcessor.java b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/math/MathOpProcessor.java index c81b503711..91ada2eef1 100644 --- a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/math/MathOpProcessor.java +++ b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/math/MathOpProcessor.java @@ -67,28 +67,43 @@ public class MathOpProcessor implements IStreamPipesDataProcessor { .withLocales(Locales.EN) .category(DataProcessorType.ALGORITHM) .requiredStream(StreamRequirementsBuilder - .create() - .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(), - Labels.withId(LEFT_OPERAND), - PropertyScope.NONE) - .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(), - Labels.withId(RIGHT_OPERAND), - PropertyScope.NONE) - .build()) + .create() + .requiredPropertyWithUnaryMapping( + EpRequirements.numberReq(), + Labels.withId(LEFT_OPERAND), + PropertyScope.NONE + ) + .requiredPropertyWithUnaryMapping( + EpRequirements.numberReq(), + Labels.withId(RIGHT_OPERAND), + PropertyScope.NONE + ) + .build()) .outputStrategy( OutputStrategies.append( EpProperties.numberEp(Labels.empty(), RESULT_FIELD, SO.NUMBER))) - .requiredSingleValueSelection(Labels.withId(OPERATION), Options.from("+", "-", "/", - "*", "%")) + .requiredSingleValueSelection( + Labels.withId(OPERATION), Options.from( + "+", "-", "/", + "*", "%" + ) + ) .build() ); } @Override - public void onPipelineStarted(IDataProcessorParameters params, SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) { - this.leftOperand = params.extractor().mappingPropertyValue(LEFT_OPERAND); - this.rightOperand = params.extractor().mappingPropertyValue(RIGHT_OPERAND); - String operation = params.extractor().selectedSingleValue(OPERATION, String.class); + public void onPipelineStarted( + IDataProcessorParameters params, + SpOutputCollector collector, + EventProcessorRuntimeContext runtimeContext + ) { + this.leftOperand = params.extractor() + .mappingPropertyValue(LEFT_OPERAND); + this.rightOperand = params.extractor() + .mappingPropertyValue(RIGHT_OPERAND); + String operation = params.extractor() + .selectedSingleValue(OPERATION, String.class); switch (operation) { case "+": @@ -113,8 +128,12 @@ public class MathOpProcessor implements IStreamPipesDataProcessor { @Override public void onEvent(Event event, SpOutputCollector spOutputCollector) throws SpRuntimeException { - Double leftValue = event.getFieldBySelector(this.leftOperand).getAsPrimitive().getAsDouble(); - Double rightValue = event.getFieldBySelector(this.rightOperand).getAsPrimitive().getAsDouble(); + Double leftValue = event.getFieldBySelector(this.leftOperand) + .getAsPrimitive() + .getAsDouble(); + Double rightValue = event.getFieldBySelector(this.rightOperand) + .getAsPrimitive() + .getAsDouble(); Double result = this.arithmeticOperation.operate(leftValue, rightValue); event.addField(RESULT_FIELD, result); diff --git a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/math/staticmathop/StaticMathOpProcessor.java b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/math/staticmathop/StaticMathOpProcessor.java index 085a7b3b8e..fa4efc5e01 100644 --- a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/math/staticmathop/StaticMathOpProcessor.java +++ b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/math/staticmathop/StaticMathOpProcessor.java @@ -64,25 +64,36 @@ public class StaticMathOpProcessor implements IStreamPipesDataProcessor { .withLocales(Locales.EN) .category(DataProcessorType.ALGORITHM) .requiredStream(StreamRequirementsBuilder - .create() - .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(), - Labels.withId(LEFT_OPERAND), - PropertyScope.NONE) - .build()) + .create() + .requiredPropertyWithUnaryMapping( + EpRequirements.numberReq(), + Labels.withId(LEFT_OPERAND), + PropertyScope.NONE + ) + .build()) .requiredFloatParameter(Labels.withId(RIGHT_OPERAND_VALUE)) .outputStrategy( OutputStrategies.keep()) - .requiredSingleValueSelection(Labels.withId(OPERATION), - Options.from("+", "-", "/", "*", "%")) + .requiredSingleValueSelection( + Labels.withId(OPERATION), + Options.from("+", "-", "/", "*", "%") + ) .build() ); } @Override - public void onPipelineStarted(IDataProcessorParameters params, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) { - this.leftOperand = params.extractor().mappingPropertyValue(LEFT_OPERAND); - this.rightOperandValue = params.extractor().singleValueParameter(RIGHT_OPERAND_VALUE, Double.class); - String operation = params.extractor().selectedSingleValue(OPERATION, String.class); + public void onPipelineStarted( + IDataProcessorParameters params, + SpOutputCollector spOutputCollector, + EventProcessorRuntimeContext runtimeContext + ) { + this.leftOperand = params.extractor() + .mappingPropertyValue(LEFT_OPERAND); + this.rightOperandValue = params.extractor() + .singleValueParameter(RIGHT_OPERAND_VALUE, Double.class); + String operation = params.extractor() + .selectedSingleValue(OPERATION, String.class); switch (operation) { case "+": @@ -105,7 +116,8 @@ public class StaticMathOpProcessor implements IStreamPipesDataProcessor { @Override public void onEvent(Event in, SpOutputCollector out) throws SpRuntimeException { Double leftValue = Double.parseDouble(String.valueOf(in.getFieldBySelector(leftOperand) - .getAsPrimitive().getAsDouble())); + .getAsPrimitive() + .getAsDouble())); Double result = arithmeticOperation.operate(leftValue, rightOperandValue); in.updateFieldBySelector(leftOperand, result); diff --git a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/trigonometry/TrigonometryProcessor.java b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/trigonometry/TrigonometryProcessor.java index 8d9b536cb1..2b5de9c50a 100644 --- a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/trigonometry/TrigonometryProcessor.java +++ b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/trigonometry/TrigonometryProcessor.java @@ -58,24 +58,34 @@ public class TrigonometryProcessor implements IStreamPipesDataProcessor { .withLocales(Locales.EN) .category(DataProcessorType.ALGORITHM) .requiredStream(StreamRequirementsBuilder - .create() - .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(), - Labels.withId(OPERAND), - PropertyScope.NONE) - .build()) + .create() + .requiredPropertyWithUnaryMapping( + EpRequirements.numberReq(), + Labels.withId(OPERAND), + PropertyScope.NONE + ) + .build()) .outputStrategy( OutputStrategies.append( EpProperties.numberEp(Labels.empty(), RESULT_FIELD, SO.NUMBER))) - .requiredSingleValueSelection(Labels.withId(OPERATION), - Options.from("sin", "cos", "tan")) + .requiredSingleValueSelection( + Labels.withId(OPERATION), + Options.from("sin", "cos", "tan") + ) .build() ); } @Override - public void onPipelineStarted(IDataProcessorParameters params, SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) { - this.operand = params.extractor().mappingPropertyValue(OPERAND); - String stringOperation = params.extractor().selectedSingleValue(OPERATION, String.class); + public void onPipelineStarted( + IDataProcessorParameters params, + SpOutputCollector collector, + EventProcessorRuntimeContext runtimeContext + ) { + this.operand = params.extractor() + .mappingPropertyValue(OPERAND); + String stringOperation = params.extractor() + .selectedSingleValue(OPERATION, String.class); switch (stringOperation) { case "sin": @@ -91,7 +101,9 @@ public class TrigonometryProcessor implements IStreamPipesDataProcessor { @Override public void onEvent(Event in, SpOutputCollector out) throws SpRuntimeException { - double value = in.getFieldBySelector(operand).getAsPrimitive().getAsDouble(); + double value = in.getFieldBySelector(operand) + .getAsPrimitive() + .getAsDouble(); double result; if (operation == Operation.SIN) { diff --git a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/valuechange/ValueChangeProcessor.java b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/valuechange/ValueChangeProcessor.java index a6ec429321..197af7ea7b 100644 --- a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/valuechange/ValueChangeProcessor.java +++ b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/valuechange/ValueChangeProcessor.java @@ -62,11 +62,13 @@ public class ValueChangeProcessor implements IStreamPipesDataProcessor { .requiredFloatParameter(Labels.withId(FROM_PROPERTY_VALUE_ID)) .requiredFloatParameter(Labels.withId(TO_PROPERTY_VALUE_ID)) .requiredStream(StreamRequirementsBuilder - .create() - .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(), - Labels.withId(CHANGE_VALUE_MAPPING_ID), - PropertyScope.NONE) - .build()) + .create() + .requiredPropertyWithUnaryMapping( + EpRequirements.numberReq(), + Labels.withId(CHANGE_VALUE_MAPPING_ID), + PropertyScope.NONE + ) + .build()) .outputStrategy(OutputStrategies.append( EpProperties.booleanEp(Labels.withId(IS_CHANGED_ID), IS_CHANGED, SO.BOOLEAN))) .build() @@ -74,22 +76,27 @@ public class ValueChangeProcessor implements IStreamPipesDataProcessor { } @Override - public void onPipelineStarted(IDataProcessorParameters params, SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) { + public void onPipelineStarted( + IDataProcessorParameters params, + SpOutputCollector collector, + EventProcessorRuntimeContext runtimeContext + ) { this.lastValueOfEvent = Float.MAX_VALUE; - this.userDefinedFrom = params.extractor().singleValueParameter(FROM_PROPERTY_VALUE_ID, Float.class); - this.userDefinedTo = params.extractor().singleValueParameter(TO_PROPERTY_VALUE_ID, Float.class); - this.mappingProperty = params.extractor().mappingPropertyValue(CHANGE_VALUE_MAPPING_ID); + this.userDefinedFrom = params.extractor() + .singleValueParameter(FROM_PROPERTY_VALUE_ID, Float.class); + this.userDefinedTo = params.extractor() + .singleValueParameter(TO_PROPERTY_VALUE_ID, Float.class); + this.mappingProperty = params.extractor() + .mappingPropertyValue(CHANGE_VALUE_MAPPING_ID); } @Override public void onEvent(Event event, SpOutputCollector spOutputCollector) throws SpRuntimeException { - float thisValue = event.getFieldBySelector(mappingProperty).getAsPrimitive().getAsFloat(); + float thisValue = event.getFieldBySelector(mappingProperty) + .getAsPrimitive() + .getAsFloat(); if (this.lastValueOfEvent != Float.MAX_VALUE) { - if (this.lastValueOfEvent == this.userDefinedFrom && thisValue == this.userDefinedTo) { - event.addField(IS_CHANGED, true); - } else { - event.addField(IS_CHANGED, false); - } + event.addField(IS_CHANGED, this.lastValueOfEvent == this.userDefinedFrom && thisValue == this.userDefinedTo); } else { event.addField(IS_CHANGED, false); }
