This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch 
3655-update-processors-in-processors-text-mining-jvm
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to 
refs/heads/3655-update-processors-in-processors-text-mining-jvm by this push:
     new 34faba2bf9 refactor(#3655): Update text mining processors to implement 
IStreamPipesDataProcessor interface
34faba2bf9 is described below

commit 34faba2bf9ce743d86038f625c810615b8b6f2a7
Author: Philipp Zehnder <[email protected]>
AuthorDate: Wed Jun 11 09:45:00 2025 +0200

    refactor(#3655): Update text mining processors to implement 
IStreamPipesDataProcessor interface
---
 .../jvm/processor/chunker/ChunkerProcessor.java    | 117 ++++++++++++---------
 .../language/LanguageDetectionProcessor.java       |  91 +++++++++-------
 .../processor/namefinder/NameFinderProcessor.java  |  83 +++++++++------
 .../partofspeech/PartOfSpeechProcessor.java        |  90 +++++++++-------
 .../SentenceDetectionProcessor.java                |  70 +++++++-----
 .../processor/tokenizer/TokenizerProcessor.java    |  77 ++++++++------
 6 files changed, 308 insertions(+), 220 deletions(-)

diff --git 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/chunker/ChunkerProcessor.java
 
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/chunker/ChunkerProcessor.java
index 63c90ac447..00e37328be 100644
--- 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/chunker/ChunkerProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/chunker/ChunkerProcessor.java
@@ -19,25 +19,26 @@
 package org.apache.streampipes.processors.textmining.jvm.processor.chunker;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.runtime.field.ListField;
 import org.apache.streampipes.model.schema.PropertyScope;
 import 
org.apache.streampipes.processors.textmining.jvm.processor.TextMiningUtil;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.EpProperties;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
 import org.apache.streampipes.sdk.utils.Datatypes;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
 import opennlp.tools.chunker.ChunkerME;
 import opennlp.tools.chunker.ChunkerModel;
@@ -49,7 +50,7 @@ import java.io.InputStream;
 import java.util.Arrays;
 import java.util.List;
 
-public class ChunkerProcessor extends StreamPipesDataProcessor {
+public class ChunkerProcessor implements IStreamPipesDataProcessor {
 
   private static final String TAGS_FIELD_KEY = "tagsField";
   private static final String TOKENS_FIELD_KEY = "tokensField";
@@ -62,44 +63,59 @@ public class ChunkerProcessor extends 
StreamPipesDataProcessor {
   private ChunkerME chunker;
 
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        .create("org.apache.streampipes.processors.textmining.jvm.chunker", 0)
-        .category(DataProcessorType.ENRICH_TEXT)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .withLocales(Locales.EN)
-        .requiredFile(Labels.withId(BINARY_FILE_KEY))
-        .requiredStream(StreamRequirementsBuilder
-            .create()
-            .requiredPropertyWithUnaryMapping(
-                EpRequirements.listRequirement(Datatypes.String),
-                Labels.withId(TAGS_FIELD_KEY),
-                PropertyScope.NONE)
-            .requiredPropertyWithUnaryMapping(
-                EpRequirements.listRequirement(Datatypes.String),
-                Labels.withId(TOKENS_FIELD_KEY),
-                PropertyScope.NONE)
-            .build())
-        .outputStrategy(OutputStrategies.append(
-            EpProperties.listStringEp(
-                Labels.withId(CHUNK_TYPE_FIELD_KEY),
-                CHUNK_TYPE_FIELD_KEY,
-                "http://schema.org/ItemList";),
-            EpProperties.listStringEp(
-                Labels.withId(CHUNK_FIELD_KEY),
-                CHUNK_FIELD_KEY,
-                "http://schema.org/ItemList";)))
-        .build();
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        ChunkerProcessor::new,
+        ProcessingElementBuilder
+            
.create("org.apache.streampipes.processors.textmining.jvm.chunker", 0)
+            .category(DataProcessorType.ENRICH_TEXT)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .withLocales(Locales.EN)
+            .requiredFile(Labels.withId(BINARY_FILE_KEY))
+            .requiredStream(StreamRequirementsBuilder
+                                .create()
+                                .requiredPropertyWithUnaryMapping(
+                                    
EpRequirements.listRequirement(Datatypes.String),
+                                    Labels.withId(TAGS_FIELD_KEY),
+                                    PropertyScope.NONE
+                                )
+                                .requiredPropertyWithUnaryMapping(
+                                    
EpRequirements.listRequirement(Datatypes.String),
+                                    Labels.withId(TOKENS_FIELD_KEY),
+                                    PropertyScope.NONE
+                                )
+                                .build())
+            .outputStrategy(OutputStrategies.append(
+                EpProperties.listStringEp(
+                    Labels.withId(CHUNK_TYPE_FIELD_KEY),
+                    CHUNK_TYPE_FIELD_KEY,
+                    "http://schema.org/ItemList";
+                ),
+                EpProperties.listStringEp(
+                    Labels.withId(CHUNK_FIELD_KEY),
+                    CHUNK_FIELD_KEY,
+                    "http://schema.org/ItemList";
+                )
+            ))
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(ProcessorParams parameters,
-                           SpOutputCollector spOutputCollector,
-                           EventProcessorRuntimeContext context) throws 
SpRuntimeException {
-    this.tags = parameters.extractor().mappingPropertyValue(TAGS_FIELD_KEY);
-    this.tokens = 
parameters.extractor().mappingPropertyValue(TOKENS_FIELD_KEY);
-    String filename = parameters.extractor().selectedFilename(BINARY_FILE_KEY);
-    byte[] fileContent = 
context.getStreamPipesClient().fileApi().getFileContent(filename);
+  public void onPipelineStarted(
+      IDataProcessorParameters params,
+      SpOutputCollector collector,
+      EventProcessorRuntimeContext context
+  ) {
+    this.tags = params.extractor()
+                      .mappingPropertyValue(TAGS_FIELD_KEY);
+    this.tokens = params.extractor()
+                        .mappingPropertyValue(TOKENS_FIELD_KEY);
+    String filename = params.extractor()
+                            .selectedFilename(BINARY_FILE_KEY);
+    byte[] fileContent = context.getStreamPipesClient()
+                                .fileApi()
+                                .getFileContent(filename);
 
     InputStream modelIn = new ByteArrayInputStream(fileContent);
     ChunkerModel model;
@@ -113,19 +129,23 @@ public class ChunkerProcessor extends 
StreamPipesDataProcessor {
   }
 
   @Override
-  public void onEvent(Event event,
-                      SpOutputCollector collector) throws SpRuntimeException {
-    ListField tags = event.getFieldBySelector(this.tags).getAsList();
-    ListField tokens = event.getFieldBySelector(this.tokens).getAsList();
+  public void onEvent(Event event, SpOutputCollector collector) throws 
SpRuntimeException {
+    ListField tags = event.getFieldBySelector(this.tags)
+                          .getAsList();
+    ListField tokens = event.getFieldBySelector(this.tokens)
+                            .getAsList();
 
-
-    String[] tagsArray = tags.castItems(String.class).toArray(String[]::new);
-    String[] tokensArray = 
tokens.castItems(String.class).toArray(String[]::new);
+    String[] tagsArray = tags.castItems(String.class)
+                             .toArray(String[]::new);
+    String[] tokensArray = tokens.castItems(String.class)
+                                 .toArray(String[]::new);
 
     Span[] spans = chunker.chunkAsSpans(tokensArray, tagsArray);
 
     List<String> chunks = TextMiningUtil.extractSpans(spans, tokensArray);
-    String[] types = 
Arrays.stream(spans).map(Span::getType).toArray(String[]::new);
+    String[] types = Arrays.stream(spans)
+                           .map(Span::getType)
+                           .toArray(String[]::new);
 
     event.addField(ChunkerProcessor.CHUNK_TYPE_FIELD_KEY, types);
     event.addField(ChunkerProcessor.CHUNK_FIELD_KEY, chunks);
@@ -134,7 +154,6 @@ public class ChunkerProcessor extends 
StreamPipesDataProcessor {
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
-
+  public void onPipelineStopped() {
   }
 }
diff --git 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/language/LanguageDetectionProcessor.java
 
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/language/LanguageDetectionProcessor.java
index 0f4dca2d76..9287e2edb7 100644
--- 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/language/LanguageDetectionProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/language/LanguageDetectionProcessor.java
@@ -19,22 +19,23 @@
 package org.apache.streampipes.processors.textmining.jvm.processor.language;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.EpProperties;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
 import opennlp.tools.langdetect.Language;
 import opennlp.tools.langdetect.LanguageDetector;
@@ -45,7 +46,7 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 
-public class LanguageDetectionProcessor extends StreamPipesDataProcessor {
+public class LanguageDetectionProcessor implements IStreamPipesDataProcessor {
 
   private static final String DETECTION_FIELD_KEY = "detectionField";
   static final String LANGUAGE_KEY = "language";
@@ -56,42 +57,55 @@ public class LanguageDetectionProcessor extends 
StreamPipesDataProcessor {
   private LanguageDetector languageDetector;
 
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        
.create("org.apache.streampipes.processors.textmining.jvm.languagedetection", 0)
-        .category(DataProcessorType.ENRICH_TEXT)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .withLocales(Locales.EN)
-        .requiredFile(Labels.withId(BINARY_FILE_KEY))
-        .requiredStream(StreamRequirementsBuilder
-            .create()
-            .requiredPropertyWithUnaryMapping(
-                EpRequirements.stringReq(),
-                Labels.withId(DETECTION_FIELD_KEY),
-                PropertyScope.NONE)
-            .build())
-        .outputStrategy(OutputStrategies.append(
-            EpProperties.stringEp(
-                Labels.withId(LANGUAGE_KEY),
-                LANGUAGE_KEY,
-                "http://schema.org/language";),
-            EpProperties.doubleEp(
-                Labels.withId(CONFIDENCE_KEY),
-                CONFIDENCE_KEY,
-                "https://schema.org/Float";)))
-        .build();
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        LanguageDetectionProcessor::new,
+        ProcessingElementBuilder
+            
.create("org.apache.streampipes.processors.textmining.jvm.languagedetection", 0)
+            .category(DataProcessorType.ENRICH_TEXT)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .withLocales(Locales.EN)
+            .requiredFile(Labels.withId(BINARY_FILE_KEY))
+            .requiredStream(StreamRequirementsBuilder
+                                .create()
+                                .requiredPropertyWithUnaryMapping(
+                                    EpRequirements.stringReq(),
+                                    Labels.withId(DETECTION_FIELD_KEY),
+                                    PropertyScope.NONE
+                                )
+                                .build())
+            .outputStrategy(OutputStrategies.append(
+                EpProperties.stringEp(
+                    Labels.withId(LANGUAGE_KEY),
+                    LANGUAGE_KEY,
+                    "http://schema.org/language";
+                ),
+                EpProperties.doubleEp(
+                    Labels.withId(CONFIDENCE_KEY),
+                    CONFIDENCE_KEY,
+                    "https://schema.org/Float";
+                )
+            ))
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(ProcessorParams parameters,
-                           SpOutputCollector spOutputCollector,
-                           EventProcessorRuntimeContext context) throws 
SpRuntimeException {
-    String filename = parameters.extractor().selectedFilename(BINARY_FILE_KEY);
-    byte[] fileContent = 
context.getStreamPipesClient().fileApi().getFileContent(filename);
-    this.detection = 
parameters.extractor().mappingPropertyValue(DETECTION_FIELD_KEY);
+  public void onPipelineStarted(
+      IDataProcessorParameters params,
+      SpOutputCollector collector,
+      EventProcessorRuntimeContext context
+  ) {
+    String filename = params.extractor()
+                            .selectedFilename(BINARY_FILE_KEY);
+    byte[] fileContent = context.getStreamPipesClient()
+                                .fileApi()
+                                .getFileContent(filename);
+    this.detection = params.extractor()
+                           .mappingPropertyValue(DETECTION_FIELD_KEY);
 
     InputStream modelIn = new ByteArrayInputStream(fileContent);
-    LanguageDetectorModel model = null;
+    LanguageDetectorModel model;
     try {
       model = new LanguageDetectorModel(modelIn);
     } catch (IOException e) {
@@ -103,7 +117,9 @@ public class LanguageDetectionProcessor extends 
StreamPipesDataProcessor {
 
   @Override
   public void onEvent(Event event, SpOutputCollector collector) throws 
SpRuntimeException {
-    String text = 
event.getFieldBySelector(detection).getAsPrimitive().getAsString();
+    String text = event.getFieldBySelector(detection)
+                       .getAsPrimitive()
+                       .getAsString();
     Language language = languageDetector.predictLanguage(text);
 
     event.addField(LanguageDetectionProcessor.LANGUAGE_KEY, 
language.getLang());
@@ -113,7 +129,6 @@ public class LanguageDetectionProcessor extends 
StreamPipesDataProcessor {
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
-
+  public void onPipelineStopped() {
   }
 }
diff --git 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/namefinder/NameFinderProcessor.java
 
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/namefinder/NameFinderProcessor.java
index 8120a8467f..44f405add6 100644
--- 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/namefinder/NameFinderProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/namefinder/NameFinderProcessor.java
@@ -19,25 +19,26 @@
 package org.apache.streampipes.processors.textmining.jvm.processor.namefinder;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.runtime.field.ListField;
 import org.apache.streampipes.model.schema.PropertyScope;
 import 
org.apache.streampipes.processors.textmining.jvm.processor.TextMiningUtil;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.EpProperties;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
 import org.apache.streampipes.sdk.utils.Datatypes;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
 import opennlp.tools.namefind.NameFinderME;
 import opennlp.tools.namefind.TokenNameFinderModel;
@@ -48,7 +49,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
 
-public class NameFinderProcessor extends StreamPipesDataProcessor {
+public class NameFinderProcessor implements IStreamPipesDataProcessor {
 
   private static final String MODEL = "model";
   private static final String TOKENS_FIELD_KEY = "tokensField";
@@ -58,43 +59,56 @@ public class NameFinderProcessor extends 
StreamPipesDataProcessor {
   private NameFinderME nameFinder;
 
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        .create("org.apache.streampipes.processors.textmining.jvm.namefinder", 
0)
-        .category(DataProcessorType.ENRICH_TEXT)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .withLocales(Locales.EN)
-        .requiredStream(StreamRequirementsBuilder
-            .create()
-            .requiredPropertyWithUnaryMapping(
-                EpRequirements.listRequirement(Datatypes.String),
-                Labels.withId(TOKENS_FIELD_KEY),
-                PropertyScope.NONE)
-            .build())
-        .requiredFile(Labels.withId(MODEL))
-        .outputStrategy(OutputStrategies.append(
-            EpProperties.listStringEp(
-                Labels.withId(FOUND_NAME_FIELD_KEY),
-                FOUND_NAME_FIELD_KEY,
-                "http://schema.org/ItemList";)))
-        .build();
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        NameFinderProcessor::new,
+        ProcessingElementBuilder
+            
.create("org.apache.streampipes.processors.textmining.jvm.namefinder", 0)
+            .category(DataProcessorType.ENRICH_TEXT)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .withLocales(Locales.EN)
+            .requiredStream(StreamRequirementsBuilder
+                                .create()
+                                .requiredPropertyWithUnaryMapping(
+                                    
EpRequirements.listRequirement(Datatypes.String),
+                                    Labels.withId(TOKENS_FIELD_KEY),
+                                    PropertyScope.NONE
+                                )
+                                .build())
+            .requiredFile(Labels.withId(MODEL))
+            .outputStrategy(OutputStrategies.append(
+                EpProperties.listStringEp(
+                    Labels.withId(FOUND_NAME_FIELD_KEY),
+                    FOUND_NAME_FIELD_KEY,
+                    "http://schema.org/ItemList";
+                )))
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(ProcessorParams parameters,
-                           SpOutputCollector spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
-    String filename = parameters.extractor().selectedFilename(MODEL);
-    byte[] fileContent = 
runtimeContext.getStreamPipesClient().fileApi().getFileContent(filename);
-    this.tokens = 
parameters.extractor().mappingPropertyValue(TOKENS_FIELD_KEY);
+  public void onPipelineStarted(
+      IDataProcessorParameters params,
+      SpOutputCollector collector,
+      EventProcessorRuntimeContext runtimeContext
+  ) {
+    String filename = params.extractor()
+                            .selectedFilename(MODEL);
+    byte[] fileContent = runtimeContext.getStreamPipesClient()
+                                       .fileApi()
+                                       .getFileContent(filename);
+    this.tokens = params.extractor()
+                        .mappingPropertyValue(TOKENS_FIELD_KEY);
     loadModel(fileContent);
   }
 
   @Override
   public void onEvent(Event event, SpOutputCollector collector) throws 
SpRuntimeException {
-    ListField tokens = event.getFieldBySelector(this.tokens).getAsList();
+    ListField tokens = event.getFieldBySelector(this.tokens)
+                            .getAsList();
 
-    String[] tokensArray = 
tokens.castItems(String.class).toArray(String[]::new);
+    String[] tokensArray = tokens.castItems(String.class)
+                                 .toArray(String[]::new);
     Span[] spans = nameFinder.find(tokensArray);
 
     // Generating the list of names from the found spans by the nameFinder
@@ -107,8 +121,7 @@ public class NameFinderProcessor extends 
StreamPipesDataProcessor {
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
-
+  public void onPipelineStopped() {
   }
 
   private void loadModel(byte[] modelContent) {
@@ -116,7 +129,7 @@ public class NameFinderProcessor extends 
StreamPipesDataProcessor {
       TokenNameFinderModel model = new TokenNameFinderModel(modelIn);
       nameFinder = new NameFinderME(model);
     } catch (IOException e) {
-      e.printStackTrace();
+      throw new SpRuntimeException("Error when loading the uploaded model.", 
e);
     }
   }
 }
diff --git 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/partofspeech/PartOfSpeechProcessor.java
 
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/partofspeech/PartOfSpeechProcessor.java
index 4f2eddab07..12c3513647 100644
--- 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/partofspeech/PartOfSpeechProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/partofspeech/PartOfSpeechProcessor.java
@@ -19,24 +19,25 @@
 package 
org.apache.streampipes.processors.textmining.jvm.processor.partofspeech;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.runtime.field.ListField;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.EpProperties;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
 import org.apache.streampipes.sdk.utils.Datatypes;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
 import opennlp.tools.postag.POSModel;
 import opennlp.tools.postag.POSTaggerME;
@@ -45,7 +46,7 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 
-public class PartOfSpeechProcessor extends StreamPipesDataProcessor {
+public class PartOfSpeechProcessor implements IStreamPipesDataProcessor {
 
   private static final String DETECTION_FIELD_KEY = "detectionField";
   static final String CONFIDENCE_KEY = "confidencePos";
@@ -56,39 +57,52 @@ public class PartOfSpeechProcessor extends 
StreamPipesDataProcessor {
   private POSTaggerME posTagger;
 
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        
.create("org.apache.streampipes.processors.textmining.jvm.partofspeech", 0)
-        .category(DataProcessorType.ENRICH_TEXT)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .withLocales(Locales.EN)
-        .requiredFile(Labels.withId(BINARY_FILE_KEY))
-        .requiredStream(StreamRequirementsBuilder
-            .create()
-            .requiredPropertyWithUnaryMapping(
-                EpRequirements.listRequirement(Datatypes.String),
-                Labels.withId(DETECTION_FIELD_KEY),
-                PropertyScope.NONE)
-            .build())
-        .outputStrategy(OutputStrategies.append(
-            EpProperties.listDoubleEp(
-                Labels.withId(CONFIDENCE_KEY),
-                CONFIDENCE_KEY,
-                "http://schema.org/ItemList";),
-            EpProperties.listStringEp(
-                Labels.withId(TAG_KEY),
-                TAG_KEY,
-                "http://schema.org/ItemList";)))
-        .build();
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        PartOfSpeechProcessor::new,
+        ProcessingElementBuilder
+            
.create("org.apache.streampipes.processors.textmining.jvm.partofspeech", 0)
+            .category(DataProcessorType.ENRICH_TEXT)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .withLocales(Locales.EN)
+            .requiredFile(Labels.withId(BINARY_FILE_KEY))
+            .requiredStream(StreamRequirementsBuilder
+                                .create()
+                                .requiredPropertyWithUnaryMapping(
+                                    
EpRequirements.listRequirement(Datatypes.String),
+                                    Labels.withId(DETECTION_FIELD_KEY),
+                                    PropertyScope.NONE
+                                )
+                                .build())
+            .outputStrategy(OutputStrategies.append(
+                EpProperties.listDoubleEp(
+                    Labels.withId(CONFIDENCE_KEY),
+                    CONFIDENCE_KEY,
+                    "http://schema.org/ItemList";
+                ),
+                EpProperties.listStringEp(
+                    Labels.withId(TAG_KEY),
+                    TAG_KEY,
+                    "http://schema.org/ItemList";
+                )
+            ))
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(ProcessorParams parameters,
-                           SpOutputCollector spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
-    String filename = parameters.extractor().selectedFilename(BINARY_FILE_KEY);
-    byte[] fileContent = 
runtimeContext.getStreamPipesClient().fileApi().getFileContent(filename);
-    this.detection = 
parameters.extractor().mappingPropertyValue(DETECTION_FIELD_KEY);
+  public void onPipelineStarted(
+      IDataProcessorParameters params,
+      SpOutputCollector collector,
+      EventProcessorRuntimeContext runtimeContext
+  ) {
+    String filename = params.extractor()
+                            .selectedFilename(BINARY_FILE_KEY);
+    byte[] fileContent = runtimeContext.getStreamPipesClient()
+                                       .fileApi()
+                                       .getFileContent(filename);
+    this.detection = params.extractor()
+                           .mappingPropertyValue(DETECTION_FIELD_KEY);
 
     InputStream modelIn = new ByteArrayInputStream(fileContent);
     POSModel model;
@@ -103,9 +117,11 @@ public class PartOfSpeechProcessor extends 
StreamPipesDataProcessor {
 
   @Override
   public void onEvent(Event event, SpOutputCollector collector) throws 
SpRuntimeException {
-    ListField text = event.getFieldBySelector(detection).getAsList();
+    ListField text = event.getFieldBySelector(detection)
+                          .getAsList();
 
-    String[] tags = 
posTagger.tag(text.castItems(String.class).toArray(String[]::new));
+    String[] tags = posTagger.tag(text.castItems(String.class)
+                                      .toArray(String[]::new));
     double[] confidence = posTagger.probs();
 
 
@@ -116,6 +132,6 @@ public class PartOfSpeechProcessor extends 
StreamPipesDataProcessor {
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
+  public void onPipelineStopped() {
   }
 }
diff --git 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/sentencedetection/SentenceDetectionProcessor.java
 
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/sentencedetection/SentenceDetectionProcessor.java
index 83136155d3..0a5a470426 100644
--- 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/sentencedetection/SentenceDetectionProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/sentencedetection/SentenceDetectionProcessor.java
@@ -19,21 +19,22 @@
 package 
org.apache.streampipes.processors.textmining.jvm.processor.sentencedetection;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
 import opennlp.tools.sentdetect.SentenceDetectorME;
 import opennlp.tools.sentdetect.SentenceModel;
@@ -42,7 +43,7 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 
-public class SentenceDetectionProcessor extends StreamPipesDataProcessor {
+public class SentenceDetectionProcessor implements IStreamPipesDataProcessor {
 
   private static final String DETECTION_FIELD_KEY = "detectionField";
   private static final String BINARY_FILE_KEY = "binary-file";
@@ -51,31 +52,41 @@ public class SentenceDetectionProcessor extends 
StreamPipesDataProcessor {
   private SentenceDetectorME sentenceDetector;
 
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        
.create("org.apache.streampipes.processors.textmining.jvm.sentencedetection", 0)
-        .category(DataProcessorType.ENRICH_TEXT)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .withLocales(Locales.EN)
-        .requiredFile(Labels.withId(BINARY_FILE_KEY))
-        .requiredStream(StreamRequirementsBuilder
-            .create()
-            .requiredPropertyWithUnaryMapping(
-                EpRequirements.stringReq(),
-                Labels.withId(DETECTION_FIELD_KEY),
-                PropertyScope.NONE)
-            .build())
-        .outputStrategy(OutputStrategies.keep())
-        .build();
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        SentenceDetectionProcessor::new,
+        ProcessingElementBuilder
+            
.create("org.apache.streampipes.processors.textmining.jvm.sentencedetection", 0)
+            .category(DataProcessorType.ENRICH_TEXT)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .withLocales(Locales.EN)
+            .requiredFile(Labels.withId(BINARY_FILE_KEY))
+            .requiredStream(StreamRequirementsBuilder
+                                .create()
+                                .requiredPropertyWithUnaryMapping(
+                                    EpRequirements.stringReq(),
+                                    Labels.withId(DETECTION_FIELD_KEY),
+                                    PropertyScope.NONE
+                                )
+                                .build())
+            .outputStrategy(OutputStrategies.keep())
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(ProcessorParams parameters,
-                           SpOutputCollector spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
-    String filename = parameters.extractor().selectedFilename(BINARY_FILE_KEY);
-    byte[] fileContent = 
runtimeContext.getStreamPipesClient().fileApi().getFileContent(filename);
-    this.detection = 
parameters.extractor().mappingPropertyValue(DETECTION_FIELD_KEY);
+  public void onPipelineStarted(
+      IDataProcessorParameters params,
+      SpOutputCollector collector,
+      EventProcessorRuntimeContext runtimeContext
+  ) {
+    String filename = params.extractor()
+                            .selectedFilename(BINARY_FILE_KEY);
+    byte[] fileContent = runtimeContext.getStreamPipesClient()
+                                       .fileApi()
+                                       .getFileContent(filename);
+    this.detection = params.extractor()
+                           .mappingPropertyValue(DETECTION_FIELD_KEY);
 
     InputStream modelIn = new ByteArrayInputStream(fileContent);
     SentenceModel model;
@@ -90,7 +101,9 @@ public class SentenceDetectionProcessor extends 
StreamPipesDataProcessor {
 
   @Override
   public void onEvent(Event event, SpOutputCollector collector) throws 
SpRuntimeException {
-    String text = 
event.getFieldBySelector(detection).getAsPrimitive().getAsString();
+    String text = event.getFieldBySelector(detection)
+                       .getAsPrimitive()
+                       .getAsString();
 
     String sentences[] = sentenceDetector.sentDetect(text);
 
@@ -101,7 +114,6 @@ public class SentenceDetectionProcessor extends 
StreamPipesDataProcessor {
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
-
+  public void onPipelineStopped() {
   }
 }
diff --git 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/tokenizer/TokenizerProcessor.java
 
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/tokenizer/TokenizerProcessor.java
index a40cda4643..fab28b112f 100644
--- 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/tokenizer/TokenizerProcessor.java
+++ 
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/tokenizer/TokenizerProcessor.java
@@ -19,22 +19,23 @@
 package org.apache.streampipes.processors.textmining.jvm.processor.tokenizer;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import 
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.EpProperties;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
 import opennlp.tools.tokenize.TokenizerME;
 import opennlp.tools.tokenize.TokenizerModel;
@@ -43,7 +44,7 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 
-public class TokenizerProcessor extends StreamPipesDataProcessor {
+public class TokenizerProcessor implements IStreamPipesDataProcessor {
 
   private static final String DETECTION_FIELD_KEY = "detectionField";
   static final String TOKEN_LIST_FIELD_KEY = "tokenList";
@@ -52,35 +53,46 @@ public class TokenizerProcessor extends 
StreamPipesDataProcessor {
   private String detection;
   private TokenizerME tokenizer;
 
-
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder
-        .create("org.apache.streampipes.processors.textmining.jvm.tokenizer", 
0)
-        .category(DataProcessorType.ENRICH_TEXT)
-        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
-        .withLocales(Locales.EN)
-        .requiredFile(Labels.withId(BINARY_FILE_KEY))
-        .requiredStream(StreamRequirementsBuilder
-            .create()
-            .requiredPropertyWithUnaryMapping(
-                EpRequirements.stringReq(),
-                Labels.withId(DETECTION_FIELD_KEY),
-                PropertyScope.NONE)
-            .build())
-        
.outputStrategy(OutputStrategies.append(EpProperties.listStringEp(Labels.withId(TOKEN_LIST_FIELD_KEY),
-            TOKEN_LIST_FIELD_KEY,
-            "http://schema.org/ItemList";)))
-        .build();
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        TokenizerProcessor::new,
+        ProcessingElementBuilder
+            
.create("org.apache.streampipes.processors.textmining.jvm.tokenizer", 0)
+            .category(DataProcessorType.ENRICH_TEXT)
+            .withAssets(ExtensionAssetType.DOCUMENTATION, 
ExtensionAssetType.ICON)
+            .withLocales(Locales.EN)
+            .requiredFile(Labels.withId(BINARY_FILE_KEY))
+            .requiredStream(StreamRequirementsBuilder
+                                .create()
+                                .requiredPropertyWithUnaryMapping(
+                                    EpRequirements.stringReq(),
+                                    Labels.withId(DETECTION_FIELD_KEY),
+                                    PropertyScope.NONE
+                                )
+                                .build())
+            .outputStrategy(OutputStrategies.append(EpProperties.listStringEp(
+                Labels.withId(TOKEN_LIST_FIELD_KEY),
+                TOKEN_LIST_FIELD_KEY,
+                "http://schema.org/ItemList";
+            )))
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(ProcessorParams parameters,
-                           SpOutputCollector spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
-    String filename = parameters.extractor().selectedFilename(BINARY_FILE_KEY);
-    byte[] fileContent = 
runtimeContext.getStreamPipesClient().fileApi().getFileContent(filename);
-    this.detection = 
parameters.extractor().mappingPropertyValue(DETECTION_FIELD_KEY);
+  public void onPipelineStarted(
+      IDataProcessorParameters params,
+      SpOutputCollector collector,
+      EventProcessorRuntimeContext runtimeContext
+  ) {
+    String filename = params.extractor()
+                            .selectedFilename(BINARY_FILE_KEY);
+    byte[] fileContent = runtimeContext.getStreamPipesClient()
+                                       .fileApi()
+                                       .getFileContent(filename);
+    this.detection = params.extractor()
+                           .mappingPropertyValue(DETECTION_FIELD_KEY);
 
     InputStream modelIn = new ByteArrayInputStream(fileContent);
     TokenizerModel model;
@@ -95,7 +107,9 @@ public class TokenizerProcessor extends 
StreamPipesDataProcessor {
 
   @Override
   public void onEvent(Event event, SpOutputCollector collector) throws 
SpRuntimeException {
-    String text = 
event.getFieldBySelector(detection).getAsPrimitive().getAsString();
+    String text = event.getFieldBySelector(detection)
+                       .getAsPrimitive()
+                       .getAsString();
 
     event.addField(TokenizerProcessor.TOKEN_LIST_FIELD_KEY, 
tokenizer.tokenize(text));
 
@@ -103,7 +117,6 @@ public class TokenizerProcessor extends 
StreamPipesDataProcessor {
   }
 
   @Override
-  public void onDetach() throws SpRuntimeException {
-
+  public void onPipelineStopped() {
   }
 }


Reply via email to