This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch
3653-update-processors-in-processors-image-processing-jvm
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to
refs/heads/3653-update-processors-in-processors-image-processing-jvm by this
push:
new c550ddc1bd refactor(#3653): Update image processing processors to
implement IStreamPipesDataProcessor interface
c550ddc1bd is described below
commit c550ddc1bdfb799a416e278cb09e99344cab564c
Author: Philipp Zehnder <[email protected]>
AuthorDate: Wed Jun 11 09:23:13 2025 +0200
refactor(#3653): Update image processing processors to implement
IStreamPipesDataProcessor interface
---
.../GenericImageClassificationProcessor.java | 62 +++++++++++-----------
.../imagecropper/ImageCropperProcessor.java | 45 ++++++++--------
.../imageenrichment/ImageEnrichmentProcessor.java | 45 ++++++++--------
.../processor/qrreader/QrCodeReaderProcessor.java | 53 +++++++++---------
4 files changed, 103 insertions(+), 102 deletions(-)
diff --git
a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/genericclassification/GenericImageClassificationProcessor.java
b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/genericclassification/GenericImageClassificationProcessor.java
index a3262ff440..6f8c55108d 100644
---
a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/genericclassification/GenericImageClassificationProcessor.java
+++
b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/genericclassification/GenericImageClassificationProcessor.java
@@ -18,25 +18,25 @@
package
org.apache.streampipes.processors.imageprocessing.jvm.processor.genericclassification;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import
org.apache.streampipes.processors.imageprocessing.jvm.processor.commons.ImagePropertyConstants;
import
org.apache.streampipes.processors.imageprocessing.jvm.processor.commons.PlainImageTransformer;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
import boofcv.abst.scene.ImageClassifier;
import boofcv.factory.scene.ClassifierAndSource;
@@ -52,45 +52,44 @@ import java.io.IOException;
import java.util.List;
import java.util.Optional;
-public class GenericImageClassificationProcessor extends
StreamPipesDataProcessor {
+public class GenericImageClassificationProcessor implements
IStreamPipesDataProcessor {
private String imagePropertyName;
- private ClassifierAndSource cs;
-
private ImageClassifier<Planar<GrayF32>> classifier;
private List<String> categories;
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder
-
.create("org.apache.streampipes.processor.imageclassification.jvm.generic-image-classification",
0)
- .category(DataProcessorType.IMAGE_PROCESSING)
- .withAssets(ExtensionAssetType.DOCUMENTATION)
- .withLocales(Locales.EN)
- .requiredStream(StreamRequirementsBuilder
- .create()
- .requiredPropertyWithUnaryMapping(EpRequirements
- .semanticTypeReq("https://image.com"), Labels.withId(
- ImagePropertyConstants.IMAGE_MAPPING.getProperty()),
PropertyScope.NONE)
- .build())
- .outputStrategy(OutputStrategies.append(
- EpProperties.doubleEp(Labels.empty(), "score",
"https://schema.org/score"),
- EpProperties.stringEp(Labels.empty(), "category",
"https://schema.org/category")
-
- ))
- .build();
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ GenericImageClassificationProcessor::new,
+ ProcessingElementBuilder
+
.create("org.apache.streampipes.processor.imageclassification.jvm.generic-image-classification",
0)
+ .category(DataProcessorType.IMAGE_PROCESSING)
+ .withAssets(ExtensionAssetType.DOCUMENTATION)
+ .withLocales(Locales.EN)
+ .requiredStream(StreamRequirementsBuilder
+ .create()
+ .requiredPropertyWithUnaryMapping(EpRequirements
+ .semanticTypeReq("https://image.com"), Labels.withId(
+ ImagePropertyConstants.IMAGE_MAPPING.getProperty()),
PropertyScope.NONE)
+ .build())
+ .outputStrategy(OutputStrategies.append(
+ EpProperties.doubleEp(Labels.empty(), "score",
"https://schema.org/score"),
+ EpProperties.stringEp(Labels.empty(), "category",
"https://schema.org/category")
+
+ ))
+ .build()
+ );
}
@Override
- public void onInvocation(ProcessorParams parameters, SpOutputCollector
spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
- ProcessingElementParameterExtractor extractor = parameters.extractor();
+ public void onPipelineStarted(IDataProcessorParameters params,
SpOutputCollector spOutputCollector, EventProcessorRuntimeContext
runtimeContext) {
+ var extractor = params.extractor();
imagePropertyName =
extractor.mappingPropertyValue(ImagePropertyConstants.IMAGE_MAPPING.getProperty());
- // this.cs = FactoryImageClassifier.vgg_cifar10(); // Test set 89.9% for
10 categories
- ClassifierAndSource cs = FactoryImageClassifier.nin_imagenet(); // Test
set 62.6% for 1000 categories
+ ClassifierAndSource cs = FactoryImageClassifier.nin_imagenet();
File path = DeepBoofDataBaseOps.downloadModel(cs.getSource(), new
File("download_data"));
@@ -128,7 +127,6 @@ public class GenericImageClassificationProcessor extends
StreamPipesDataProcesso
}
@Override
- public void onDetach() throws SpRuntimeException {
-
+ public void onPipelineStopped() {
}
}
diff --git
a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imagecropper/ImageCropperProcessor.java
b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imagecropper/ImageCropperProcessor.java
index c6cc15c69e..84cd927521 100644
---
a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imagecropper/ImageCropperProcessor.java
+++
b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imagecropper/ImageCropperProcessor.java
@@ -19,24 +19,24 @@ package
org.apache.streampipes.processors.imageprocessing.jvm.processor.imagecro
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import
org.apache.streampipes.processors.imageprocessing.jvm.processor.commons.ImagePropertyConstants;
import
org.apache.streampipes.processors.imageprocessing.jvm.processor.commons.ImageTransformer;
import
org.apache.streampipes.processors.imageprocessing.jvm.processor.commons.RequiredBoxStream;
import
org.apache.streampipes.processors.imageprocessing.jvm.processor.imageenrichment.BoxCoordinates;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
-import
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
import java.awt.image.BufferedImage;
import java.util.Base64;
@@ -44,30 +44,32 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
-public class ImageCropperProcessor extends StreamPipesDataProcessor {
+public class ImageCropperProcessor implements IStreamPipesDataProcessor {
private String imageProperty;
private String boxArray;
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder
-
.create("org.apache.streampipes.processor.imageclassification.jvm.image-cropper",
0)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .withLocales(Locales.EN)
- .category(DataProcessorType.IMAGE_PROCESSING)
- .requiredStream(RequiredBoxStream.getBoxStream())
-
.outputStrategy(OutputStrategies.append(EpProperties.integerEp(Labels.empty(),
- ImagePropertyConstants.CLASS_NAME.getProperty(),
"https://streampipes.org/classname"),
- EpProperties.doubleEp(Labels.empty(),
ImagePropertyConstants.SCORE.getProperty(),
- "https://streampipes.org/Label")))
- .build();
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ ImageCropperProcessor::new,
+ ProcessingElementBuilder
+
.create("org.apache.streampipes.processor.imageclassification.jvm.image-cropper",
0)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .withLocales(Locales.EN)
+ .category(DataProcessorType.IMAGE_PROCESSING)
+ .requiredStream(RequiredBoxStream.getBoxStream())
+
.outputStrategy(OutputStrategies.append(EpProperties.integerEp(Labels.empty(),
+ ImagePropertyConstants.CLASS_NAME.getProperty(),
"https://streampipes.org/classname"),
+ EpProperties.doubleEp(Labels.empty(),
ImagePropertyConstants.SCORE.getProperty(),
+ "https://streampipes.org/Label")))
+ .build()
+ );
}
@Override
- public void onInvocation(ProcessorParams parameters, SpOutputCollector
spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) {
- ProcessingElementParameterExtractor extractor = parameters.extractor();
+ public void onPipelineStarted(IDataProcessorParameters params,
SpOutputCollector spOutputCollector, EventProcessorRuntimeContext
runtimeContext) {
+ var extractor = params.extractor();
this.imageProperty =
extractor.mappingPropertyValue(RequiredBoxStream.IMAGE_PROPERTY);
this.boxArray =
extractor.mappingPropertyValue(RequiredBoxStream.BOX_ARRAY_PROPERTY);
}
@@ -111,7 +113,6 @@ public class ImageCropperProcessor extends
StreamPipesDataProcessor {
}
@Override
- public void onDetach() throws SpRuntimeException {
-
+ public void onPipelineStopped() {
}
}
diff --git
a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imageenrichment/ImageEnrichmentProcessor.java
b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imageenrichment/ImageEnrichmentProcessor.java
index 36407611ee..de62f6726e 100644
---
a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imageenrichment/ImageEnrichmentProcessor.java
+++
b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/imageenrichment/ImageEnrichmentProcessor.java
@@ -18,23 +18,23 @@
package
org.apache.streampipes.processors.imageprocessing.jvm.processor.imageenrichment;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import
org.apache.streampipes.processors.imageprocessing.jvm.processor.commons.ImagePropertyConstants;
import
org.apache.streampipes.processors.imageprocessing.jvm.processor.commons.ImageTransformer;
import
org.apache.streampipes.processors.imageprocessing.jvm.processor.commons.RequiredBoxStream;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
-import
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
import java.awt.BasicStroke;
import java.awt.Color;
@@ -48,7 +48,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
-public class ImageEnrichmentProcessor extends StreamPipesDataProcessor {
+public class ImageEnrichmentProcessor implements IStreamPipesDataProcessor {
public static final String ID =
"org.apache.streampipes.processor.imageclassification.jvm.image-enricher";
@@ -56,24 +56,26 @@ public class ImageEnrichmentProcessor extends
StreamPipesDataProcessor {
private String boxArray;
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create(ID, 1)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .withLocales(Locales.EN)
- .category(DataProcessorType.IMAGE_PROCESSING)
- .requiredStream(RequiredBoxStream.getBoxStream())
- .outputStrategy(OutputStrategies.fixed(
- EpProperties.timestampProperty("timestamp"),
- EpProperties.stringEp(Labels.empty(),
ImagePropertyConstants.IMAGE.getProperty(),
- "https://image.com")
- ))
- .build();
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ ImageEnrichmentProcessor::new,
+ ProcessingElementBuilder.create(ID, 1)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .withLocales(Locales.EN)
+ .category(DataProcessorType.IMAGE_PROCESSING)
+ .requiredStream(RequiredBoxStream.getBoxStream())
+ .outputStrategy(OutputStrategies.fixed(
+ EpProperties.timestampProperty("timestamp"),
+ EpProperties.stringEp(Labels.empty(),
ImagePropertyConstants.IMAGE.getProperty(),
+ "https://image.com")
+ ))
+ .build()
+ );
}
@Override
- public void onInvocation(ProcessorParams parameters, SpOutputCollector
spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
- ProcessingElementParameterExtractor extractor = parameters.extractor();
+ public void onPipelineStarted(IDataProcessorParameters params,
SpOutputCollector spOutputCollector, EventProcessorRuntimeContext
runtimeContext) {
+ var extractor = params.extractor();
this.imageProperty =
extractor.mappingPropertyValue(RequiredBoxStream.IMAGE_PROPERTY);
this.boxArray =
extractor.mappingPropertyValue(RequiredBoxStream.BOX_ARRAY_PROPERTY);
}
@@ -131,7 +133,6 @@ public class ImageEnrichmentProcessor extends
StreamPipesDataProcessor {
}
@Override
- public void onDetach() throws SpRuntimeException {
-
+ public void onPipelineStopped() {
}
}
diff --git
a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/qrreader/QrCodeReaderProcessor.java
b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/qrreader/QrCodeReaderProcessor.java
index 182f98ae59..1d4e1c3af6 100644
---
a/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/qrreader/QrCodeReaderProcessor.java
+++
b/streampipes-extensions/streampipes-processors-image-processing-jvm/src/main/java/org/apache/streampipes/processors/imageprocessing/jvm/processor/qrreader/QrCodeReaderProcessor.java
@@ -18,26 +18,26 @@
package
org.apache.streampipes.processors.imageprocessing.jvm.processor.qrreader;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import
org.apache.streampipes.processors.imageprocessing.jvm.processor.commons.PlainImageTransformer;
import
org.apache.streampipes.processors.imageprocessing.jvm.processor.commons.RequiredBoxStream;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.Options;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
import boofcv.abst.fiducial.QrCodeDetector;
import boofcv.alg.fiducial.qrcode.QrCode;
@@ -51,7 +51,7 @@ import java.awt.image.BufferedImage;
import java.util.List;
import java.util.Optional;
-public class QrCodeReaderProcessor extends StreamPipesDataProcessor {
+public class QrCodeReaderProcessor implements IStreamPipesDataProcessor {
private static final String PLACEHOLDER_VALUE = "placeholder-value";
private static final String SEND_IF_NO_RESULT = "send-if-no-result";
@@ -62,28 +62,30 @@ public class QrCodeReaderProcessor extends
StreamPipesDataProcessor {
private Boolean sendIfNoResult;
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder
- .create("org.apache.streampipes.processor.imageclassification.qrcode",
0)
- .category(DataProcessorType.IMAGE_PROCESSING)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .withLocales(Locales.EN)
- .requiredStream(StreamRequirementsBuilder.create()
-
.requiredPropertyWithUnaryMapping(EpRequirements.semanticTypeReq("https://image.com"),
- Labels.withId(RequiredBoxStream.IMAGE_PROPERTY),
PropertyScope.NONE)
- .build())
- .requiredSingleValueSelection(Labels.withId(SEND_IF_NO_RESULT),
Options.from("Yes", "No"))
- .requiredTextParameter(Labels.withId(PLACEHOLDER_VALUE))
- .outputStrategy(OutputStrategies.fixed(
- EpProperties.timestampProperty("timestamp"),
- EpProperties.stringEp(Labels.withId(QR_VALUE), "qrvalue",
"http://schema.org/text")))
- .build();
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ QrCodeReaderProcessor::new,
+ ProcessingElementBuilder
+
.create("org.apache.streampipes.processor.imageclassification.qrcode", 0)
+ .category(DataProcessorType.IMAGE_PROCESSING)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .withLocales(Locales.EN)
+ .requiredStream(StreamRequirementsBuilder.create()
+
.requiredPropertyWithUnaryMapping(EpRequirements.semanticTypeReq("https://image.com"),
+ Labels.withId(RequiredBoxStream.IMAGE_PROPERTY),
PropertyScope.NONE)
+ .build())
+ .requiredSingleValueSelection(Labels.withId(SEND_IF_NO_RESULT),
Options.from("Yes", "No"))
+ .requiredTextParameter(Labels.withId(PLACEHOLDER_VALUE))
+ .outputStrategy(OutputStrategies.fixed(
+ EpProperties.timestampProperty("timestamp"),
+ EpProperties.stringEp(Labels.withId(QR_VALUE), "qrvalue",
"http://schema.org/text")))
+ .build()
+ );
}
@Override
- public void onInvocation(ProcessorParams parameters, SpOutputCollector
spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
- ProcessingElementParameterExtractor extractor = parameters.extractor();
+ public void onPipelineStarted(IDataProcessorParameters params,
SpOutputCollector spOutputCollector, EventProcessorRuntimeContext
runtimeContext) {
+ var extractor = params.extractor();
imagePropertyName =
extractor.mappingPropertyValue(RequiredBoxStream.IMAGE_PROPERTY);
placeholderValue = extractor.singleValueParameter(PLACEHOLDER_VALUE,
String.class);
@@ -128,7 +130,6 @@ public class QrCodeReaderProcessor extends
StreamPipesDataProcessor {
}
@Override
- public void onDetach() throws SpRuntimeException {
-
+ public void onPipelineStopped() {
}
}