This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch 1601-migrate-text-mining-processors
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/1601-migrate-text-mining-processors by this push:
     new 4918a3de8 Migrate text mining processors (#1601)
4918a3de8 is described below

commit 4918a3de8d0d3864ddab4b933fce4a410c5365d3
Author: Dominik Riemer <[email protected]>
AuthorDate: Fri May 26 14:20:22 2023 +0200

    Migrate text mining processors (#1601)
---
 .../textmining/jvm/TextMiningJvmInit.java          | 24 +++---
 .../textmining/jvm/processor/chunker/Chunker.java  | 99 ----------------------
 .../jvm/processor/chunker/ChunkerParameters.java   | 47 ----------
 ...hunkerController.java => ChunkerProcessor.java} | 79 +++++++++++++----
 .../jvm/processor/language/LanguageDetection.java  | 79 -----------------
 .../language/LanguageDetectionParameters.java      | 41 ---------
 ...roller.java => LanguageDetectionProcessor.java} | 65 ++++++++++----
 .../jvm/processor/namefinder/NameFinder.java       | 85 -------------------
 .../processor/namefinder/NameFinderParameters.java | 42 ---------
 ...derController.java => NameFinderProcessor.java} | 85 ++++++++++++-------
 .../jvm/processor/partofspeech/PartOfSpeech.java   | 81 ------------------
 .../partofspeech/PartOfSpeechParameters.java       | 42 ---------
 ...hController.java => PartOfSpeechProcessor.java} | 65 ++++++++++----
 .../sentencedetection/SentenceDetection.java       | 79 -----------------
 .../SentenceDetectionParameters.java               | 41 ---------
 ...roller.java => SentenceDetectionProcessor.java} | 64 ++++++++++----
 .../jvm/processor/tokenizer/Tokenizer.java         | 76 -----------------
 .../processor/tokenizer/TokenizerParameters.java   | 41 ---------
 ...izerController.java => TokenizerProcessor.java} | 61 +++++++++----
 19 files changed, 323 insertions(+), 873 deletions(-)

diff --git a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/TextMiningJvmInit.java b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/TextMiningJvmInit.java
index 1af275081..b4ead4b46 100644
--- a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/TextMiningJvmInit.java
+++ b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/TextMiningJvmInit.java
@@ -27,12 +27,12 @@ import org.apache.streampipes.extensions.management.model.SpServiceDefinitionBui
 import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
