This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch 1601-migrate-text-mining-processors
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to
refs/heads/1601-migrate-text-mining-processors by this push:
new 4918a3de8 Migrate text mining processors (#1601)
4918a3de8 is described below
commit 4918a3de8d0d3864ddab4b933fce4a410c5365d3
Author: Dominik Riemer <[email protected]>
AuthorDate: Fri May 26 14:20:22 2023 +0200
Migrate text mining processors (#1601)
---
.../textmining/jvm/TextMiningJvmInit.java | 24 +++---
.../textmining/jvm/processor/chunker/Chunker.java | 99 ----------------------
.../jvm/processor/chunker/ChunkerParameters.java | 47 ----------
...hunkerController.java => ChunkerProcessor.java} | 79 +++++++++++++----
.../jvm/processor/language/LanguageDetection.java | 79 -----------------
.../language/LanguageDetectionParameters.java | 41 ---------
...roller.java => LanguageDetectionProcessor.java} | 65 ++++++++++----
.../jvm/processor/namefinder/NameFinder.java | 85 -------------------
.../processor/namefinder/NameFinderParameters.java | 42 ---------
...derController.java => NameFinderProcessor.java} | 85 ++++++++++++-------
.../jvm/processor/partofspeech/PartOfSpeech.java | 81 ------------------
.../partofspeech/PartOfSpeechParameters.java | 42 ---------
...hController.java => PartOfSpeechProcessor.java} | 65 ++++++++++----
.../sentencedetection/SentenceDetection.java | 79 -----------------
.../SentenceDetectionParameters.java | 41 ---------
...roller.java => SentenceDetectionProcessor.java} | 64 ++++++++++----
.../jvm/processor/tokenizer/Tokenizer.java | 76 -----------------
.../processor/tokenizer/TokenizerParameters.java | 41 ---------
...izerController.java => TokenizerProcessor.java} | 61 +++++++++----
19 files changed, 323 insertions(+), 873 deletions(-)
diff --git a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/TextMiningJvmInit.java b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/TextMiningJvmInit.java
index 1af275081..b4ead4b46 100644
--- a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/TextMiningJvmInit.java
+++ b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/TextMiningJvmInit.java
@@ -27,12 +27,12 @@ import org.apache.streampipes.extensions.management.model.SpServiceDefinitionBui
import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
-import
org.apache.streampipes.processors.textmining.jvm.processor.chunker.ChunkerController;
-import
org.apache.streampipes.processors.textmining.jvm.processor.language.LanguageDetectionController;
-import
org.apache.streampipes.processors.textmining.jvm.processor.namefinder.NameFinderController;
-import
org.apache.streampipes.processors.textmining.jvm.processor.partofspeech.PartOfSpeechController;
-import
org.apache.streampipes.processors.textmining.jvm.processor.sentencedetection.SentenceDetectionController;
-import
org.apache.streampipes.processors.textmining.jvm.processor.tokenizer.TokenizerController;
+import
org.apache.streampipes.processors.textmining.jvm.processor.chunker.ChunkerProcessor;
+import
org.apache.streampipes.processors.textmining.jvm.processor.language.LanguageDetectionProcessor;
+import
org.apache.streampipes.processors.textmining.jvm.processor.namefinder.NameFinderProcessor;
+import
org.apache.streampipes.processors.textmining.jvm.processor.partofspeech.PartOfSpeechProcessor;
+import
org.apache.streampipes.processors.textmining.jvm.processor.sentencedetection.SentenceDetectionProcessor;
+import
org.apache.streampipes.processors.textmining.jvm.processor.tokenizer.TokenizerProcessor;
import org.apache.streampipes.service.extensions.ExtensionsModelSubmitter;
public class TextMiningJvmInit extends ExtensionsModelSubmitter {
@@ -47,12 +47,12 @@ public class TextMiningJvmInit extends ExtensionsModelSubmitter {
return
SpServiceDefinitionBuilder.create("org.apache.streampipes.processors.textmining.jvm",
"Processors Text Mining JVM", "",
8090)
- .registerPipelineElements(new LanguageDetectionController(),
- new TokenizerController(),
- new PartOfSpeechController(),
- new ChunkerController(),
- new NameFinderController(),
- new SentenceDetectionController())
+ .registerPipelineElements(new LanguageDetectionProcessor(),
+ new TokenizerProcessor(),
+ new PartOfSpeechProcessor(),
+ new ChunkerProcessor(),
+ new NameFinderProcessor(),
+ new SentenceDetectionProcessor())
.registerMessagingFormats(
new JsonDataFormatFactory(),
new CborDataFormatFactory(),
diff --git a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/chunker/Chunker.java b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/chunker/Chunker.java
deleted file mode 100644
index ab6c1392c..000000000
--- a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/chunker/Chunker.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.processors.textmining.jvm.processor.chunker;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.logging.api.Logger;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.model.runtime.field.ListField;
-import
org.apache.streampipes.processors.textmining.jvm.processor.TextMiningUtil;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
-
-import opennlp.tools.chunker.ChunkerME;
-import opennlp.tools.chunker.ChunkerModel;
-import opennlp.tools.util.Span;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Arrays;
-import java.util.List;
-
-public class Chunker implements EventProcessor<ChunkerParameters> {
-
- private static Logger log;
-
- private String tags;
- private String tokens;
- private ChunkerME chunker;
-
- public Chunker() {
-// try (InputStream modelIn =
getClass().getClassLoader().getResourceAsStream("chunker-en.bin")) {
-// ChunkerModel model = new ChunkerModel(modelIn);
-// chunker = new ChunkerME(model);
-// } catch (IOException e) {
-// e.printStackTrace();
-// }
- }
-
- @Override
- public void onInvocation(ChunkerParameters chunkerParameters,
- SpOutputCollector spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
- log = chunkerParameters.getGraph().getLogger(Chunker.class);
- this.tags = chunkerParameters.getTags();
- this.tokens = chunkerParameters.getTokens();
-
- InputStream modelIn = new
ByteArrayInputStream(chunkerParameters.getFileContent());
- ChunkerModel model = null;
- try {
- model = new ChunkerModel(modelIn);
- } catch (IOException e) {
- throw new SpRuntimeException("Error when loading the uploaded model.",
e);
- }
-
- chunker = new ChunkerME(model);
- }
-
- @Override
- public void onEvent(Event inputEvent, SpOutputCollector out) throws
SpRuntimeException {
- ListField tags = inputEvent.getFieldBySelector(this.tags).getAsList();
- ListField tokens = inputEvent.getFieldBySelector(this.tokens).getAsList();
-
-
- String[] tagsArray =
tags.castItems(String.class).stream().toArray(String[]::new);
- String[] tokensArray =
tokens.castItems(String.class).stream().toArray(String[]::new);
-
- Span[] spans = chunker.chunkAsSpans(tokensArray, tagsArray);
-
- List<String> chunks = TextMiningUtil.extractSpans(spans, tokensArray);
- String[] types = Arrays.stream(spans).map(s ->
s.getType()).toArray(String[]::new);
-
- inputEvent.addField(ChunkerController.CHUNK_TYPE_FIELD_KEY, types);
- inputEvent.addField(ChunkerController.CHUNK_FIELD_KEY, chunks);
-
- out.collect(inputEvent);
- }
-
- @Override
- public void onDetach() {
- }
-}
diff --git a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/chunker/ChunkerParameters.java b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/chunker/ChunkerParameters.java
deleted file mode 100644
index b94361e46..000000000
--- a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/chunker/ChunkerParameters.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.processors.textmining.jvm.processor.chunker;
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import
org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-public class ChunkerParameters extends EventProcessorBindingParams {
- private String tags;
- private String tokens;
- private byte[] fileContent;
-
- public ChunkerParameters(DataProcessorInvocation graph, String tags, String
tokens, byte[] fileContent) {
- super(graph);
- this.tags = tags;
- this.tokens = tokens;
- this.fileContent = fileContent;
- }
-
- public String getTags() {
- return tags;
- }
-
- public String getTokens() {
- return tokens;
- }
-
- public byte[] getFileContent() {
- return fileContent;
- }
-}
diff --git a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/chunker/ChunkerController.java b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/chunker/ChunkerProcessor.java
similarity index 54%
rename from streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/chunker/ChunkerController.java
rename to streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/chunker/ChunkerProcessor.java
index 49052335c..9b1ef12aa 100644
--- a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/chunker/ChunkerController.java
+++ b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/chunker/ChunkerProcessor.java
@@ -18,15 +18,15 @@
package org.apache.streampipes.processors.textmining.jvm.processor.chunker;
-import org.apache.streampipes.client.StreamPipesClient;
-import
org.apache.streampipes.extensions.management.client.StreamPipesClientResolver;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.runtime.field.ListField;
import org.apache.streampipes.model.schema.PropertyScope;
+import
org.apache.streampipes.processors.textmining.jvm.processor.TextMiningUtil;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
@@ -34,10 +34,22 @@ import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.utils.Assets;
import org.apache.streampipes.sdk.utils.Datatypes;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import
org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
-public class ChunkerController extends
StandaloneEventProcessingDeclarer<ChunkerParameters> {
+import opennlp.tools.chunker.ChunkerME;
+import opennlp.tools.chunker.ChunkerModel;
+import opennlp.tools.util.Span;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+
+public class ChunkerProcessor extends StreamPipesDataProcessor {
private static final String TAGS_FIELD_KEY = "tagsField";
private static final String TOKENS_FIELD_KEY = "tokensField";
@@ -45,6 +57,10 @@ public class ChunkerController extends StandaloneEventProcessingDeclarer<Chunker
static final String CHUNK_FIELD_KEY = "chunk";
private static final String BINARY_FILE_KEY = "binary-file";
+ private String tags;
+ private String tokens;
+ private ChunkerME chunker;
+
@Override
public DataProcessorDescription declareModel() {
return
ProcessingElementBuilder.create("org.apache.streampipes.processors.textmining.jvm.chunker")
@@ -76,17 +92,48 @@ public class ChunkerController extends StandaloneEventProcessingDeclarer<Chunker
}
@Override
- public ConfiguredEventProcessor<ChunkerParameters>
onInvocation(DataProcessorInvocation graph,
-
ProcessingElementParameterExtractor extractor) {
+ public void onInvocation(ProcessorParams parameters,
+ SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext context) throws
SpRuntimeException {
+ this.tags = parameters.extractor().mappingPropertyValue(TAGS_FIELD_KEY);
+ this.tokens =
parameters.extractor().mappingPropertyValue(TOKENS_FIELD_KEY);
+ String filename = parameters.extractor().selectedFilename(BINARY_FILE_KEY);
+ byte[] fileContent =
context.getStreamPipesClient().fileApi().getFileContent(filename);
+
+ InputStream modelIn = new ByteArrayInputStream(fileContent);
+ ChunkerModel model;
+ try {
+ model = new ChunkerModel(modelIn);
+ } catch (IOException e) {
+ throw new SpRuntimeException("Error when loading the uploaded model.",
e);
+ }
+
+ chunker = new ChunkerME(model);
+ }
+
+ @Override
+ public void onEvent(Event event,
+ SpOutputCollector collector) throws SpRuntimeException {
+ ListField tags = event.getFieldBySelector(this.tags).getAsList();
+ ListField tokens = event.getFieldBySelector(this.tokens).getAsList();
- StreamPipesClient client = new
StreamPipesClientResolver().makeStreamPipesClientInstance();
- String filename = extractor.selectedFilename(BINARY_FILE_KEY);
- byte[] fileContent = client.fileApi().getFileContent(filename);
- String tags = extractor.mappingPropertyValue(TAGS_FIELD_KEY);
- String tokens = extractor.mappingPropertyValue(TOKENS_FIELD_KEY);
+ String[] tagsArray = tags.castItems(String.class).toArray(String[]::new);
+ String[] tokensArray =
tokens.castItems(String.class).toArray(String[]::new);
+
+ Span[] spans = chunker.chunkAsSpans(tokensArray, tagsArray);
+
+ List<String> chunks = TextMiningUtil.extractSpans(spans, tokensArray);
+ String[] types =
Arrays.stream(spans).map(Span::getType).toArray(String[]::new);
+
+ event.addField(ChunkerProcessor.CHUNK_TYPE_FIELD_KEY, types);
+ event.addField(ChunkerProcessor.CHUNK_FIELD_KEY, chunks);
+
+ collector.collect(event);
+ }
+
+ @Override
+ public void onDetach() throws SpRuntimeException {
- ChunkerParameters params = new ChunkerParameters(graph, tags, tokens,
fileContent);
- return new ConfiguredEventProcessor<>(params, Chunker::new);
}
}
diff --git a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/language/LanguageDetection.java b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/language/LanguageDetection.java
deleted file mode 100644
index aa200f435..000000000
--- a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/language/LanguageDetection.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.processors.textmining.jvm.processor.language;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.logging.api.Logger;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
-
-import opennlp.tools.langdetect.Language;
-import opennlp.tools.langdetect.LanguageDetector;
-import opennlp.tools.langdetect.LanguageDetectorME;
-import opennlp.tools.langdetect.LanguageDetectorModel;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-public class LanguageDetection implements
EventProcessor<LanguageDetectionParameters> {
-
- private static Logger log;
-
- private String detection;
- private LanguageDetector languageDetector;
-
- public LanguageDetection() {
- }
-
- @Override
- public void onInvocation(LanguageDetectionParameters
languageDetectionParameters,
- SpOutputCollector spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
- log =
languageDetectionParameters.getGraph().getLogger(LanguageDetection.class);
- this.detection = languageDetectionParameters.getDetectionName();
-
- InputStream modelIn = new
ByteArrayInputStream(languageDetectionParameters.getFileContent());
- LanguageDetectorModel model = null;
- try {
- model = new LanguageDetectorModel(modelIn);
- } catch (IOException e) {
- throw new SpRuntimeException("Error when loading the uploaded model.",
e);
- }
-
- languageDetector = new LanguageDetectorME(model);
- }
-
- @Override
- public void onEvent(Event inputEvent, SpOutputCollector out) {
- String text =
inputEvent.getFieldBySelector(detection).getAsPrimitive().getAsString();
- Language language = languageDetector.predictLanguage(text);
-
- inputEvent.addField(LanguageDetectionController.LANGUAGE_KEY,
language.getLang());
- inputEvent.addField(LanguageDetectionController.CONFIDENCE_KEY,
language.getConfidence());
-
- out.collect(inputEvent);
- }
-
- @Override
- public void onDetach() {
- }
-}
diff --git a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/language/LanguageDetectionParameters.java b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/language/LanguageDetectionParameters.java
deleted file mode 100644
index 0f33bda39..000000000
--- a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/language/LanguageDetectionParameters.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.processors.textmining.jvm.processor.language;
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import
org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-public class LanguageDetectionParameters extends EventProcessorBindingParams {
- private byte[] fileContent;
- private String detectionName;
-
- public LanguageDetectionParameters(DataProcessorInvocation graph, String
fieldName, byte[] fileContent) {
- super(graph);
- this.detectionName = fieldName;
- this.fileContent = fileContent;
- }
-
- public String getDetectionName() {
- return detectionName;
- }
-
- public byte[] getFileContent() {
- return fileContent;
- }
-}
diff --git a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/language/LanguageDetectionController.java b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/language/LanguageDetectionProcessor.java
similarity index 56%
rename from streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/language/LanguageDetectionController.java
rename to streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/language/LanguageDetectionProcessor.java
index 0e1e0a380..e970112f4 100644
--- a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/language/LanguageDetectionController.java
+++ b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/language/LanguageDetectionProcessor.java
@@ -18,31 +18,43 @@
package org.apache.streampipes.processors.textmining.jvm.processor.language;
-import org.apache.streampipes.client.StreamPipesClient;
-import
org.apache.streampipes.extensions.management.client.StreamPipesClientResolver;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import
org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
-public class LanguageDetectionController extends
StandaloneEventProcessingDeclarer<LanguageDetectionParameters> {
+import opennlp.tools.langdetect.Language;
+import opennlp.tools.langdetect.LanguageDetector;
+import opennlp.tools.langdetect.LanguageDetectorME;
+import opennlp.tools.langdetect.LanguageDetectorModel;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+public class LanguageDetectionProcessor extends StreamPipesDataProcessor {
private static final String DETECTION_FIELD_KEY = "detectionField";
static final String LANGUAGE_KEY = "language";
static final String CONFIDENCE_KEY = "confidenceLanguage";
private static final String BINARY_FILE_KEY = "binary-file";
+ private String detection;
+ private LanguageDetector languageDetector;
+
@Override
public DataProcessorDescription declareModel() {
return
ProcessingElementBuilder.create("org.apache.streampipes.processors.textmining.jvm.languagedetection")
@@ -70,16 +82,37 @@ public class LanguageDetectionController extends StandaloneEventProcessingDeclar
}
@Override
- public ConfiguredEventProcessor<LanguageDetectionParameters> onInvocation(
- DataProcessorInvocation graph,
- ProcessingElementParameterExtractor extractor) {
+ public void onInvocation(ProcessorParams parameters,
+ SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext context) throws
SpRuntimeException {
+ String filename = parameters.extractor().selectedFilename(BINARY_FILE_KEY);
+ byte[] fileContent =
context.getStreamPipesClient().fileApi().getFileContent(filename);
+ this.detection =
parameters.extractor().mappingPropertyValue(DETECTION_FIELD_KEY);
+
+ InputStream modelIn = new ByteArrayInputStream(fileContent);
+ LanguageDetectorModel model = null;
+ try {
+ model = new LanguageDetectorModel(modelIn);
+ } catch (IOException e) {
+ throw new SpRuntimeException("Error when loading the uploaded model.",
e);
+ }
+
+ languageDetector = new LanguageDetectorME(model);
+ }
+
+ @Override
+ public void onEvent(Event event, SpOutputCollector collector) throws
SpRuntimeException {
+ String text =
event.getFieldBySelector(detection).getAsPrimitive().getAsString();
+ Language language = languageDetector.predictLanguage(text);
+
+ event.addField(LanguageDetectionProcessor.LANGUAGE_KEY,
language.getLang());
+ event.addField(LanguageDetectionProcessor.CONFIDENCE_KEY,
language.getConfidence());
- StreamPipesClient client = new
StreamPipesClientResolver().makeStreamPipesClientInstance();
- String filename = extractor.selectedFilename(BINARY_FILE_KEY);
- byte[] fileContent = client.fileApi().getFileContent(filename);
- String detection = extractor.mappingPropertyValue(DETECTION_FIELD_KEY);
+ collector.collect(event);
+ }
+
+ @Override
+ public void onDetach() throws SpRuntimeException {
- LanguageDetectionParameters params = new
LanguageDetectionParameters(graph, detection, fileContent);
- return new ConfiguredEventProcessor<>(params, LanguageDetection::new);
}
}
diff --git a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/namefinder/NameFinder.java b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/namefinder/NameFinder.java
deleted file mode 100644
index 9455c30be..000000000
--- a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/namefinder/NameFinder.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.processors.textmining.jvm.processor.namefinder;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.model.runtime.field.ListField;
-import
org.apache.streampipes.processors.textmining.jvm.processor.TextMiningUtil;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
-
-import opennlp.tools.namefind.NameFinderME;
-import opennlp.tools.namefind.TokenNameFinderModel;
-import opennlp.tools.util.Span;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
-
-public class NameFinder implements EventProcessor<NameFinderParameters> {
-
- private String tokens;
- private NameFinderME nameFinder;
-
- public NameFinder() {
-
- }
-
- @Override
- public void onInvocation(NameFinderParameters nameFinderParameters,
- SpOutputCollector spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) {
-
- loadModel(nameFinderParameters.getModel());
-
- this.tokens = nameFinderParameters.getTokens();
- }
-
- @Override
- public void onEvent(Event inputEvent, SpOutputCollector out) throws
SpRuntimeException {
- ListField tokens = inputEvent.getFieldBySelector(this.tokens).getAsList();
-
- String[] tokensArray =
tokens.castItems(String.class).stream().toArray(String[]::new);
- Span[] spans = nameFinder.find(tokensArray);
-
- // Generating the list of names from the found spans by the nameFinder
- List<String> names = TextMiningUtil.extractSpans(spans, tokensArray);
-
- nameFinder.clearAdaptiveData();
-
- inputEvent.addField(NameFinderController.FOUND_NAME_FIELD_KEY, names);
- out.collect(inputEvent);
- }
-
- @Override
- public void onDetach() {
- }
-
- private void loadModel(byte[] modelContent) {
- try (InputStream modelIn = new ByteArrayInputStream(modelContent)) {
- TokenNameFinderModel model = new TokenNameFinderModel(modelIn);
- nameFinder = new NameFinderME(model);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-}
diff --git a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/namefinder/NameFinderParameters.java b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/namefinder/NameFinderParameters.java
deleted file mode 100644
index 627f2a213..000000000
--- a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/namefinder/NameFinderParameters.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.processors.textmining.jvm.processor.namefinder;
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import
org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-public class NameFinderParameters extends EventProcessorBindingParams {
- private String tokens;
- private byte[] model;
-
- public NameFinderParameters(DataProcessorInvocation graph, String tokens,
byte[] model) {
- super(graph);
- this.tokens = tokens;
- this.model = model;
- }
-
- public String getTokens() {
- return tokens;
- }
-
- public byte[] getModel() {
- return model;
- }
-
-}
diff --git
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/namefinder/NameFinderController.java
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/namefinder/NameFinderProcessor.java
similarity index 52%
rename from
streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/namefinder/NameFinderController.java
rename to
streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/namefinder/NameFinderProcessor.java
index 9d1d19761..30d9ef1a0 100644
---
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/namefinder/NameFinderController.java
+++
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/namefinder/NameFinderProcessor.java
@@ -18,15 +18,15 @@
package org.apache.streampipes.processors.textmining.jvm.processor.namefinder;
-import org.apache.streampipes.client.StreamPipesClient;
-import
org.apache.streampipes.extensions.management.client.StreamPipesClientResolver;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.runtime.field.ListField;
import org.apache.streampipes.model.schema.PropertyScope;
+import
org.apache.streampipes.processors.textmining.jvm.processor.TextMiningUtil;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
@@ -34,15 +34,29 @@ import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.utils.Assets;
import org.apache.streampipes.sdk.utils.Datatypes;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import
org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
-public class NameFinderController extends
StandaloneEventProcessingDeclarer<NameFinderParameters> {
+import opennlp.tools.namefind.NameFinderME;
+import opennlp.tools.namefind.TokenNameFinderModel;
+import opennlp.tools.util.Span;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+public class NameFinderProcessor extends StreamPipesDataProcessor {
private static final String MODEL = "model";
private static final String TOKENS_FIELD_KEY = "tokensField";
static final String FOUND_NAME_FIELD_KEY = "foundNames";
+ private String tokens;
+ private NameFinderME nameFinder;
+
@Override
public DataProcessorDescription declareModel() {
return
ProcessingElementBuilder.create("org.apache.streampipes.processors.textmining.jvm.namefinder")
@@ -66,31 +80,42 @@ public class NameFinderController extends
StandaloneEventProcessingDeclarer<Name
}
@Override
- public ConfiguredEventProcessor<NameFinderParameters>
onInvocation(DataProcessorInvocation graph,
-
ProcessingElementParameterExtractor extractor) {
+ public void onInvocation(ProcessorParams parameters,
+ SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
+ String filename = parameters.extractor().selectedFilename(MODEL);
+ byte[] fileContent =
runtimeContext.getStreamPipesClient().fileApi().getFileContent(filename);
+ this.tokens =
parameters.extractor().mappingPropertyValue(TOKENS_FIELD_KEY);
+ loadModel(fileContent);
+ }
+
+ @Override
+ public void onEvent(Event event, SpOutputCollector collector) throws
SpRuntimeException {
+ ListField tokens = event.getFieldBySelector(this.tokens).getAsList();
+
+ String[] tokensArray =
tokens.castItems(String.class).toArray(String[]::new);
+ Span[] spans = nameFinder.find(tokensArray);
- StreamPipesClient client = new
StreamPipesClientResolver().makeStreamPipesClientInstance();
- String filename = extractor.selectedFilename(MODEL);
- byte[] fileContent = client.fileApi().getFileContent(filename);
- String tokens = extractor.mappingPropertyValue(TOKENS_FIELD_KEY);
+ // Generating the list of names from the found spans by the nameFinder
+ List<String> names = TextMiningUtil.extractSpans(spans, tokensArray);
+
+ nameFinder.clearAdaptiveData();
+
+ event.addField(NameFinderProcessor.FOUND_NAME_FIELD_KEY, names);
+ collector.collect(event);
+ }
+
+ @Override
+ public void onDetach() throws SpRuntimeException {
- NameFinderParameters params = new NameFinderParameters(graph, tokens,
fileContent);
- return new ConfiguredEventProcessor<>(params, NameFinder::new);
}
-// @Override
-// public List<Option> resolveOptions(String requestId,
StaticPropertyExtractor parameterExtractor) {
-// String directoryPath = TextMiningJvmConfig.INSTANCE.getModelDirectory();
-//
-// List<Option> result = new ArrayList<>();
-//
-// File folder = new File(directoryPath);
-// File[] listOfFiles = folder.listFiles();
-//
-// for (File file : listOfFiles) {
-// result.add(new Option(file.getName()));
-// }
-//
-// return result;
-// }
+ private void loadModel(byte[] modelContent) {
+ try (InputStream modelIn = new ByteArrayInputStream(modelContent)) {
+ TokenNameFinderModel model = new TokenNameFinderModel(modelIn);
+ nameFinder = new NameFinderME(model);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
}
diff --git
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/partofspeech/PartOfSpeech.java
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/partofspeech/PartOfSpeech.java
deleted file mode 100644
index 8b255291b..000000000
---
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/partofspeech/PartOfSpeech.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package
org.apache.streampipes.processors.textmining.jvm.processor.partofspeech;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.logging.api.Logger;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.model.runtime.field.ListField;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
-
-import opennlp.tools.postag.POSModel;
-import opennlp.tools.postag.POSTaggerME;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-public class PartOfSpeech implements EventProcessor<PartOfSpeechParameters> {
-
- private static Logger log;
-
- private String detection;
- private POSTaggerME posTagger;
-
- public PartOfSpeech() {
- }
-
- @Override
- public void onInvocation(PartOfSpeechParameters partOfSpeechParameters,
- SpOutputCollector spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
- log = partOfSpeechParameters.getGraph().getLogger(PartOfSpeech.class);
- this.detection = partOfSpeechParameters.getDetectionName();
-
- InputStream modelIn = new
ByteArrayInputStream(partOfSpeechParameters.getFileContent());
- POSModel model = null;
- try {
- model = new POSModel(modelIn);
- } catch (IOException e) {
- throw new SpRuntimeException("Error when loading the uploaded model.",
e);
- }
-
- posTagger = new POSTaggerME(model);
- }
-
- @Override
- public void onEvent(Event inputEvent, SpOutputCollector out) {
- ListField text = inputEvent.getFieldBySelector(detection).getAsList();
-
- String[] tags =
posTagger.tag(text.castItems(String.class).stream().toArray(String[]::new));
- double[] confidence = posTagger.probs();
-
-
- inputEvent.addField(PartOfSpeechController.CONFIDENCE_KEY, confidence);
- inputEvent.addField(PartOfSpeechController.TAG_KEY, tags);
-
- out.collect(inputEvent);
- }
-
- @Override
- public void onDetach() {
- }
-}
diff --git
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/partofspeech/PartOfSpeechParameters.java
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/partofspeech/PartOfSpeechParameters.java
deleted file mode 100644
index cf030cc54..000000000
---
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/partofspeech/PartOfSpeechParameters.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package
org.apache.streampipes.processors.textmining.jvm.processor.partofspeech;
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import
org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-public class PartOfSpeechParameters extends EventProcessorBindingParams {
-
- private byte[] fileContent;
- private String detectionName;
-
- public PartOfSpeechParameters(DataProcessorInvocation graph, String
fieldName, byte[] fileContent) {
- super(graph);
- this.detectionName = fieldName;
- this.fileContent = fileContent;
- }
-
- public String getDetectionName() {
- return detectionName;
- }
-
- public byte[] getFileContent() {
- return fileContent;
- }
-}
diff --git
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/partofspeech/PartOfSpeechController.java
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/partofspeech/PartOfSpeechProcessor.java
similarity index 58%
rename from
streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/partofspeech/PartOfSpeechController.java
rename to
streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/partofspeech/PartOfSpeechProcessor.java
index eef0d95f1..d58cc6703 100644
---
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/partofspeech/PartOfSpeechController.java
+++
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/partofspeech/PartOfSpeechProcessor.java
@@ -18,15 +18,14 @@
package
org.apache.streampipes.processors.textmining.jvm.processor.partofspeech;
-import org.apache.streampipes.client.StreamPipesClient;
-import
org.apache.streampipes.extensions.management.client.StreamPipesClientResolver;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.runtime.field.ListField;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
@@ -34,16 +33,28 @@ import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.utils.Assets;
import org.apache.streampipes.sdk.utils.Datatypes;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import
org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
-public class PartOfSpeechController extends
StandaloneEventProcessingDeclarer<PartOfSpeechParameters> {
+import opennlp.tools.postag.POSModel;
+import opennlp.tools.postag.POSTaggerME;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+public class PartOfSpeechProcessor extends StreamPipesDataProcessor {
private static final String DETECTION_FIELD_KEY = "detectionField";
static final String CONFIDENCE_KEY = "confidencePos";
static final String TAG_KEY = "tagPos";
private static final String BINARY_FILE_KEY = "binary-file";
+ private String detection;
+ private POSTaggerME posTagger;
+
@Override
public DataProcessorDescription declareModel() {
return
ProcessingElementBuilder.create("org.apache.streampipes.processors.textmining.jvm.partofspeech")
@@ -71,15 +82,39 @@ public class PartOfSpeechController extends
StandaloneEventProcessingDeclarer<Pa
}
@Override
- public ConfiguredEventProcessor<PartOfSpeechParameters>
onInvocation(DataProcessorInvocation graph,
-
ProcessingElementParameterExtractor extractor) {
+ public void onInvocation(ProcessorParams parameters,
+ SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
+ String filename = parameters.extractor().selectedFilename(BINARY_FILE_KEY);
+ byte[] fileContent =
runtimeContext.getStreamPipesClient().fileApi().getFileContent(filename);
+ this.detection =
parameters.extractor().mappingPropertyValue(DETECTION_FIELD_KEY);
+
+ InputStream modelIn = new ByteArrayInputStream(fileContent);
+ POSModel model;
+ try {
+ model = new POSModel(modelIn);
+ } catch (IOException e) {
+ throw new SpRuntimeException("Error when loading the uploaded model.",
e);
+ }
- StreamPipesClient client = new
StreamPipesClientResolver().makeStreamPipesClientInstance();
- String filename = extractor.selectedFilename(BINARY_FILE_KEY);
- byte[] fileContent = client.fileApi().getFileContent(filename);
- String detection = extractor.mappingPropertyValue(DETECTION_FIELD_KEY);
+ posTagger = new POSTaggerME(model);
+ }
+
+ @Override
+ public void onEvent(Event event, SpOutputCollector collector) throws
SpRuntimeException {
+ ListField text = event.getFieldBySelector(detection).getAsList();
- PartOfSpeechParameters params = new PartOfSpeechParameters(graph,
detection, fileContent);
- return new ConfiguredEventProcessor<>(params, PartOfSpeech::new);
+ String[] tags =
posTagger.tag(text.castItems(String.class).toArray(String[]::new));
+ double[] confidence = posTagger.probs();
+
+
+ event.addField(PartOfSpeechProcessor.CONFIDENCE_KEY, confidence);
+ event.addField(PartOfSpeechProcessor.TAG_KEY, tags);
+
+ collector.collect(event);
+ }
+
+ @Override
+ public void onDetach() throws SpRuntimeException {
}
}
diff --git
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/sentencedetection/SentenceDetection.java
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/sentencedetection/SentenceDetection.java
deleted file mode 100644
index bed475959..000000000
---
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/sentencedetection/SentenceDetection.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package
org.apache.streampipes.processors.textmining.jvm.processor.sentencedetection;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.logging.api.Logger;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
-
-import opennlp.tools.sentdetect.SentenceDetectorME;
-import opennlp.tools.sentdetect.SentenceModel;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-public class SentenceDetection implements
EventProcessor<SentenceDetectionParameters> {
-
- private static Logger log;
-
- // Field with the text
- private String detection;
- private SentenceDetectorME sentenceDetector;
-
- public SentenceDetection() {
- }
-
- @Override
- public void onInvocation(SentenceDetectionParameters
sentenceDetectionParameters,
- SpOutputCollector spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
- log =
sentenceDetectionParameters.getGraph().getLogger(SentenceDetection.class);
- this.detection = sentenceDetectionParameters.getDetectionName();
-
- InputStream modelIn = new
ByteArrayInputStream(sentenceDetectionParameters.getFileContent());
- SentenceModel model = null;
- try {
- model = new SentenceModel(modelIn);
- } catch (IOException e) {
- throw new SpRuntimeException("Error when loading the uploaded model.",
e);
- }
-
- sentenceDetector = new SentenceDetectorME(model);
- }
-
- @Override
- public void onEvent(Event inputEvent, SpOutputCollector out) {
- String text =
inputEvent.getFieldBySelector(detection).getAsPrimitive().getAsString();
-
- String sentences[] = sentenceDetector.sentDetect(text);
-
- for (String sentence : sentences) {
- inputEvent.updateFieldBySelector(detection, sentence);
- out.collect(inputEvent);
- }
- }
-
- @Override
- public void onDetach() {
- }
-}
diff --git
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/sentencedetection/SentenceDetectionParameters.java
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/sentencedetection/SentenceDetectionParameters.java
deleted file mode 100644
index 2cf382b3b..000000000
---
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/sentencedetection/SentenceDetectionParameters.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package
org.apache.streampipes.processors.textmining.jvm.processor.sentencedetection;
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import
org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-public class SentenceDetectionParameters extends EventProcessorBindingParams {
- private byte[] fileContent;
- private String detectionName;
-
- public SentenceDetectionParameters(DataProcessorInvocation graph, String
fieldName, byte[] fileContent) {
- super(graph);
- this.detectionName = fieldName;
- this.fileContent = fileContent;
- }
-
- public String getDetectionName() {
- return detectionName;
- }
-
- public byte[] getFileContent() {
- return fileContent;
- }
-}
diff --git
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/sentencedetection/SentenceDetectionController.java
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/sentencedetection/SentenceDetectionProcessor.java
similarity index 54%
rename from
streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/sentencedetection/SentenceDetectionController.java
rename to
streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/sentencedetection/SentenceDetectionProcessor.java
index aee54f179..852a04131 100644
---
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/sentencedetection/SentenceDetectionController.java
+++
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/sentencedetection/SentenceDetectionProcessor.java
@@ -18,28 +18,38 @@
package
org.apache.streampipes.processors.textmining.jvm.processor.sentencedetection;
-import org.apache.streampipes.client.StreamPipesClient;
-import
org.apache.streampipes.extensions.management.client.StreamPipesClientResolver;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import
org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
-public class SentenceDetectionController extends
StandaloneEventProcessingDeclarer<SentenceDetectionParameters> {
+import opennlp.tools.sentdetect.SentenceDetectorME;
+import opennlp.tools.sentdetect.SentenceModel;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+public class SentenceDetectionProcessor extends StreamPipesDataProcessor {
private static final String DETECTION_FIELD_KEY = "detectionField";
private static final String BINARY_FILE_KEY = "binary-file";
+ private String detection;
+ private SentenceDetectorME sentenceDetector;
+
@Override
public DataProcessorDescription declareModel() {
return
ProcessingElementBuilder.create("org.apache.streampipes.processors.textmining.jvm.sentencedetection")
@@ -59,16 +69,38 @@ public class SentenceDetectionController extends
StandaloneEventProcessingDeclar
}
@Override
- public ConfiguredEventProcessor<SentenceDetectionParameters> onInvocation(
- DataProcessorInvocation graph,
- ProcessingElementParameterExtractor extractor) {
+ public void onInvocation(ProcessorParams parameters,
+ SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
+ String filename = parameters.extractor().selectedFilename(BINARY_FILE_KEY);
+ byte[] fileContent =
runtimeContext.getStreamPipesClient().fileApi().getFileContent(filename);
+ this.detection =
parameters.extractor().mappingPropertyValue(DETECTION_FIELD_KEY);
+
+ InputStream modelIn = new ByteArrayInputStream(fileContent);
+ SentenceModel model;
+ try {
+ model = new SentenceModel(modelIn);
+ } catch (IOException e) {
+ throw new SpRuntimeException("Error when loading the uploaded model.",
e);
+ }
+
+ sentenceDetector = new SentenceDetectorME(model);
+ }
+
+ @Override
+ public void onEvent(Event event, SpOutputCollector collector) throws
SpRuntimeException {
+ String text =
event.getFieldBySelector(detection).getAsPrimitive().getAsString();
+
+ String sentences[] = sentenceDetector.sentDetect(text);
- StreamPipesClient client = new
StreamPipesClientResolver().makeStreamPipesClientInstance();
- String filename = extractor.selectedFilename(BINARY_FILE_KEY);
- byte[] fileContent = client.fileApi().getFileContent(filename);
- String detection = extractor.mappingPropertyValue(DETECTION_FIELD_KEY);
+ for (String sentence : sentences) {
+ event.updateFieldBySelector(detection, sentence);
+ collector.collect(event);
+ }
+ }
+
+ @Override
+ public void onDetach() throws SpRuntimeException {
- SentenceDetectionParameters params = new
SentenceDetectionParameters(graph, detection, fileContent);
- return new ConfiguredEventProcessor<>(params, SentenceDetection::new);
}
}
diff --git
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/tokenizer/Tokenizer.java
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/tokenizer/Tokenizer.java
deleted file mode 100644
index ee12f2fb2..000000000
---
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/tokenizer/Tokenizer.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.processors.textmining.jvm.processor.tokenizer;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.logging.api.Logger;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
-
-import opennlp.tools.tokenize.TokenizerME;
-import opennlp.tools.tokenize.TokenizerModel;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-public class Tokenizer implements EventProcessor<TokenizerParameters> {
-
- private static Logger log;
-
- // Field with the text
- private String detection;
- private TokenizerME tokenizer;
-
- public Tokenizer() {
- }
-
- @Override
- public void onInvocation(TokenizerParameters tokenizerParameters,
- SpOutputCollector spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
- log = tokenizerParameters.getGraph().getLogger(Tokenizer.class);
- this.detection = tokenizerParameters.getDetectionName();
-
- InputStream modelIn = new
ByteArrayInputStream(tokenizerParameters.getFileContent());
- TokenizerModel model = null;
- try {
- model = new TokenizerModel(modelIn);
- } catch (IOException e) {
- throw new SpRuntimeException("Error when loading the uploaded model.",
e);
- }
-
- tokenizer = new TokenizerME(model);
- }
-
- @Override
- public void onEvent(Event inputEvent, SpOutputCollector out) {
- String text =
inputEvent.getFieldBySelector(detection).getAsPrimitive().getAsString();
-
- inputEvent.addField(TokenizerController.TOKEN_LIST_FIELD_KEY,
tokenizer.tokenize(text));
-
- out.collect(inputEvent);
- }
-
- @Override
- public void onDetach() {
- }
-}
diff --git
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/tokenizer/TokenizerParameters.java
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/tokenizer/TokenizerParameters.java
deleted file mode 100644
index c0c9b517c..000000000
---
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/tokenizer/TokenizerParameters.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.processors.textmining.jvm.processor.tokenizer;
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import
org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-public class TokenizerParameters extends EventProcessorBindingParams {
- private byte[] fileContent;
- private String detectionName;
-
- public TokenizerParameters(DataProcessorInvocation graph, String fieldName,
byte[] fileContent) {
- super(graph);
- this.detectionName = fieldName;
- this.fileContent = fileContent;
- }
-
- public String getDetectionName() {
- return detectionName;
- }
-
- public byte[] getFileContent() {
- return fileContent;
- }
-}
diff --git
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/tokenizer/TokenizerController.java
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/tokenizer/TokenizerProcessor.java
similarity index 58%
rename from
streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/tokenizer/TokenizerController.java
rename to
streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/tokenizer/TokenizerProcessor.java
index 2ae49658b..7c2e05d1b 100644
---
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/tokenizer/TokenizerController.java
+++
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/tokenizer/TokenizerProcessor.java
@@ -18,30 +18,41 @@
package org.apache.streampipes.processors.textmining.jvm.processor.tokenizer;
-import org.apache.streampipes.client.StreamPipesClient;
-import
org.apache.streampipes.extensions.management.client.StreamPipesClientResolver;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import
org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
-public class TokenizerController extends
StandaloneEventProcessingDeclarer<TokenizerParameters> {
+import opennlp.tools.tokenize.TokenizerME;
+import opennlp.tools.tokenize.TokenizerModel;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+public class TokenizerProcessor extends StreamPipesDataProcessor {
private static final String DETECTION_FIELD_KEY = "detectionField";
static final String TOKEN_LIST_FIELD_KEY = "tokenList";
private static final String BINARY_FILE_KEY = "binary-file";
+ // Selector of the input field holding the text to tokenize; set in onInvocation.
+ private String detection;
+ // OpenNLP tokenizer built from the uploaded model file in onInvocation.
+ private TokenizerME tokenizer;
+
+
//TODO: Maybe change outputStrategy to an array instead of tons of different
strings
@Override
public DataProcessorDescription declareModel() {
@@ -64,15 +75,35 @@ public class TokenizerController extends
StandaloneEventProcessingDeclarer<Token
}
+ /**
+ * Downloads the uploaded OpenNLP tokenizer model, builds the tokenizer from it and
+ * resolves the selector of the text field to tokenize.
+ *
+ * @param parameters processor configuration; provides the selected model file name and
+ * the mapped text field
+ * @param spOutputCollector not used by this method
+ * @param runtimeContext used to fetch the model file through the StreamPipes client
+ * @throws SpRuntimeException if the uploaded file cannot be loaded as a tokenizer model
+ */
@Override
- public ConfiguredEventProcessor<TokenizerParameters>
onInvocation(DataProcessorInvocation graph,
-
ProcessingElementParameterExtractor extractor) {
+ public void onInvocation(ProcessorParams parameters,
+ SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
+ String filename = parameters.extractor().selectedFilename(BINARY_FILE_KEY);
+ byte[] fileContent = runtimeContext.getStreamPipesClient().fileApi().getFileContent(filename);
+ this.detection = parameters.extractor().mappingPropertyValue(DETECTION_FIELD_KEY);
+
+ // try-with-resources closes the model stream on every path, including a failed parse;
+ // the IOException of the parse (or of close) is wrapped with its cause preserved.
+ try (InputStream modelIn = new ByteArrayInputStream(fileContent)) {
+ tokenizer = new TokenizerME(new TokenizerModel(modelIn));
+ } catch (IOException e) {
+ throw new SpRuntimeException("Error when loading the uploaded model.", e);
+ }
+ }
- StreamPipesClient client = new
StreamPipesClientResolver().makeStreamPipesClientInstance();
- String filename = extractor.selectedFilename(BINARY_FILE_KEY);
- byte[] fileContent = client.fileApi().getFileContent(filename);
- String detection = extractor.mappingPropertyValue(DETECTION_FIELD_KEY);
+ /**
+ * Tokenizes the text of the configured field, adds the resulting tokens to the event
+ * under {@code TOKEN_LIST_FIELD_KEY} and forwards the event.
+ *
+ * @param event incoming event; the field selected by {@code detection} is read as a string
+ * @param collector receives the enriched event
+ */
+ @Override
+ public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeException {
+ String[] tokens = tokenizer.tokenize(event.getFieldBySelector(detection)
+ .getAsPrimitive()
+ .getAsString());
+ event.addField(TOKEN_LIST_FIELD_KEY, tokens);
+ collector.collect(event);
+ }
+
+ @Override
+ public void onDetach() throws SpRuntimeException {
+ // Intentionally empty: this processor performs no cleanup on detach.
- TokenizerParameters params = new TokenizerParameters(graph, detection,
fileContent);
- return new ConfiguredEventProcessor<>(params, Tokenizer::new);
}
}