This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch 3646-update-processors-in-processors-enrichmer-jvm
in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit d704d659addc34bb1fdee126485b0004f8b3e613 Author: Philipp Zehnder <[email protected]> AuthorDate: Fri Jun 6 21:58:04 2025 +0200 refactor: Update processors to implement IStreamPipesDataProcessor and refactor method signatures --- .../jvm/processor/jseval/JSEvalProcessor.java | 55 +++++++------ .../QualityControlLimitsEnrichmentProcessor.java | 91 ++++++++++---------- .../jvm/processor/math/MathOpProcessor.java | 96 +++++++++++----------- .../math/staticmathop/StaticMathOpProcessor.java | 61 +++++++------- .../sizemeasure/SizeMeasureProcessor.java | 75 +++++++++-------- .../trigonometry/TrigonometryProcessor.java | 62 +++++++------- .../valuechange/ValueChangeProcessor.java | 58 ++++++------- 7 files changed, 258 insertions(+), 240 deletions(-) diff --git a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/jseval/JSEvalProcessor.java b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/jseval/JSEvalProcessor.java index 78ee849115..a43d831026 100644 --- a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/jseval/JSEvalProcessor.java +++ b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/jseval/JSEvalProcessor.java @@ -18,21 +18,22 @@ package org.apache.streampipes.processors.enricher.jvm.processor.jseval; import org.apache.streampipes.commons.exceptions.SpRuntimeException; +import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor; +import org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration; import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext; +import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters; import 
org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector; import org.apache.streampipes.model.DataProcessorType; import org.apache.streampipes.model.extensions.ExtensionAssetType; -import org.apache.streampipes.model.graph.DataProcessorDescription; import org.apache.streampipes.model.runtime.Event; import org.apache.streampipes.sdk.builder.ProcessingElementBuilder; import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder; +import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration; import org.apache.streampipes.sdk.helpers.CodeLanguage; import org.apache.streampipes.sdk.helpers.EpRequirements; import org.apache.streampipes.sdk.helpers.Labels; import org.apache.streampipes.sdk.helpers.Locales; import org.apache.streampipes.sdk.helpers.OutputStrategies; -import org.apache.streampipes.wrapper.params.compat.ProcessorParams; -import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor; import org.graalvm.polyglot.Context; import org.graalvm.polyglot.Value; @@ -41,7 +42,7 @@ import org.graalvm.polyglot.proxy.ProxyObject; import java.util.HashMap; import java.util.Map; -public class JSEvalProcessor extends StreamPipesDataProcessor { +public class JSEvalProcessor implements IStreamPipesDataProcessor { private static final String JS_FUNCTION = "jsFunction"; @@ -49,27 +50,33 @@ public class JSEvalProcessor extends StreamPipesDataProcessor { private Value function; @Override - public DataProcessorDescription declareModel() { - return ProcessingElementBuilder - .create("org.apache.streampipes.processors.enricher.jvm.jseval", 0) - .category(DataProcessorType.SCRIPTING) - .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON) - .withLocales(Locales.EN) - .requiredStream(StreamRequirementsBuilder - .create() - .requiredProperty(EpRequirements.anyProperty()) - .build()) - .requiredCodeblock(Labels.withId(JS_FUNCTION), CodeLanguage.Javascript) - .outputStrategy(OutputStrategies.userDefined()) - .build(); + 
public IDataProcessorConfiguration declareConfig() { + return DataProcessorConfiguration.create( + JSEvalProcessor::new, + ProcessingElementBuilder + .create("org.apache.streampipes.processors.enricher.jvm.jseval", 0) + .category(DataProcessorType.SCRIPTING) + .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON) + .withLocales(Locales.EN) + .requiredStream(StreamRequirementsBuilder + .create() + .requiredProperty(EpRequirements.anyProperty()) + .build()) + .requiredCodeblock(Labels.withId(JS_FUNCTION), CodeLanguage.Javascript) + .outputStrategy(OutputStrategies.userDefined()) + .build() + ); } @Override - public void onInvocation(ProcessorParams parameters, - SpOutputCollector spOutputCollector, - EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException { + public void onPipelineStarted( + IDataProcessorParameters params, + SpOutputCollector collector, + EventProcessorRuntimeContext runtimeContext + ) throws SpRuntimeException { polyglot = Context.create(); - String code = parameters.extractor().codeblockValue(JS_FUNCTION); + String code = params.extractor() + .codeblockValue(JS_FUNCTION); function = polyglot.eval("js", "(" + code + ")"); } @@ -94,7 +101,9 @@ public class JSEvalProcessor extends StreamPipesDataProcessor { } @Override - public void onDetach() throws SpRuntimeException { - + public void onPipelineStopped() { + if (polyglot != null) { + polyglot.close(); + } } } diff --git a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/limitsenrichment/QualityControlLimitsEnrichmentProcessor.java b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/limitsenrichment/QualityControlLimitsEnrichmentProcessor.java index f1601c59a7..99103aa499 100644 --- 
a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/limitsenrichment/QualityControlLimitsEnrichmentProcessor.java +++ b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/limitsenrichment/QualityControlLimitsEnrichmentProcessor.java @@ -18,26 +18,25 @@ package org.apache.streampipes.processors.enricher.jvm.processor.limitsenrichment; -import org.apache.streampipes.commons.exceptions.SpRuntimeException; +import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor; +import org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration; import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext; +import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters; import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector; import org.apache.streampipes.model.DataProcessorType; import org.apache.streampipes.model.extensions.ExtensionAssetType; -import org.apache.streampipes.model.graph.DataProcessorDescription; import org.apache.streampipes.model.runtime.Event; import org.apache.streampipes.sdk.builder.ProcessingElementBuilder; import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder; +import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration; import org.apache.streampipes.sdk.helpers.EpProperties; import org.apache.streampipes.sdk.helpers.EpRequirements; import org.apache.streampipes.sdk.helpers.Labels; import org.apache.streampipes.sdk.helpers.Locales; import org.apache.streampipes.sdk.helpers.OutputStrategies; import org.apache.streampipes.vocabulary.SO; -import org.apache.streampipes.wrapper.params.compat.ProcessorParams; -import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor; - -public class QualityControlLimitsEnrichmentProcessor extends StreamPipesDataProcessor { +public class 
QualityControlLimitsEnrichmentProcessor implements IStreamPipesDataProcessor { protected static final String UPPER_CONTROL_LIMIT_LABEL = "upperControlLimitInput"; protected static final String UPPER_WARNING_LIMIT_LABEL = "upperWarningLimitInput"; @@ -55,49 +54,47 @@ public class QualityControlLimitsEnrichmentProcessor extends StreamPipesDataProc private double lowerControlLimitValue; @Override - public DataProcessorDescription declareModel() { - return ProcessingElementBuilder - .create("org.apache.streampipes.processors.enricher.jvm.processor.limitsenrichment", 0) - .category(DataProcessorType.ENRICH) - .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON) - .withLocales(Locales.EN) - .requiredStream(StreamRequirementsBuilder - .create() - .requiredProperty(EpRequirements.anyProperty()) - .build()) - .requiredFloatParameter(Labels.withId(UPPER_CONTROL_LIMIT_LABEL)) - .requiredFloatParameter(Labels.withId(UPPER_WARNING_LIMIT_LABEL)) - .requiredFloatParameter(Labels.withId(LOWER_WARNING_LIMIT_LABEL)) - .requiredFloatParameter(Labels.withId(LOWER_CONTROL_LIMIT_LABEL)) - .outputStrategy( - OutputStrategies.append( - EpProperties.doubleEp(Labels.empty(), UPPER_CONTROL_LIMIT, SO.NUMBER), - EpProperties.doubleEp(Labels.empty(), UPPER_WARNING_LIMIT, SO.NUMBER), - EpProperties.doubleEp(Labels.empty(), LOWER_WARNING_LIMIT, SO.NUMBER), - EpProperties.doubleEp(Labels.empty(), LOWER_CONTROL_LIMIT, SO.NUMBER) - )) - .build(); + public IDataProcessorConfiguration declareConfig() { + return DataProcessorConfiguration.create( + QualityControlLimitsEnrichmentProcessor::new, + ProcessingElementBuilder + .create("org.apache.streampipes.processors.enricher.jvm.processor.limitsenrichment", 0) + .category(DataProcessorType.ENRICH) + .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON) + .withLocales(Locales.EN) + .requiredStream(StreamRequirementsBuilder + .create() + .requiredProperty(EpRequirements.anyProperty()) + .build()) + 
.requiredFloatParameter(Labels.withId(UPPER_CONTROL_LIMIT_LABEL)) + .requiredFloatParameter(Labels.withId(UPPER_WARNING_LIMIT_LABEL)) + .requiredFloatParameter(Labels.withId(LOWER_WARNING_LIMIT_LABEL)) + .requiredFloatParameter(Labels.withId(LOWER_CONTROL_LIMIT_LABEL)) + .outputStrategy( + OutputStrategies.append( + EpProperties.doubleEp(Labels.empty(), UPPER_CONTROL_LIMIT, SO.NUMBER), + EpProperties.doubleEp(Labels.empty(), UPPER_WARNING_LIMIT, SO.NUMBER), + EpProperties.doubleEp(Labels.empty(), LOWER_WARNING_LIMIT, SO.NUMBER), + EpProperties.doubleEp(Labels.empty(), LOWER_CONTROL_LIMIT, SO.NUMBER) + )) + .build() + ); } @Override - public void onInvocation( - ProcessorParams parameters, - SpOutputCollector spOutputCollector, + public void onPipelineStarted( + IDataProcessorParameters params, + SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext - ) throws SpRuntimeException { - this.upperControlLimitValue = parameters.extractor() - .singleValueParameter(UPPER_CONTROL_LIMIT_LABEL, Double.class); - this.upperWarningLimitValue = parameters.extractor() - .singleValueParameter(UPPER_WARNING_LIMIT_LABEL, Double.class); - this.lowerWarningLimitValue = parameters.extractor() - .singleValueParameter(LOWER_WARNING_LIMIT_LABEL, Double.class); - this.lowerControlLimitValue = parameters.extractor() - .singleValueParameter(LOWER_CONTROL_LIMIT_LABEL, Double.class); - } - - @Override - public void onDetach() { - + ) { + this.upperControlLimitValue = params.extractor() + .singleValueParameter(UPPER_CONTROL_LIMIT_LABEL, Double.class); + this.upperWarningLimitValue = params.extractor() + .singleValueParameter(UPPER_WARNING_LIMIT_LABEL, Double.class); + this.lowerWarningLimitValue = params.extractor() + .singleValueParameter(LOWER_WARNING_LIMIT_LABEL, Double.class); + this.lowerControlLimitValue = params.extractor() + .singleValueParameter(LOWER_CONTROL_LIMIT_LABEL, Double.class); } @Override @@ -109,4 +106,8 @@ public class 
QualityControlLimitsEnrichmentProcessor extends StreamPipesDataProc collector.collect(event); } + + @Override + public void onPipelineStopped() { + } } diff --git a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/math/MathOpProcessor.java b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/math/MathOpProcessor.java index 0571496f6b..c81b503711 100644 --- a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/math/MathOpProcessor.java +++ b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/math/MathOpProcessor.java @@ -20,11 +20,13 @@ package org.apache.streampipes.processors.enricher.jvm.processor.math; import org.apache.streampipes.commons.exceptions.SpRuntimeException; +import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor; +import org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration; import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext; +import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters; import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector; import org.apache.streampipes.model.DataProcessorType; import org.apache.streampipes.model.extensions.ExtensionAssetType; -import org.apache.streampipes.model.graph.DataProcessorDescription; import org.apache.streampipes.model.runtime.Event; import org.apache.streampipes.model.schema.PropertyScope; import org.apache.streampipes.processors.enricher.jvm.processor.math.operation.Operation; @@ -35,6 +37,7 @@ import org.apache.streampipes.processors.enricher.jvm.processor.math.operation.O import org.apache.streampipes.processors.enricher.jvm.processor.math.operation.OperationSubtracting; import 
org.apache.streampipes.sdk.builder.ProcessingElementBuilder; import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder; +import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration; import org.apache.streampipes.sdk.helpers.EpProperties; import org.apache.streampipes.sdk.helpers.EpRequirements; import org.apache.streampipes.sdk.helpers.Labels; @@ -42,10 +45,8 @@ import org.apache.streampipes.sdk.helpers.Locales; import org.apache.streampipes.sdk.helpers.Options; import org.apache.streampipes.sdk.helpers.OutputStrategies; import org.apache.streampipes.vocabulary.SO; -import org.apache.streampipes.wrapper.params.compat.ProcessorParams; -import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor; -public class MathOpProcessor extends StreamPipesDataProcessor { +public class MathOpProcessor implements IStreamPipesDataProcessor { protected static final String RESULT_FIELD = "calculationResult"; protected static final String LEFT_OPERAND = "leftOperand"; @@ -57,67 +58,70 @@ public class MathOpProcessor extends StreamPipesDataProcessor { String rightOperand; @Override - public DataProcessorDescription declareModel() { - return ProcessingElementBuilder - .create("org.apache.streampipes.processors.enricher.jvm.processor.math.mathop", 0) - .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON) - .withLocales(Locales.EN) - .category(DataProcessorType.ALGORITHM) - .requiredStream(StreamRequirementsBuilder - .create() - .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(), - Labels.withId(LEFT_OPERAND), - PropertyScope.NONE) - .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(), - Labels.withId(RIGHT_OPERAND), - PropertyScope.NONE) - .build()) - .outputStrategy( - OutputStrategies.append( - EpProperties.numberEp(Labels.empty(), RESULT_FIELD, SO.NUMBER))) - .requiredSingleValueSelection(Labels.withId(OPERATION), Options.from("+", "-", "/", - "*", "%")) - .build(); + public IDataProcessorConfiguration 
declareConfig() { + return DataProcessorConfiguration.create( + MathOpProcessor::new, + ProcessingElementBuilder + .create("org.apache.streampipes.processors.enricher.jvm.processor.math.mathop", 0) + .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON) + .withLocales(Locales.EN) + .category(DataProcessorType.ALGORITHM) + .requiredStream(StreamRequirementsBuilder + .create() + .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(), + Labels.withId(LEFT_OPERAND), + PropertyScope.NONE) + .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(), + Labels.withId(RIGHT_OPERAND), + PropertyScope.NONE) + .build()) + .outputStrategy( + OutputStrategies.append( + EpProperties.numberEp(Labels.empty(), RESULT_FIELD, SO.NUMBER))) + .requiredSingleValueSelection(Labels.withId(OPERATION), Options.from("+", "-", "/", + "*", "%")) + .build() + ); } @Override - public void onInvocation(ProcessorParams parameters, SpOutputCollector spOutputCollector, - EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException { - this.leftOperand = parameters.extractor().mappingPropertyValue(LEFT_OPERAND); - this.rightOperand = parameters.extractor().mappingPropertyValue(RIGHT_OPERAND); - String operation = parameters.extractor().selectedSingleValue(OPERATION, String.class); + public void onPipelineStarted(IDataProcessorParameters params, SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) { + this.leftOperand = params.extractor().mappingPropertyValue(LEFT_OPERAND); + this.rightOperand = params.extractor().mappingPropertyValue(RIGHT_OPERAND); + String operation = params.extractor().selectedSingleValue(OPERATION, String.class); switch (operation) { case "+": - arithmeticOperation = new OperationAddition(); + this.arithmeticOperation = new OperationAddition(); break; case "-": - arithmeticOperation = new OperationSubtracting(); - break; - case "*": - arithmeticOperation = new OperationMultiply(); + this.arithmeticOperation = new 
OperationSubtracting(); break; case "/": - arithmeticOperation = new OperationDivide(); + this.arithmeticOperation = new OperationDivide(); + break; + case "*": + this.arithmeticOperation = new OperationMultiply(); break; case "%": - arithmeticOperation = new OperationModulo(); + this.arithmeticOperation = new OperationModulo(); + break; + default: + throw new IllegalArgumentException("Unsupported operation: " + operation); } } @Override - public void onEvent(Event in, SpOutputCollector out) throws SpRuntimeException { - Double leftValue = in.getFieldBySelector(leftOperand).getAsPrimitive().getAsDouble(); - Double rightValue = in.getFieldBySelector(rightOperand).getAsPrimitive().getAsDouble(); - - Double result = arithmeticOperation.operate(leftValue, rightValue); - in.addField(RESULT_FIELD, result); + public void onEvent(Event event, SpOutputCollector spOutputCollector) throws SpRuntimeException { + Double leftValue = event.getFieldBySelector(this.leftOperand).getAsPrimitive().getAsDouble(); + Double rightValue = event.getFieldBySelector(this.rightOperand).getAsPrimitive().getAsDouble(); + Double result = this.arithmeticOperation.operate(leftValue, rightValue); - out.collect(in); + event.addField(RESULT_FIELD, result); + spOutputCollector.collect(event); } @Override - public void onDetach() throws SpRuntimeException { - + public void onPipelineStopped() { } } diff --git a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/math/staticmathop/StaticMathOpProcessor.java b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/math/staticmathop/StaticMathOpProcessor.java index bdafa65682..085a7b3b8e 100644 --- a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/math/staticmathop/StaticMathOpProcessor.java +++ 
b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/math/staticmathop/StaticMathOpProcessor.java @@ -19,11 +19,13 @@ package org.apache.streampipes.processors.enricher.jvm.processor.math.staticmathop; import org.apache.streampipes.commons.exceptions.SpRuntimeException; +import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor; +import org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration; import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext; +import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters; import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector; import org.apache.streampipes.model.DataProcessorType; import org.apache.streampipes.model.extensions.ExtensionAssetType; -import org.apache.streampipes.model.graph.DataProcessorDescription; import org.apache.streampipes.model.runtime.Event; import org.apache.streampipes.model.schema.PropertyScope; import org.apache.streampipes.processors.enricher.jvm.processor.math.operation.Operation; @@ -34,15 +36,14 @@ import org.apache.streampipes.processors.enricher.jvm.processor.math.operation.O import org.apache.streampipes.processors.enricher.jvm.processor.math.operation.OperationSubtracting; import org.apache.streampipes.sdk.builder.ProcessingElementBuilder; import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder; +import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration; import org.apache.streampipes.sdk.helpers.EpRequirements; import org.apache.streampipes.sdk.helpers.Labels; import org.apache.streampipes.sdk.helpers.Locales; import org.apache.streampipes.sdk.helpers.Options; import org.apache.streampipes.sdk.helpers.OutputStrategies; -import org.apache.streampipes.wrapper.params.compat.ProcessorParams; -import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor; -public class 
StaticMathOpProcessor extends StreamPipesDataProcessor { +public class StaticMathOpProcessor implements IStreamPipesDataProcessor { private static final String RESULT_FIELD = "calculationResultStatic"; private static final String LEFT_OPERAND = "leftOperand"; @@ -53,34 +54,35 @@ public class StaticMathOpProcessor extends StreamPipesDataProcessor { String leftOperand; double rightOperandValue; - @Override - public DataProcessorDescription declareModel() { - return ProcessingElementBuilder - .create("org.apache.streampipes.processors.enricher.jvm.processor.math.staticmathop", 0) - .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON) - .withLocales(Locales.EN) - .category(DataProcessorType.ALGORITHM) - .requiredStream(StreamRequirementsBuilder - .create() - .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(), - Labels.withId(LEFT_OPERAND), - PropertyScope.NONE) - .build()) - .requiredFloatParameter(Labels.withId(RIGHT_OPERAND_VALUE)) - .outputStrategy( - OutputStrategies.keep()) - .requiredSingleValueSelection(Labels.withId(OPERATION), - Options.from("+", "-", "/", "*", "%")) - .build(); + public IDataProcessorConfiguration declareConfig() { + return DataProcessorConfiguration.create( + StaticMathOpProcessor::new, + ProcessingElementBuilder + .create("org.apache.streampipes.processors.enricher.jvm.processor.math.staticmathop", 0) + .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON) + .withLocales(Locales.EN) + .category(DataProcessorType.ALGORITHM) + .requiredStream(StreamRequirementsBuilder + .create() + .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(), + Labels.withId(LEFT_OPERAND), + PropertyScope.NONE) + .build()) + .requiredFloatParameter(Labels.withId(RIGHT_OPERAND_VALUE)) + .outputStrategy( + OutputStrategies.keep()) + .requiredSingleValueSelection(Labels.withId(OPERATION), + Options.from("+", "-", "/", "*", "%")) + .build() + ); } @Override - public void onInvocation(ProcessorParams parameters, 
SpOutputCollector spOutputCollector, - EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException { - this.leftOperand = parameters.extractor().mappingPropertyValue(LEFT_OPERAND); - this.rightOperandValue = parameters.extractor().singleValueParameter(RIGHT_OPERAND_VALUE, Double.class); - String operation = parameters.extractor().selectedSingleValue(OPERATION, String.class); + public void onPipelineStarted(IDataProcessorParameters params, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) { + this.leftOperand = params.extractor().mappingPropertyValue(LEFT_OPERAND); + this.rightOperandValue = params.extractor().singleValueParameter(RIGHT_OPERAND_VALUE, Double.class); + String operation = params.extractor().selectedSingleValue(OPERATION, String.class); switch (operation) { case "+": @@ -112,7 +114,6 @@ public class StaticMathOpProcessor extends StreamPipesDataProcessor { } @Override - public void onDetach() throws SpRuntimeException { - + public void onPipelineStopped() { } } diff --git a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/sizemeasure/SizeMeasureProcessor.java b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/sizemeasure/SizeMeasureProcessor.java index 47a955052b..2438d47dcf 100644 --- a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/sizemeasure/SizeMeasureProcessor.java +++ b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/sizemeasure/SizeMeasureProcessor.java @@ -19,14 +19,17 @@ package org.apache.streampipes.processors.enricher.jvm.processor.sizemeasure; import org.apache.streampipes.commons.exceptions.SpRuntimeException; +import 
org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor; +import org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration; import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext; +import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters; import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector; import org.apache.streampipes.model.DataProcessorType; import org.apache.streampipes.model.extensions.ExtensionAssetType; -import org.apache.streampipes.model.graph.DataProcessorDescription; import org.apache.streampipes.model.runtime.Event; import org.apache.streampipes.sdk.builder.ProcessingElementBuilder; import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder; +import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration; import org.apache.streampipes.sdk.helpers.EpProperties; import org.apache.streampipes.sdk.helpers.EpRequirements; import org.apache.streampipes.sdk.helpers.Labels; @@ -34,14 +37,12 @@ import org.apache.streampipes.sdk.helpers.Locales; import org.apache.streampipes.sdk.helpers.Options; import org.apache.streampipes.sdk.helpers.OutputStrategies; import org.apache.streampipes.sdk.helpers.Tuple2; -import org.apache.streampipes.wrapper.params.compat.ProcessorParams; -import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectOutputStream; -public class SizeMeasureProcessor extends StreamPipesDataProcessor { +public class SizeMeasureProcessor implements IStreamPipesDataProcessor { static final String SIZE_UNIT = "sizeUnit"; static final String BYTE_SIZE = "BYTE"; @@ -57,40 +58,43 @@ public class SizeMeasureProcessor extends StreamPipesDataProcessor { private String sizeUnit; @Override - public DataProcessorDescription declareModel() { - return ProcessingElementBuilder - 
.create("org.apache.streampipes.processors.enricher.jvm.sizemeasure", 0) - .category(DataProcessorType.STRUCTURE_ANALYTICS) - .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON) - .withLocales(Locales.EN) - .requiredStream(StreamRequirementsBuilder - .create() - .requiredProperty(EpRequirements.anyProperty()) - .build()) - .requiredSingleValueSelection( - Labels.withId(SIZE_UNIT), - Options.from( - new Tuple2<>(BYTES_OPTION, BYTE_SIZE), - new Tuple2<>(KILO_BYTES_OPTION, KILOBYTE_SIZE), - new Tuple2<>(MEGA_BYTES_OPTION, MEGABYTE_SIZE) + public IDataProcessorConfiguration declareConfig() { + return DataProcessorConfiguration.create( + SizeMeasureProcessor::new, + ProcessingElementBuilder + .create("org.apache.streampipes.processors.enricher.jvm.sizemeasure", 0) + .category(DataProcessorType.STRUCTURE_ANALYTICS) + .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON) + .withLocales(Locales.EN) + .requiredStream(StreamRequirementsBuilder + .create() + .requiredProperty(EpRequirements.anyProperty()) + .build()) + .requiredSingleValueSelection( + Labels.withId(SIZE_UNIT), + Options.from( + new Tuple2<>(BYTES_OPTION, BYTE_SIZE), + new Tuple2<>(KILO_BYTES_OPTION, KILOBYTE_SIZE), + new Tuple2<>(MEGA_BYTES_OPTION, MEGABYTE_SIZE) + ) ) - ) - .outputStrategy(OutputStrategies.append(EpProperties.doubleEp( - Labels.withId(EVENT_SIZE), - EVENT_SIZE, - "http://schema.org/contentSize" - ))) - .build(); + .outputStrategy(OutputStrategies.append(EpProperties.doubleEp( + Labels.withId(EVENT_SIZE), + EVENT_SIZE, + "http://schema.org/contentSize" + ))) + .build() + ); } @Override - public void onInvocation( - ProcessorParams parameters, - SpOutputCollector spOutputCollector, + public void onPipelineStarted( + IDataProcessorParameters params, + SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext - ) throws SpRuntimeException { - this.sizeUnit = parameters.extractor() - .selectedSingleValueInternalName(SIZE_UNIT, String.class); + ) 
{ + this.sizeUnit = params.extractor() + .selectedSingleValueInternalName(SIZE_UNIT, String.class); } @Override @@ -105,13 +109,12 @@ public class SizeMeasureProcessor extends StreamPipesDataProcessor { event.addField(EVENT_SIZE, size); collector.collect(event); } catch (IOException e) { - e.printStackTrace(); + throw new SpRuntimeException("Error calculating event size", e); } } @Override - public void onDetach() throws SpRuntimeException { - + public void onPipelineStopped() { } private int getSizeInBytes(Object map) throws IOException { diff --git a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/trigonometry/TrigonometryProcessor.java b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/trigonometry/TrigonometryProcessor.java index eb96233d70..8d9b536cb1 100644 --- a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/trigonometry/TrigonometryProcessor.java +++ b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/trigonometry/TrigonometryProcessor.java @@ -19,15 +19,18 @@ package org.apache.streampipes.processors.enricher.jvm.processor.trigonometry; import org.apache.streampipes.commons.exceptions.SpRuntimeException; +import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor; +import org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration; import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext; +import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters; import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector; import org.apache.streampipes.model.DataProcessorType; import org.apache.streampipes.model.extensions.ExtensionAssetType; -import 
org.apache.streampipes.model.graph.DataProcessorDescription; import org.apache.streampipes.model.runtime.Event; import org.apache.streampipes.model.schema.PropertyScope; import org.apache.streampipes.sdk.builder.ProcessingElementBuilder; import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder; +import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration; import org.apache.streampipes.sdk.helpers.EpProperties; import org.apache.streampipes.sdk.helpers.EpRequirements; import org.apache.streampipes.sdk.helpers.Labels; @@ -35,10 +38,8 @@ import org.apache.streampipes.sdk.helpers.Locales; import org.apache.streampipes.sdk.helpers.Options; import org.apache.streampipes.sdk.helpers.OutputStrategies; import org.apache.streampipes.vocabulary.SO; -import org.apache.streampipes.wrapper.params.compat.ProcessorParams; -import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor; -public class TrigonometryProcessor extends StreamPipesDataProcessor { +public class TrigonometryProcessor implements IStreamPipesDataProcessor { private static final String OPERAND = "operand"; private static final String OPERATION = "operation"; @@ -47,35 +48,34 @@ public class TrigonometryProcessor extends StreamPipesDataProcessor { private Operation operation; private String operand; - @Override - public DataProcessorDescription declareModel() { - return ProcessingElementBuilder - .create("org.apache.streampipes.processors.enricher.jvm.processor.trigonometry", 0) - .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON) - .withLocales(Locales.EN) - .category(DataProcessorType.ALGORITHM) - .requiredStream(StreamRequirementsBuilder - .create() - .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(), - Labels.withId(OPERAND), - PropertyScope.NONE) - .build()) - .outputStrategy( - OutputStrategies.append( - EpProperties.numberEp(Labels.empty(), RESULT_FIELD, SO.NUMBER))) - .requiredSingleValueSelection(Labels.withId(OPERATION), - 
Options.from("sin", "cos", "tan")) - .build(); + public IDataProcessorConfiguration declareConfig() { + return DataProcessorConfiguration.create( + TrigonometryProcessor::new, + ProcessingElementBuilder + .create("org.apache.streampipes.processors.enricher.jvm.processor.trigonometry", 0) + .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON) + .withLocales(Locales.EN) + .category(DataProcessorType.ALGORITHM) + .requiredStream(StreamRequirementsBuilder + .create() + .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(), + Labels.withId(OPERAND), + PropertyScope.NONE) + .build()) + .outputStrategy( + OutputStrategies.append( + EpProperties.numberEp(Labels.empty(), RESULT_FIELD, SO.NUMBER))) + .requiredSingleValueSelection(Labels.withId(OPERATION), + Options.from("sin", "cos", "tan")) + .build() + ); } @Override - public void onInvocation(ProcessorParams parameters, - SpOutputCollector spOutputCollector, - EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException { - - this.operand = parameters.extractor().mappingPropertyValue(OPERAND); - String stringOperation = parameters.extractor().selectedSingleValue(OPERATION, String.class); + public void onPipelineStarted(IDataProcessorParameters params, SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) { + this.operand = params.extractor().mappingPropertyValue(OPERAND); + String stringOperation = params.extractor().selectedSingleValue(OPERATION, String.class); switch (stringOperation) { case "sin": @@ -86,7 +86,6 @@ public class TrigonometryProcessor extends StreamPipesDataProcessor { break; case "tan": operation = Operation.TAN; - } } @@ -108,7 +107,6 @@ public class TrigonometryProcessor extends StreamPipesDataProcessor { } @Override - public void onDetach() throws SpRuntimeException { - + public void onPipelineStopped() { } } diff --git 
a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/valuechange/ValueChangeProcessor.java b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/valuechange/ValueChangeProcessor.java index fac9005d72..a6ec429321 100644 --- a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/valuechange/ValueChangeProcessor.java +++ b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/valuechange/ValueChangeProcessor.java @@ -19,25 +19,26 @@ package org.apache.streampipes.processors.enricher.jvm.processor.valuechange; import org.apache.streampipes.commons.exceptions.SpRuntimeException; +import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor; +import org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration; import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext; +import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters; import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector; import org.apache.streampipes.model.DataProcessorType; import org.apache.streampipes.model.extensions.ExtensionAssetType; -import org.apache.streampipes.model.graph.DataProcessorDescription; import org.apache.streampipes.model.runtime.Event; import org.apache.streampipes.model.schema.PropertyScope; import org.apache.streampipes.sdk.builder.ProcessingElementBuilder; import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder; +import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration; import org.apache.streampipes.sdk.helpers.EpProperties; import org.apache.streampipes.sdk.helpers.EpRequirements; import org.apache.streampipes.sdk.helpers.Labels; import 
org.apache.streampipes.sdk.helpers.Locales; import org.apache.streampipes.sdk.helpers.OutputStrategies; import org.apache.streampipes.vocabulary.SO; -import org.apache.streampipes.wrapper.params.compat.ProcessorParams; -import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor; -public class ValueChangeProcessor extends StreamPipesDataProcessor { +public class ValueChangeProcessor implements IStreamPipesDataProcessor { private static final String CHANGE_VALUE_MAPPING_ID = "change-value-mapping"; private static final String FROM_PROPERTY_VALUE_ID = "from-property-value"; private static final String TO_PROPERTY_VALUE_ID = "to-property-value"; @@ -50,32 +51,34 @@ public class ValueChangeProcessor extends StreamPipesDataProcessor { private float lastValueOfEvent; @Override - public DataProcessorDescription declareModel() { - return ProcessingElementBuilder - .create("org.apache.streampipes.processors.enricher.jvm.valuechange", 0) - .category(DataProcessorType.VALUE_OBSERVER) - .withAssets(ExtensionAssetType.DOCUMENTATION) - .withLocales(Locales.EN) - .requiredFloatParameter(Labels.withId(FROM_PROPERTY_VALUE_ID)) - .requiredFloatParameter(Labels.withId(TO_PROPERTY_VALUE_ID)) - .requiredStream(StreamRequirementsBuilder - .create() - .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(), - Labels.withId(CHANGE_VALUE_MAPPING_ID), - PropertyScope.NONE) - .build()) - .outputStrategy(OutputStrategies.append( - EpProperties.booleanEp(Labels.withId(IS_CHANGED_ID), IS_CHANGED, SO.BOOLEAN))) - .build(); + public IDataProcessorConfiguration declareConfig() { + return DataProcessorConfiguration.create( + ValueChangeProcessor::new, + ProcessingElementBuilder + .create("org.apache.streampipes.processors.enricher.jvm.valuechange", 0) + .category(DataProcessorType.VALUE_OBSERVER) + .withAssets(ExtensionAssetType.DOCUMENTATION) + .withLocales(Locales.EN) + .requiredFloatParameter(Labels.withId(FROM_PROPERTY_VALUE_ID)) + 
.requiredFloatParameter(Labels.withId(TO_PROPERTY_VALUE_ID)) + .requiredStream(StreamRequirementsBuilder + .create() + .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(), + Labels.withId(CHANGE_VALUE_MAPPING_ID), + PropertyScope.NONE) + .build()) + .outputStrategy(OutputStrategies.append( + EpProperties.booleanEp(Labels.withId(IS_CHANGED_ID), IS_CHANGED, SO.BOOLEAN))) + .build() + ); } @Override - public void onInvocation(ProcessorParams processorParams, SpOutputCollector spOutputCollector, - EventProcessorRuntimeContext eventProcessorRuntimeContext) throws SpRuntimeException { + public void onPipelineStarted(IDataProcessorParameters params, SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) { this.lastValueOfEvent = Float.MAX_VALUE; - this.userDefinedFrom = processorParams.extractor().singleValueParameter(FROM_PROPERTY_VALUE_ID, Float.class); - this.userDefinedTo = processorParams.extractor().singleValueParameter(TO_PROPERTY_VALUE_ID, Float.class); - this.mappingProperty = processorParams.extractor().mappingPropertyValue(CHANGE_VALUE_MAPPING_ID); + this.userDefinedFrom = params.extractor().singleValueParameter(FROM_PROPERTY_VALUE_ID, Float.class); + this.userDefinedTo = params.extractor().singleValueParameter(TO_PROPERTY_VALUE_ID, Float.class); + this.mappingProperty = params.extractor().mappingPropertyValue(CHANGE_VALUE_MAPPING_ID); } @Override @@ -96,7 +99,6 @@ public class ValueChangeProcessor extends StreamPipesDataProcessor { } @Override - public void onDetach() throws SpRuntimeException { - + public void onPipelineStopped() { } }