-import org.apache.streampipes.processors.textmining.jvm.processor.chunker.ChunkerController;
-import org.apache.streampipes.processors.textmining.jvm.processor.language.LanguageDetectionController;
-import org.apache.streampipes.processors.textmining.jvm.processor.namefinder.NameFinderController;
-import org.apache.streampipes.processors.textmining.jvm.processor.partofspeech.PartOfSpeechController;
-import org.apache.streampipes.processors.textmining.jvm.processor.sentencedetection.SentenceDetectionController;
-import org.apache.streampipes.processors.textmining.jvm.processor.tokenizer.TokenizerController;
+import org.apache.streampipes.processors.textmining.jvm.processor.chunker.ChunkerProcessor;
+import org.apache.streampipes.processors.textmining.jvm.processor.language.LanguageDetectionProcessor;
+import org.apache.streampipes.processors.textmining.jvm.processor.namefinder.NameFinderProcessor;
+import org.apache.streampipes.processors.textmining.jvm.processor.partofspeech.PartOfSpeechProcessor;
+import org.apache.streampipes.processors.textmining.jvm.processor.sentencedetection.SentenceDetectionProcessor;
+import org.apache.streampipes.processors.textmining.jvm.processor.tokenizer.TokenizerProcessor;
 import org.apache.streampipes.service.extensions.ExtensionsModelSubmitter;
 
 public class TextMiningJvmInit extends ExtensionsModelSubmitter {
@@ -47,12 +47,12 @@ public class TextMiningJvmInit extends ExtensionsModelSubmitter {
     return SpServiceDefinitionBuilder.create("org.apache.streampipes.processors.textmining.jvm",
             "Processors Text Mining JVM", "",
             8090)
-        .registerPipelineElements(new LanguageDetectionController(),
-            new TokenizerController(),
-            new PartOfSpeechController(),
-            new ChunkerController(),
-            new NameFinderController(),
-            new SentenceDetectionController())
+        .registerPipelineElements(new LanguageDetectionProcessor(),
+            new TokenizerProcessor(),
+            new PartOfSpeechProcessor(),
+            new ChunkerProcessor(),
+            new NameFinderProcessor(),
+            new SentenceDetectionProcessor())
         .registerMessagingFormats(
             new JsonDataFormatFactory(),
             new CborDataFormatFactory(),
diff --git a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/chunker/Chunker.java b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/chunker/Chunker.java
deleted file mode 100644
index ab6c1392c..000000000
--- a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/chunker/Chunker.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.processors.textmining.jvm.processor.chunker;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.logging.api.Logger;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.model.runtime.field.ListField;
-import 
org.apache.streampipes.processors.textmining.jvm.processor.TextMiningUtil;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
-
-import opennlp.tools.chunker.ChunkerME;
-import opennlp.tools.chunker.ChunkerModel;
-import opennlp.tools.util.Span;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Arrays;
-import java.util.List;
-
-public class Chunker implements EventProcessor<ChunkerParameters> {
-
-  private static Logger log;
-
-  private String tags;
-  private String tokens;
-  private ChunkerME chunker;
-
-  public Chunker() {
-//    try (InputStream modelIn = 
getClass().getClassLoader().getResourceAsStream("chunker-en.bin")) {
-//      ChunkerModel model = new ChunkerModel(modelIn);
-//      chunker = new ChunkerME(model);
-//    } catch (IOException e) {
-//      e.printStackTrace();
-//    }
-  }
-
-  @Override
-  public void onInvocation(ChunkerParameters chunkerParameters,
-                           SpOutputCollector spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
-    log = chunkerParameters.getGraph().getLogger(Chunker.class);
-    this.tags = chunkerParameters.getTags();
-    this.tokens = chunkerParameters.getTokens();
-
-    InputStream modelIn = new 
ByteArrayInputStream(chunkerParameters.getFileContent());
-    ChunkerModel model = null;
-    try {
-      model = new ChunkerModel(modelIn);
-    } catch (IOException e) {
-      throw new SpRuntimeException("Error when loading the uploaded model.", 
e);
-    }
-
-    chunker = new ChunkerME(model);
-  }
-
-  @Override
-  public void onEvent(Event inputEvent, SpOutputCollector out) throws 
SpRuntimeException {
-    ListField tags = inputEvent.getFieldBySelector(this.tags).getAsList();
-    ListField tokens = inputEvent.getFieldBySelector(this.tokens).getAsList();
-
-
-    String[] tagsArray = 
tags.castItems(String.class).stream().toArray(String[]::new);
-    String[] tokensArray = 
tokens.castItems(String.class).stream().toArray(String[]::new);
-
-    Span[] spans = chunker.chunkAsSpans(tokensArray, tagsArray);
-
-    List<String> chunks = TextMiningUtil.extractSpans(spans, tokensArray);
-    String[] types = Arrays.stream(spans).map(s -> 
s.getType()).toArray(String[]::new);
-
-    inputEvent.addField(ChunkerController.CHUNK_TYPE_FIELD_KEY, types);
-    inputEvent.addField(ChunkerController.CHUNK_FIELD_KEY, chunks);
-
-    out.collect(inputEvent);
-  }
-
-  @Override
-  public void onDetach() {
-  }
-}
diff --git 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/chunker/ChunkerParameters.java
 
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/chunker/ChunkerParameters.java
deleted file mode 100644
index b94361e46..000000000
--- 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/chunker/ChunkerParameters.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.processors.textmining.jvm.processor.chunker;
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import 
org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-public class ChunkerParameters extends EventProcessorBindingParams {
-  private String tags;
-  private String tokens;
-  private byte[] fileContent;
-
-  public ChunkerParameters(DataProcessorInvocation graph, String tags, String 
tokens, byte[] fileContent) {
-    super(graph);
-    this.tags = tags;
-    this.tokens = tokens;
-    this.fileContent = fileContent;
-  }
-
-  public String getTags() {
-    return tags;
-  }
-
-  public String getTokens() {
-    return tokens;
-  }
-
-  public byte[] getFileContent() {
-    return fileContent;
-  }
-}
diff --git 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/chunker/ChunkerController.java
 
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/chunker/ChunkerProcessor.java
similarity index 54%
rename from 
streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/chunker/ChunkerController.java
rename to 
streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/chunker/ChunkerProcessor.java
index 49052335c..9b1ef12aa 100644
--- 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/chunker/ChunkerController.java
+++ 
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/chunker/ChunkerProcessor.java
@@ -18,15 +18,15 @@
 
 package org.apache.streampipes.processors.textmining.jvm.processor.chunker;
 
-import org.apache.streampipes.client.StreamPipesClient;
-import 
org.apache.streampipes.extensions.management.client.StreamPipesClientResolver;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.runtime.field.ListField;
 import org.apache.streampipes.model.schema.PropertyScope;
+import 
org.apache.streampipes.processors.textmining.jvm.processor.TextMiningUtil;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import 
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
 import org.apache.streampipes.sdk.helpers.EpProperties;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
@@ -34,10 +34,22 @@ import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
 import org.apache.streampipes.sdk.utils.Assets;
 import org.apache.streampipes.sdk.utils.Datatypes;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import 
org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
-public class ChunkerController extends 
StandaloneEventProcessingDeclarer<ChunkerParameters> {
+import opennlp.tools.chunker.ChunkerME;
+import opennlp.tools.chunker.ChunkerModel;
+import opennlp.tools.util.Span;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+
+public class ChunkerProcessor extends StreamPipesDataProcessor {
 
   private static final String TAGS_FIELD_KEY = "tagsField";
   private static final String TOKENS_FIELD_KEY = "tokensField";
@@ -45,6 +57,10 @@ public class ChunkerController extends 
StandaloneEventProcessingDeclarer<Chunker
   static final String CHUNK_FIELD_KEY = "chunk";
   private static final String BINARY_FILE_KEY = "binary-file";
 
+  private String tags;
+  private String tokens;
+  private ChunkerME chunker;
+
   @Override
   public DataProcessorDescription declareModel() {
     return 
ProcessingElementBuilder.create("org.apache.streampipes.processors.textmining.jvm.chunker")
@@ -76,17 +92,48 @@ public class ChunkerController extends 
StandaloneEventProcessingDeclarer<Chunker
   }
 
   @Override
-  public ConfiguredEventProcessor<ChunkerParameters> 
onInvocation(DataProcessorInvocation graph,
-                                                                  
ProcessingElementParameterExtractor extractor) {
+  public void onInvocation(ProcessorParams parameters,
+                           SpOutputCollector spOutputCollector,
+                           EventProcessorRuntimeContext context) throws 
SpRuntimeException {
+    this.tags = parameters.extractor().mappingPropertyValue(TAGS_FIELD_KEY);
+    this.tokens = 
parameters.extractor().mappingPropertyValue(TOKENS_FIELD_KEY);
+    String filename = parameters.extractor().selectedFilename(BINARY_FILE_KEY);
+    byte[] fileContent = 
context.getStreamPipesClient().fileApi().getFileContent(filename);
+
+    InputStream modelIn = new ByteArrayInputStream(fileContent);
+    ChunkerModel model;
+    try {
+      model = new ChunkerModel(modelIn);
+    } catch (IOException e) {
+      throw new SpRuntimeException("Error when loading the uploaded model.", 
e);
+    }
+
+    chunker = new ChunkerME(model);
+  }
+
+  @Override
+  public void onEvent(Event event,
+                      SpOutputCollector collector) throws SpRuntimeException {
+    ListField tags = event.getFieldBySelector(this.tags).getAsList();
+    ListField tokens = event.getFieldBySelector(this.tokens).getAsList();
 
-    StreamPipesClient client = new 
StreamPipesClientResolver().makeStreamPipesClientInstance();
-    String filename = extractor.selectedFilename(BINARY_FILE_KEY);
-    byte[] fileContent = client.fileApi().getFileContent(filename);
 
-    String tags = extractor.mappingPropertyValue(TAGS_FIELD_KEY);
-    String tokens = extractor.mappingPropertyValue(TOKENS_FIELD_KEY);
+    String[] tagsArray = tags.castItems(String.class).toArray(String[]::new);
+    String[] tokensArray = 
tokens.castItems(String.class).toArray(String[]::new);
+
+    Span[] spans = chunker.chunkAsSpans(tokensArray, tagsArray);
+
+    List<String> chunks = TextMiningUtil.extractSpans(spans, tokensArray);
+    String[] types = 
Arrays.stream(spans).map(Span::getType).toArray(String[]::new);
+
+    event.addField(ChunkerProcessor.CHUNK_TYPE_FIELD_KEY, types);
+    event.addField(ChunkerProcessor.CHUNK_FIELD_KEY, chunks);
+
+    collector.collect(event);
+  }
+
+  @Override
+  public void onDetach() throws SpRuntimeException {
 
-    ChunkerParameters params = new ChunkerParameters(graph, tags, tokens, 
fileContent);
-    return new ConfiguredEventProcessor<>(params, Chunker::new);
   }
 }
diff --git 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/language/LanguageDetection.java
 
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/language/LanguageDetection.java
deleted file mode 100644
index aa200f435..000000000
--- 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/language/LanguageDetection.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.processors.textmining.jvm.processor.language;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.logging.api.Logger;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
-
-import opennlp.tools.langdetect.Language;
-import opennlp.tools.langdetect.LanguageDetector;
-import opennlp.tools.langdetect.LanguageDetectorME;
-import opennlp.tools.langdetect.LanguageDetectorModel;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-public class LanguageDetection implements 
EventProcessor<LanguageDetectionParameters> {
-
-  private static Logger log;
-
-  private String detection;
-  private LanguageDetector languageDetector;
-
-  public LanguageDetection() {
-  }
-
-  @Override
-  public void onInvocation(LanguageDetectionParameters 
languageDetectionParameters,
-                           SpOutputCollector spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
-    log = 
languageDetectionParameters.getGraph().getLogger(LanguageDetection.class);
-    this.detection = languageDetectionParameters.getDetectionName();
-
-    InputStream modelIn = new 
ByteArrayInputStream(languageDetectionParameters.getFileContent());
-    LanguageDetectorModel model = null;
-    try {
-      model = new LanguageDetectorModel(modelIn);
-    } catch (IOException e) {
-      throw new SpRuntimeException("Error when loading the uploaded model.", 
e);
-    }
-
-    languageDetector = new LanguageDetectorME(model);
-  }
-
-  @Override
-  public void onEvent(Event inputEvent, SpOutputCollector out) {
-    String text = 
inputEvent.getFieldBySelector(detection).getAsPrimitive().getAsString();
-    Language language = languageDetector.predictLanguage(text);
-
-    inputEvent.addField(LanguageDetectionController.LANGUAGE_KEY, 
language.getLang());
-    inputEvent.addField(LanguageDetectionController.CONFIDENCE_KEY, 
language.getConfidence());
-
-    out.collect(inputEvent);
-  }
-
-  @Override
-  public void onDetach() {
-  }
-}
diff --git 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/language/LanguageDetectionParameters.java
 
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/language/LanguageDetectionParameters.java
deleted file mode 100644
index 0f33bda39..000000000
--- 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/language/LanguageDetectionParameters.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.processors.textmining.jvm.processor.language;
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import 
org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-public class LanguageDetectionParameters extends EventProcessorBindingParams {
-  private byte[] fileContent;
-  private String detectionName;
-
-  public LanguageDetectionParameters(DataProcessorInvocation graph, String 
fieldName, byte[] fileContent) {
-    super(graph);
-    this.detectionName = fieldName;
-    this.fileContent = fileContent;
-  }
-
-  public String getDetectionName() {
-    return detectionName;
-  }
-
-  public byte[] getFileContent() {
-    return fileContent;
-  }
-}
diff --git 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/language/LanguageDetectionController.java
 
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/language/LanguageDetectionProcessor.java
similarity index 56%
rename from 
streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/language/LanguageDetectionController.java
rename to 
streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/language/LanguageDetectionProcessor.java
index 0e1e0a380..e970112f4 100644
--- 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/language/LanguageDetectionController.java
+++ 
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/language/LanguageDetectionProcessor.java
@@ -18,31 +18,43 @@
 
 package org.apache.streampipes.processors.textmining.jvm.processor.language;
 
-import org.apache.streampipes.client.StreamPipesClient;
-import 
org.apache.streampipes.extensions.management.client.StreamPipesClientResolver;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import 
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
 import org.apache.streampipes.sdk.helpers.EpProperties;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
 import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import 
org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
-public class LanguageDetectionController extends 
StandaloneEventProcessingDeclarer<LanguageDetectionParameters> {
+import opennlp.tools.langdetect.Language;
+import opennlp.tools.langdetect.LanguageDetector;
+import opennlp.tools.langdetect.LanguageDetectorME;
+import opennlp.tools.langdetect.LanguageDetectorModel;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+public class LanguageDetectionProcessor extends StreamPipesDataProcessor {
 
   private static final String DETECTION_FIELD_KEY = "detectionField";
   static final String LANGUAGE_KEY = "language";
   static final String CONFIDENCE_KEY = "confidenceLanguage";
   private static final String BINARY_FILE_KEY = "binary-file";
 
+  private String detection;
+  private LanguageDetector languageDetector;
+
   @Override
   public DataProcessorDescription declareModel() {
     return 
ProcessingElementBuilder.create("org.apache.streampipes.processors.textmining.jvm.languagedetection")
@@ -70,16 +82,37 @@ public class LanguageDetectionController extends 
StandaloneEventProcessingDeclar
   }
 
   @Override
-  public ConfiguredEventProcessor<LanguageDetectionParameters> onInvocation(
-      DataProcessorInvocation graph,
-      ProcessingElementParameterExtractor extractor) {
+  public void onInvocation(ProcessorParams parameters,
+                           SpOutputCollector spOutputCollector,
+                           EventProcessorRuntimeContext context) throws 
SpRuntimeException {
+    String filename = parameters.extractor().selectedFilename(BINARY_FILE_KEY);
+    byte[] fileContent = 
context.getStreamPipesClient().fileApi().getFileContent(filename);
+    this.detection = 
parameters.extractor().mappingPropertyValue(DETECTION_FIELD_KEY);
+
+    InputStream modelIn = new ByteArrayInputStream(fileContent);
+    LanguageDetectorModel model = null;
+    try {
+      model = new LanguageDetectorModel(modelIn);
+    } catch (IOException e) {
+      throw new SpRuntimeException("Error when loading the uploaded model.", 
e);
+    }
+
+    languageDetector = new LanguageDetectorME(model);
+  }
+
+  @Override
+  public void onEvent(Event event, SpOutputCollector collector) throws 
SpRuntimeException {
+    String text = 
event.getFieldBySelector(detection).getAsPrimitive().getAsString();
+    Language language = languageDetector.predictLanguage(text);
+
+    event.addField(LanguageDetectionProcessor.LANGUAGE_KEY, 
language.getLang());
+    event.addField(LanguageDetectionProcessor.CONFIDENCE_KEY, 
language.getConfidence());
 
-    StreamPipesClient client = new 
StreamPipesClientResolver().makeStreamPipesClientInstance();
-    String filename = extractor.selectedFilename(BINARY_FILE_KEY);
-    byte[] fileContent = client.fileApi().getFileContent(filename);
-    String detection = extractor.mappingPropertyValue(DETECTION_FIELD_KEY);
+    collector.collect(event);
+  }
+
+  @Override
+  public void onDetach() throws SpRuntimeException {
 
-    LanguageDetectionParameters params = new 
LanguageDetectionParameters(graph, detection, fileContent);
-    return new ConfiguredEventProcessor<>(params, LanguageDetection::new);
   }
 }
diff --git 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/namefinder/NameFinder.java
 
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/namefinder/NameFinder.java
deleted file mode 100644
index 9455c30be..000000000
--- 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/namefinder/NameFinder.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.processors.textmining.jvm.processor.namefinder;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.model.runtime.field.ListField;
-import 
org.apache.streampipes.processors.textmining.jvm.processor.TextMiningUtil;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
-
-import opennlp.tools.namefind.NameFinderME;
-import opennlp.tools.namefind.TokenNameFinderModel;
-import opennlp.tools.util.Span;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
-
-public class NameFinder implements EventProcessor<NameFinderParameters> {
-
-  private String tokens;
-  private NameFinderME nameFinder;
-
-  public NameFinder() {
-
-  }
-
-  @Override
-  public void onInvocation(NameFinderParameters nameFinderParameters,
-                           SpOutputCollector spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) {
-
-    loadModel(nameFinderParameters.getModel());
-
-    this.tokens = nameFinderParameters.getTokens();
-  }
-
-  @Override
-  public void onEvent(Event inputEvent, SpOutputCollector out) throws 
SpRuntimeException {
-    ListField tokens = inputEvent.getFieldBySelector(this.tokens).getAsList();
-
-    String[] tokensArray = 
tokens.castItems(String.class).stream().toArray(String[]::new);
-    Span[] spans = nameFinder.find(tokensArray);
-
-    // Generating the list of names from the found spans by the nameFinder
-    List<String> names = TextMiningUtil.extractSpans(spans, tokensArray);
-
-    nameFinder.clearAdaptiveData();
-
-    inputEvent.addField(NameFinderController.FOUND_NAME_FIELD_KEY, names);
-    out.collect(inputEvent);
-  }
-
-  @Override
-  public void onDetach() {
-  }
-
-  private void loadModel(byte[] modelContent) {
-    try (InputStream modelIn = new ByteArrayInputStream(modelContent)) {
-      TokenNameFinderModel model = new TokenNameFinderModel(modelIn);
-      nameFinder = new NameFinderME(model);
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-  }
-}
diff --git 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/namefinder/NameFinderParameters.java
 
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/namefinder/NameFinderParameters.java
deleted file mode 100644
index 627f2a213..000000000
--- 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/namefinder/NameFinderParameters.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.processors.textmining.jvm.processor.namefinder;
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import 
org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-public class NameFinderParameters extends EventProcessorBindingParams {
-  private String tokens;
-  private byte[] model;
-
-  public NameFinderParameters(DataProcessorInvocation graph, String tokens, 
byte[] model) {
-    super(graph);
-    this.tokens = tokens;
-    this.model = model;
-  }
-
-  public String getTokens() {
-    return tokens;
-  }
-
-  public byte[] getModel() {
-    return model;
-  }
-
-}
diff --git 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/namefinder/NameFinderController.java
 
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/namefinder/NameFinderProcessor.java
similarity index 52%
rename from 
streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/namefinder/NameFinderController.java
rename to 
streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/namefinder/NameFinderProcessor.java
index 9d1d19761..30d9ef1a0 100644
--- 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/namefinder/NameFinderController.java
+++ 
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/namefinder/NameFinderProcessor.java
@@ -18,15 +18,15 @@
 
 package org.apache.streampipes.processors.textmining.jvm.processor.namefinder;
 
-import org.apache.streampipes.client.StreamPipesClient;
-import 
org.apache.streampipes.extensions.management.client.StreamPipesClientResolver;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.runtime.field.ListField;
 import org.apache.streampipes.model.schema.PropertyScope;
+import 
org.apache.streampipes.processors.textmining.jvm.processor.TextMiningUtil;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import 
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
 import org.apache.streampipes.sdk.helpers.EpProperties;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
@@ -34,15 +34,29 @@ import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
 import org.apache.streampipes.sdk.utils.Assets;
 import org.apache.streampipes.sdk.utils.Datatypes;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import 
org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
-public class NameFinderController extends 
StandaloneEventProcessingDeclarer<NameFinderParameters> {
+import opennlp.tools.namefind.NameFinderME;
+import opennlp.tools.namefind.TokenNameFinderModel;
+import opennlp.tools.util.Span;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+public class NameFinderProcessor extends StreamPipesDataProcessor {
 
   private static final String MODEL = "model";
   private static final String TOKENS_FIELD_KEY = "tokensField";
   static final String FOUND_NAME_FIELD_KEY = "foundNames";
 
+  private String tokens;
+  private NameFinderME nameFinder;
+
   @Override
   public DataProcessorDescription declareModel() {
     return 
ProcessingElementBuilder.create("org.apache.streampipes.processors.textmining.jvm.namefinder")
@@ -66,31 +80,42 @@ public class NameFinderController extends 
StandaloneEventProcessingDeclarer<Name
   }
 
   @Override
-  public ConfiguredEventProcessor<NameFinderParameters> 
onInvocation(DataProcessorInvocation graph,
-                                                                     
ProcessingElementParameterExtractor extractor) {
+  public void onInvocation(ProcessorParams parameters,
+                           SpOutputCollector spOutputCollector,
+                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
+    String filename = parameters.extractor().selectedFilename(MODEL);
+    byte[] fileContent = 
runtimeContext.getStreamPipesClient().fileApi().getFileContent(filename);
+    this.tokens = 
parameters.extractor().mappingPropertyValue(TOKENS_FIELD_KEY);
+    loadModel(fileContent);
+  }
+
+  @Override
+  public void onEvent(Event event, SpOutputCollector collector) throws 
SpRuntimeException {
+    ListField tokens = event.getFieldBySelector(this.tokens).getAsList();
+
+    String[] tokensArray = 
tokens.castItems(String.class).toArray(String[]::new);
+    Span[] spans = nameFinder.find(tokensArray);
 
-    StreamPipesClient client = new 
StreamPipesClientResolver().makeStreamPipesClientInstance();
-    String filename = extractor.selectedFilename(MODEL);
-    byte[] fileContent = client.fileApi().getFileContent(filename);
-    String tokens = extractor.mappingPropertyValue(TOKENS_FIELD_KEY);
+    // Generating the list of names from the found spans by the nameFinder
+    List<String> names = TextMiningUtil.extractSpans(spans, tokensArray);
+
+    nameFinder.clearAdaptiveData();
+
+    event.addField(NameFinderProcessor.FOUND_NAME_FIELD_KEY, names);
+    collector.collect(event);
+  }
+
+  @Override
+  public void onDetach() throws SpRuntimeException {
 
-    NameFinderParameters params = new NameFinderParameters(graph, tokens, 
fileContent);
-    return new ConfiguredEventProcessor<>(params, NameFinder::new);
   }
 
-//  @Override
-//  public List<Option> resolveOptions(String requestId, 
StaticPropertyExtractor parameterExtractor) {
-//    String directoryPath = TextMiningJvmConfig.INSTANCE.getModelDirectory();
-//
-//    List<Option> result = new ArrayList<>();
-//
-//    File folder = new File(directoryPath);
-//    File[] listOfFiles = folder.listFiles();
-//
-//    for (File file : listOfFiles) {
-//      result.add(new Option(file.getName()));
-//    }
-//
-//    return result;
-//  }
+  private void loadModel(byte[] modelContent) {
+    try (InputStream modelIn = new ByteArrayInputStream(modelContent)) {
+      TokenNameFinderModel model = new TokenNameFinderModel(modelIn);
+      nameFinder = new NameFinderME(model);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
 }
diff --git 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/partofspeech/PartOfSpeech.java
 
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/partofspeech/PartOfSpeech.java
deleted file mode 100644
index 8b255291b..000000000
--- 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/partofspeech/PartOfSpeech.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package 
org.apache.streampipes.processors.textmining.jvm.processor.partofspeech;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.logging.api.Logger;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.model.runtime.field.ListField;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
-
-import opennlp.tools.postag.POSModel;
-import opennlp.tools.postag.POSTaggerME;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-public class PartOfSpeech implements EventProcessor<PartOfSpeechParameters> {
-
-  private static Logger log;
-
-  private String detection;
-  private POSTaggerME posTagger;
-
-  public PartOfSpeech() {
-  }
-
-  @Override
-  public void onInvocation(PartOfSpeechParameters partOfSpeechParameters,
-                           SpOutputCollector spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
-    log = partOfSpeechParameters.getGraph().getLogger(PartOfSpeech.class);
-    this.detection = partOfSpeechParameters.getDetectionName();
-
-    InputStream modelIn = new 
ByteArrayInputStream(partOfSpeechParameters.getFileContent());
-    POSModel model = null;
-    try {
-      model = new POSModel(modelIn);
-    } catch (IOException e) {
-      throw new SpRuntimeException("Error when loading the uploaded model.", 
e);
-    }
-
-    posTagger = new POSTaggerME(model);
-  }
-
-  @Override
-  public void onEvent(Event inputEvent, SpOutputCollector out) {
-    ListField text = inputEvent.getFieldBySelector(detection).getAsList();
-
-    String[] tags = 
posTagger.tag(text.castItems(String.class).stream().toArray(String[]::new));
-    double[] confidence = posTagger.probs();
-
-
-    inputEvent.addField(PartOfSpeechController.CONFIDENCE_KEY, confidence);
-    inputEvent.addField(PartOfSpeechController.TAG_KEY, tags);
-
-    out.collect(inputEvent);
-  }
-
-  @Override
-  public void onDetach() {
-  }
-}
diff --git 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/partofspeech/PartOfSpeechParameters.java
 
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/partofspeech/PartOfSpeechParameters.java
deleted file mode 100644
index cf030cc54..000000000
--- 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/partofspeech/PartOfSpeechParameters.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package 
org.apache.streampipes.processors.textmining.jvm.processor.partofspeech;
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import 
org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-public class PartOfSpeechParameters extends EventProcessorBindingParams {
-
-  private byte[] fileContent;
-  private String detectionName;
-
-  public PartOfSpeechParameters(DataProcessorInvocation graph, String 
fieldName, byte[] fileContent) {
-    super(graph);
-    this.detectionName = fieldName;
-    this.fileContent = fileContent;
-  }
-
-  public String getDetectionName() {
-    return detectionName;
-  }
-
-  public byte[] getFileContent() {
-    return fileContent;
-  }
-}
diff --git 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/partofspeech/PartOfSpeechController.java
 
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/partofspeech/PartOfSpeechProcessor.java
similarity index 58%
rename from 
streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/partofspeech/PartOfSpeechController.java
rename to 
streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/partofspeech/PartOfSpeechProcessor.java
index eef0d95f1..d58cc6703 100644
--- 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/partofspeech/PartOfSpeechController.java
+++ 
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/partofspeech/PartOfSpeechProcessor.java
@@ -18,15 +18,14 @@
 
 package 
org.apache.streampipes.processors.textmining.jvm.processor.partofspeech;
 
-import org.apache.streampipes.client.StreamPipesClient;
-import 
org.apache.streampipes.extensions.management.client.StreamPipesClientResolver;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.runtime.field.ListField;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import 
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
 import org.apache.streampipes.sdk.helpers.EpProperties;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
@@ -34,16 +33,28 @@ import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
 import org.apache.streampipes.sdk.utils.Assets;
 import org.apache.streampipes.sdk.utils.Datatypes;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import 
org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
-public class PartOfSpeechController extends 
StandaloneEventProcessingDeclarer<PartOfSpeechParameters> {
+import opennlp.tools.postag.POSModel;
+import opennlp.tools.postag.POSTaggerME;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+public class PartOfSpeechProcessor extends StreamPipesDataProcessor {
 
   private static final String DETECTION_FIELD_KEY = "detectionField";
   static final String CONFIDENCE_KEY = "confidencePos";
   static final String TAG_KEY = "tagPos";
   private static final String BINARY_FILE_KEY = "binary-file";
 
+  private String detection;
+  private POSTaggerME posTagger;
+
   @Override
   public DataProcessorDescription declareModel() {
     return 
ProcessingElementBuilder.create("org.apache.streampipes.processors.textmining.jvm.partofspeech")
@@ -71,15 +82,39 @@ public class PartOfSpeechController extends 
StandaloneEventProcessingDeclarer<Pa
   }
 
   @Override
-  public ConfiguredEventProcessor<PartOfSpeechParameters> 
onInvocation(DataProcessorInvocation graph,
-                                                                       
ProcessingElementParameterExtractor extractor) {
+  public void onInvocation(ProcessorParams parameters,
+                           SpOutputCollector spOutputCollector,
+                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
+    String filename = parameters.extractor().selectedFilename(BINARY_FILE_KEY);
+    byte[] fileContent = 
runtimeContext.getStreamPipesClient().fileApi().getFileContent(filename);
+    this.detection = 
parameters.extractor().mappingPropertyValue(DETECTION_FIELD_KEY);
+
+    InputStream modelIn = new ByteArrayInputStream(fileContent);
+    POSModel model;
+    try {
+      model = new POSModel(modelIn);
+    } catch (IOException e) {
+      throw new SpRuntimeException("Error when loading the uploaded model.", 
e);
+    }
 
-    StreamPipesClient client = new 
StreamPipesClientResolver().makeStreamPipesClientInstance();
-    String filename = extractor.selectedFilename(BINARY_FILE_KEY);
-    byte[] fileContent = client.fileApi().getFileContent(filename);
-    String detection = extractor.mappingPropertyValue(DETECTION_FIELD_KEY);
+    posTagger = new POSTaggerME(model);
+  }
+
+  @Override
+  public void onEvent(Event event, SpOutputCollector collector) throws 
SpRuntimeException {
+    ListField text = event.getFieldBySelector(detection).getAsList();
 
-    PartOfSpeechParameters params = new PartOfSpeechParameters(graph, 
detection, fileContent);
-    return new ConfiguredEventProcessor<>(params, PartOfSpeech::new);
+    String[] tags = 
posTagger.tag(text.castItems(String.class).toArray(String[]::new));
+    double[] confidence = posTagger.probs();
+
+
+    event.addField(PartOfSpeechProcessor.CONFIDENCE_KEY, confidence);
+    event.addField(PartOfSpeechProcessor.TAG_KEY, tags);
+
+    collector.collect(event);
+  }
+
+  @Override
+  public void onDetach() throws SpRuntimeException {
   }
 }
diff --git 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/sentencedetection/SentenceDetection.java
 
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/sentencedetection/SentenceDetection.java
deleted file mode 100644
index bed475959..000000000
--- 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/sentencedetection/SentenceDetection.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package 
org.apache.streampipes.processors.textmining.jvm.processor.sentencedetection;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.logging.api.Logger;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
-
-import opennlp.tools.sentdetect.SentenceDetectorME;
-import opennlp.tools.sentdetect.SentenceModel;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-public class SentenceDetection implements 
EventProcessor<SentenceDetectionParameters> {
-
-  private static Logger log;
-
-  // Field with the text
-  private String detection;
-  private SentenceDetectorME sentenceDetector;
-
-  public SentenceDetection() {
-  }
-
-  @Override
-  public void onInvocation(SentenceDetectionParameters 
sentenceDetectionParameters,
-                           SpOutputCollector spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
-    log = 
sentenceDetectionParameters.getGraph().getLogger(SentenceDetection.class);
-    this.detection = sentenceDetectionParameters.getDetectionName();
-
-    InputStream modelIn = new 
ByteArrayInputStream(sentenceDetectionParameters.getFileContent());
-    SentenceModel model = null;
-    try {
-      model = new SentenceModel(modelIn);
-    } catch (IOException e) {
-      throw new SpRuntimeException("Error when loading the uploaded model.", 
e);
-    }
-
-    sentenceDetector = new SentenceDetectorME(model);
-  }
-
-  @Override
-  public void onEvent(Event inputEvent, SpOutputCollector out) {
-    String text = 
inputEvent.getFieldBySelector(detection).getAsPrimitive().getAsString();
-
-    String sentences[] = sentenceDetector.sentDetect(text);
-
-    for (String sentence : sentences) {
-      inputEvent.updateFieldBySelector(detection, sentence);
-      out.collect(inputEvent);
-    }
-  }
-
-  @Override
-  public void onDetach() {
-  }
-}
diff --git 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/sentencedetection/SentenceDetectionParameters.java
 
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/sentencedetection/SentenceDetectionParameters.java
deleted file mode 100644
index 2cf382b3b..000000000
--- 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/sentencedetection/SentenceDetectionParameters.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package 
org.apache.streampipes.processors.textmining.jvm.processor.sentencedetection;
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import 
org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-public class SentenceDetectionParameters extends EventProcessorBindingParams {
-  private byte[] fileContent;
-  private String detectionName;
-
-  public SentenceDetectionParameters(DataProcessorInvocation graph, String 
fieldName, byte[] fileContent) {
-    super(graph);
-    this.detectionName = fieldName;
-    this.fileContent = fileContent;
-  }
-
-  public String getDetectionName() {
-    return detectionName;
-  }
-
-  public byte[] getFileContent() {
-    return fileContent;
-  }
-}
diff --git 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/sentencedetection/SentenceDetectionController.java
 
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/sentencedetection/SentenceDetectionProcessor.java
similarity index 54%
rename from 
streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/sentencedetection/SentenceDetectionController.java
rename to 
streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/sentencedetection/SentenceDetectionProcessor.java
index aee54f179..852a04131 100644
--- 
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/sentencedetection/SentenceDetectionController.java
+++ 
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/sentencedetection/SentenceDetectionProcessor.java
@@ -18,28 +18,38 @@
 
 package org.apache.streampipes.processors.textmining.jvm.processor.sentencedetection;
 
-import org.apache.streampipes.client.StreamPipesClient;
-import org.apache.streampipes.extensions.management.client.StreamPipesClientResolver;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
 import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
-public class SentenceDetectionController extends StandaloneEventProcessingDeclarer<SentenceDetectionParameters> {
+import opennlp.tools.sentdetect.SentenceDetectorME;
+import opennlp.tools.sentdetect.SentenceModel;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+public class SentenceDetectionProcessor extends StreamPipesDataProcessor {
 
   private static final String DETECTION_FIELD_KEY = "detectionField";
   private static final String BINARY_FILE_KEY = "binary-file";
 
+  private String detection;
+  private SentenceDetectorME sentenceDetector;
+
   @Override
   public DataProcessorDescription declareModel() {
     return ProcessingElementBuilder.create("org.apache.streampipes.processors.textmining.jvm.sentencedetection")
@@ -59,16 +69,38 @@ public class SentenceDetectionController extends StandaloneEventProcessingDeclar
   }
 
   @Override
-  public ConfiguredEventProcessor<SentenceDetectionParameters> onInvocation(
-      DataProcessorInvocation graph,
-      ProcessingElementParameterExtractor extractor) {
+  public void onInvocation(ProcessorParams parameters,
+                           SpOutputCollector spOutputCollector,
+                           EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
+    String filename = parameters.extractor().selectedFilename(BINARY_FILE_KEY);
+    byte[] fileContent = runtimeContext.getStreamPipesClient().fileApi().getFileContent(filename);
+    this.detection = parameters.extractor().mappingPropertyValue(DETECTION_FIELD_KEY);
+
+    InputStream modelIn = new ByteArrayInputStream(fileContent);
+    SentenceModel model;
+    try {
+      model = new SentenceModel(modelIn);
+    } catch (IOException e) {
+      throw new SpRuntimeException("Error when loading the uploaded model.", e);
+    }
+
+    sentenceDetector = new SentenceDetectorME(model);
+  }
+
+  @Override
+  public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeException {
+    String text = event.getFieldBySelector(detection).getAsPrimitive().getAsString();
+
+    String sentences[] = sentenceDetector.sentDetect(text);
 
-    StreamPipesClient client = new StreamPipesClientResolver().makeStreamPipesClientInstance();
-    String filename = extractor.selectedFilename(BINARY_FILE_KEY);
-    byte[] fileContent = client.fileApi().getFileContent(filename);
-    String detection = extractor.mappingPropertyValue(DETECTION_FIELD_KEY);
+    for (String sentence : sentences) {
+      event.updateFieldBySelector(detection, sentence);
+      collector.collect(event);
+    }
+  }
+
+  @Override
+  public void onDetach() throws SpRuntimeException {
 
-    SentenceDetectionParameters params = new SentenceDetectionParameters(graph, detection, fileContent);
-    return new ConfiguredEventProcessor<>(params, SentenceDetection::new);
   }
 }
diff --git a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/tokenizer/Tokenizer.java b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/tokenizer/Tokenizer.java
deleted file mode 100644
index ee12f2fb2..000000000
--- a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/tokenizer/Tokenizer.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.processors.textmining.jvm.processor.tokenizer;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.logging.api.Logger;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
-
-import opennlp.tools.tokenize.TokenizerME;
-import opennlp.tools.tokenize.TokenizerModel;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-public class Tokenizer implements EventProcessor<TokenizerParameters> {
-
-  private static Logger log;
-
-  // Field with the text
-  private String detection;
-  private TokenizerME tokenizer;
-
-  public Tokenizer() {
-  }
-
-  @Override
-  public void onInvocation(TokenizerParameters tokenizerParameters,
-                           SpOutputCollector spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
-    log = tokenizerParameters.getGraph().getLogger(Tokenizer.class);
-    this.detection = tokenizerParameters.getDetectionName();
-
-    InputStream modelIn = new ByteArrayInputStream(tokenizerParameters.getFileContent());
-    TokenizerModel model = null;
-    try {
-      model = new TokenizerModel(modelIn);
-    } catch (IOException e) {
-      throw new SpRuntimeException("Error when loading the uploaded model.", e);
-    }
-
-    tokenizer = new TokenizerME(model);
-  }
-
-  @Override
-  public void onEvent(Event inputEvent, SpOutputCollector out) {
-    String text = inputEvent.getFieldBySelector(detection).getAsPrimitive().getAsString();
-
-    inputEvent.addField(TokenizerController.TOKEN_LIST_FIELD_KEY, tokenizer.tokenize(text));
-
-    out.collect(inputEvent);
-  }
-
-  @Override
-  public void onDetach() {
-  }
-}
diff --git a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/tokenizer/TokenizerParameters.java b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/tokenizer/TokenizerParameters.java
deleted file mode 100644
index c0c9b517c..000000000
--- a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/tokenizer/TokenizerParameters.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.processors.textmining.jvm.processor.tokenizer;
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-public class TokenizerParameters extends EventProcessorBindingParams {
-  private byte[] fileContent;
-  private String detectionName;
-
-  public TokenizerParameters(DataProcessorInvocation graph, String fieldName, byte[] fileContent) {
-    super(graph);
-    this.detectionName = fieldName;
-    this.fileContent = fileContent;
-  }
-
-  public String getDetectionName() {
-    return detectionName;
-  }
-
-  public byte[] getFileContent() {
-    return fileContent;
-  }
-}
diff --git a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/tokenizer/TokenizerController.java b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/tokenizer/TokenizerProcessor.java
similarity index 58%
rename from streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/tokenizer/TokenizerController.java
rename to streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/tokenizer/TokenizerProcessor.java
index 2ae49658b..7c2e05d1b 100644
--- a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/tokenizer/TokenizerController.java
+++ b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/tokenizer/TokenizerProcessor.java
@@ -18,30 +18,41 @@
 
 package org.apache.streampipes.processors.textmining.jvm.processor.tokenizer;
 
-import org.apache.streampipes.client.StreamPipesClient;
-import org.apache.streampipes.extensions.management.client.StreamPipesClientResolver;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
 import org.apache.streampipes.sdk.helpers.EpProperties;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
 import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
-public class TokenizerController extends StandaloneEventProcessingDeclarer<TokenizerParameters> {
+import opennlp.tools.tokenize.TokenizerME;
+import opennlp.tools.tokenize.TokenizerModel;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+public class TokenizerProcessor extends StreamPipesDataProcessor {
 
   private static final String DETECTION_FIELD_KEY = "detectionField";
   static final String TOKEN_LIST_FIELD_KEY = "tokenList";
   private static final String BINARY_FILE_KEY = "binary-file";
 
+  private String detection;
+  private TokenizerME tokenizer;
+
+
   //TODO: Maybe change outputStrategy to an array instead of tons of different strings
   @Override
   public DataProcessorDescription declareModel() {
@@ -64,15 +75,35 @@ public class TokenizerController extends StandaloneEventProcessingDeclarer<Token
   }
 
   @Override
-  public ConfiguredEventProcessor<TokenizerParameters> onInvocation(DataProcessorInvocation graph,
-                                                                    ProcessingElementParameterExtractor extractor) {
+  public void onInvocation(ProcessorParams parameters,
+                           SpOutputCollector spOutputCollector,
+                           EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
+    String filename = parameters.extractor().selectedFilename(BINARY_FILE_KEY);
+    byte[] fileContent = runtimeContext.getStreamPipesClient().fileApi().getFileContent(filename);
+    this.detection = parameters.extractor().mappingPropertyValue(DETECTION_FIELD_KEY);
+
+    InputStream modelIn = new ByteArrayInputStream(fileContent);
+    TokenizerModel model;
+    try {
+      model = new TokenizerModel(modelIn);
+    } catch (IOException e) {
+      throw new SpRuntimeException("Error when loading the uploaded model.", e);
+    }
+
+    tokenizer = new TokenizerME(model);
+  }
 
-    StreamPipesClient client = new StreamPipesClientResolver().makeStreamPipesClientInstance();
-    String filename = extractor.selectedFilename(BINARY_FILE_KEY);
-    byte[] fileContent = client.fileApi().getFileContent(filename);
-    String detection = extractor.mappingPropertyValue(DETECTION_FIELD_KEY);
+  @Override
+  public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeException {
+    String text = event.getFieldBySelector(detection).getAsPrimitive().getAsString();
+
+    event.addField(TokenizerProcessor.TOKEN_LIST_FIELD_KEY, tokenizer.tokenize(text));
+
+    collector.collect(event);
+  }
+
+  @Override
+  public void onDetach() throws SpRuntimeException {
 
-    TokenizerParameters params = new TokenizerParameters(graph, detection, fileContent);
-    return new ConfiguredEventProcessor<>(params, Tokenizer::new);
   }
 }

Reply via email to