This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch 3655-update-processors-in-processors-text-mining-jvm
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/3655-update-processors-in-processors-text-mining-jvm by this push:
new 34faba2bf9 refactor(#3655): Update text mining processors to implement IStreamPipesDataProcessor interface
34faba2bf9 is described below
commit 34faba2bf9ce743d86038f625c810615b8b6f2a7
Author: Philipp Zehnder <[email protected]>
AuthorDate: Wed Jun 11 09:45:00 2025 +0200
refactor(#3655): Update text mining processors to implement IStreamPipesDataProcessor interface
---
.../jvm/processor/chunker/ChunkerProcessor.java | 117 ++++++++++++---------
.../language/LanguageDetectionProcessor.java | 91 +++++++++-------
.../processor/namefinder/NameFinderProcessor.java | 83 +++++++++------
.../partofspeech/PartOfSpeechProcessor.java | 90 +++++++++-------
.../SentenceDetectionProcessor.java | 70 +++++++-----
.../processor/tokenizer/TokenizerProcessor.java | 77 ++++++++------
6 files changed, 308 insertions(+), 220 deletions(-)
diff --git a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/chunker/ChunkerProcessor.java b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/chunker/ChunkerProcessor.java
index 63c90ac447..00e37328be 100644
--- a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/chunker/ChunkerProcessor.java
+++ b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/chunker/ChunkerProcessor.java
@@ -19,25 +19,26 @@
package org.apache.streampipes.processors.textmining.jvm.processor.chunker;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.runtime.field.ListField;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.processors.textmining.jvm.processor.TextMiningUtil;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.utils.Datatypes;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
import opennlp.tools.chunker.ChunkerME;
import opennlp.tools.chunker.ChunkerModel;
@@ -49,7 +50,7 @@ import java.io.InputStream;
import java.util.Arrays;
import java.util.List;
-public class ChunkerProcessor extends StreamPipesDataProcessor {
+public class ChunkerProcessor implements IStreamPipesDataProcessor {
private static final String TAGS_FIELD_KEY = "tagsField";
private static final String TOKENS_FIELD_KEY = "tokensField";
@@ -62,44 +63,59 @@ public class ChunkerProcessor extends StreamPipesDataProcessor {
private ChunkerME chunker;
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder
- .create("org.apache.streampipes.processors.textmining.jvm.chunker", 0)
- .category(DataProcessorType.ENRICH_TEXT)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .withLocales(Locales.EN)
- .requiredFile(Labels.withId(BINARY_FILE_KEY))
- .requiredStream(StreamRequirementsBuilder
- .create()
- .requiredPropertyWithUnaryMapping(
- EpRequirements.listRequirement(Datatypes.String),
- Labels.withId(TAGS_FIELD_KEY),
- PropertyScope.NONE)
- .requiredPropertyWithUnaryMapping(
- EpRequirements.listRequirement(Datatypes.String),
- Labels.withId(TOKENS_FIELD_KEY),
- PropertyScope.NONE)
- .build())
- .outputStrategy(OutputStrategies.append(
- EpProperties.listStringEp(
- Labels.withId(CHUNK_TYPE_FIELD_KEY),
- CHUNK_TYPE_FIELD_KEY,
- "http://schema.org/ItemList"),
- EpProperties.listStringEp(
- Labels.withId(CHUNK_FIELD_KEY),
- CHUNK_FIELD_KEY,
- "http://schema.org/ItemList")))
- .build();
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ ChunkerProcessor::new,
+ ProcessingElementBuilder
+ .create("org.apache.streampipes.processors.textmining.jvm.chunker", 0)
+ .category(DataProcessorType.ENRICH_TEXT)
+ .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
+ .withLocales(Locales.EN)
+ .requiredFile(Labels.withId(BINARY_FILE_KEY))
+ .requiredStream(StreamRequirementsBuilder
+ .create()
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.listRequirement(Datatypes.String),
+ Labels.withId(TAGS_FIELD_KEY),
+ PropertyScope.NONE
+ )
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.listRequirement(Datatypes.String),
+ Labels.withId(TOKENS_FIELD_KEY),
+ PropertyScope.NONE
+ )
+ .build())
+ .outputStrategy(OutputStrategies.append(
+ EpProperties.listStringEp(
+ Labels.withId(CHUNK_TYPE_FIELD_KEY),
+ CHUNK_TYPE_FIELD_KEY,
+ "http://schema.org/ItemList"
+ ),
+ EpProperties.listStringEp(
+ Labels.withId(CHUNK_FIELD_KEY),
+ CHUNK_FIELD_KEY,
+ "http://schema.org/ItemList"
+ )
+ ))
+ .build()
+ );
}
@Override
- public void onInvocation(ProcessorParams parameters,
- SpOutputCollector spOutputCollector,
- EventProcessorRuntimeContext context) throws SpRuntimeException {
- this.tags = parameters.extractor().mappingPropertyValue(TAGS_FIELD_KEY);
- this.tokens = parameters.extractor().mappingPropertyValue(TOKENS_FIELD_KEY);
- String filename = parameters.extractor().selectedFilename(BINARY_FILE_KEY);
- byte[] fileContent = context.getStreamPipesClient().fileApi().getFileContent(filename);
+ public void onPipelineStarted(
+ IDataProcessorParameters params,
+ SpOutputCollector collector,
+ EventProcessorRuntimeContext context
+ ) {
+ this.tags = params.extractor()
+ .mappingPropertyValue(TAGS_FIELD_KEY);
+ this.tokens = params.extractor()
+ .mappingPropertyValue(TOKENS_FIELD_KEY);
+ String filename = params.extractor()
+ .selectedFilename(BINARY_FILE_KEY);
+ byte[] fileContent = context.getStreamPipesClient()
+ .fileApi()
+ .getFileContent(filename);
InputStream modelIn = new ByteArrayInputStream(fileContent);
ChunkerModel model;
@@ -113,19 +129,23 @@ public class ChunkerProcessor extends
StreamPipesDataProcessor {
}
@Override
- public void onEvent(Event event,
- SpOutputCollector collector) throws SpRuntimeException {
- ListField tags = event.getFieldBySelector(this.tags).getAsList();
- ListField tokens = event.getFieldBySelector(this.tokens).getAsList();
+ public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeException {
+ ListField tags = event.getFieldBySelector(this.tags)
+ .getAsList();
+ ListField tokens = event.getFieldBySelector(this.tokens)
+ .getAsList();
-
- String[] tagsArray = tags.castItems(String.class).toArray(String[]::new);
- String[] tokensArray =
tokens.castItems(String.class).toArray(String[]::new);
+ String[] tagsArray = tags.castItems(String.class)
+ .toArray(String[]::new);
+ String[] tokensArray = tokens.castItems(String.class)
+ .toArray(String[]::new);
Span[] spans = chunker.chunkAsSpans(tokensArray, tagsArray);
List<String> chunks = TextMiningUtil.extractSpans(spans, tokensArray);
- String[] types =
Arrays.stream(spans).map(Span::getType).toArray(String[]::new);
+ String[] types = Arrays.stream(spans)
+ .map(Span::getType)
+ .toArray(String[]::new);
event.addField(ChunkerProcessor.CHUNK_TYPE_FIELD_KEY, types);
event.addField(ChunkerProcessor.CHUNK_FIELD_KEY, chunks);
@@ -134,7 +154,6 @@ public class ChunkerProcessor extends
StreamPipesDataProcessor {
}
@Override
- public void onDetach() throws SpRuntimeException {
-
+ public void onPipelineStopped() {
}
}
diff --git
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/language/LanguageDetectionProcessor.java
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/language/LanguageDetectionProcessor.java
index 0f4dca2d76..9287e2edb7 100644
---
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/language/LanguageDetectionProcessor.java
+++
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/language/LanguageDetectionProcessor.java
@@ -19,22 +19,23 @@
package org.apache.streampipes.processors.textmining.jvm.processor.language;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
import opennlp.tools.langdetect.Language;
import opennlp.tools.langdetect.LanguageDetector;
@@ -45,7 +46,7 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
-public class LanguageDetectionProcessor extends StreamPipesDataProcessor {
+public class LanguageDetectionProcessor implements IStreamPipesDataProcessor {
private static final String DETECTION_FIELD_KEY = "detectionField";
static final String LANGUAGE_KEY = "language";
@@ -56,42 +57,55 @@ public class LanguageDetectionProcessor extends
StreamPipesDataProcessor {
private LanguageDetector languageDetector;
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder
-
.create("org.apache.streampipes.processors.textmining.jvm.languagedetection", 0)
- .category(DataProcessorType.ENRICH_TEXT)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .withLocales(Locales.EN)
- .requiredFile(Labels.withId(BINARY_FILE_KEY))
- .requiredStream(StreamRequirementsBuilder
- .create()
- .requiredPropertyWithUnaryMapping(
- EpRequirements.stringReq(),
- Labels.withId(DETECTION_FIELD_KEY),
- PropertyScope.NONE)
- .build())
- .outputStrategy(OutputStrategies.append(
- EpProperties.stringEp(
- Labels.withId(LANGUAGE_KEY),
- LANGUAGE_KEY,
- "http://schema.org/language"),
- EpProperties.doubleEp(
- Labels.withId(CONFIDENCE_KEY),
- CONFIDENCE_KEY,
- "https://schema.org/Float")))
- .build();
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ LanguageDetectionProcessor::new,
+ ProcessingElementBuilder
+
.create("org.apache.streampipes.processors.textmining.jvm.languagedetection", 0)
+ .category(DataProcessorType.ENRICH_TEXT)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .withLocales(Locales.EN)
+ .requiredFile(Labels.withId(BINARY_FILE_KEY))
+ .requiredStream(StreamRequirementsBuilder
+ .create()
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.stringReq(),
+ Labels.withId(DETECTION_FIELD_KEY),
+ PropertyScope.NONE
+ )
+ .build())
+ .outputStrategy(OutputStrategies.append(
+ EpProperties.stringEp(
+ Labels.withId(LANGUAGE_KEY),
+ LANGUAGE_KEY,
+ "http://schema.org/language"
+ ),
+ EpProperties.doubleEp(
+ Labels.withId(CONFIDENCE_KEY),
+ CONFIDENCE_KEY,
+ "https://schema.org/Float"
+ )
+ ))
+ .build()
+ );
}
@Override
- public void onInvocation(ProcessorParams parameters,
- SpOutputCollector spOutputCollector,
- EventProcessorRuntimeContext context) throws
SpRuntimeException {
- String filename = parameters.extractor().selectedFilename(BINARY_FILE_KEY);
- byte[] fileContent =
context.getStreamPipesClient().fileApi().getFileContent(filename);
- this.detection =
parameters.extractor().mappingPropertyValue(DETECTION_FIELD_KEY);
+ public void onPipelineStarted(
+ IDataProcessorParameters params,
+ SpOutputCollector collector,
+ EventProcessorRuntimeContext context
+ ) {
+ String filename = params.extractor()
+ .selectedFilename(BINARY_FILE_KEY);
+ byte[] fileContent = context.getStreamPipesClient()
+ .fileApi()
+ .getFileContent(filename);
+ this.detection = params.extractor()
+ .mappingPropertyValue(DETECTION_FIELD_KEY);
InputStream modelIn = new ByteArrayInputStream(fileContent);
- LanguageDetectorModel model = null;
+ LanguageDetectorModel model;
try {
model = new LanguageDetectorModel(modelIn);
} catch (IOException e) {
@@ -103,7 +117,9 @@ public class LanguageDetectionProcessor extends
StreamPipesDataProcessor {
@Override
public void onEvent(Event event, SpOutputCollector collector) throws
SpRuntimeException {
- String text =
event.getFieldBySelector(detection).getAsPrimitive().getAsString();
+ String text = event.getFieldBySelector(detection)
+ .getAsPrimitive()
+ .getAsString();
Language language = languageDetector.predictLanguage(text);
event.addField(LanguageDetectionProcessor.LANGUAGE_KEY,
language.getLang());
@@ -113,7 +129,6 @@ public class LanguageDetectionProcessor extends
StreamPipesDataProcessor {
}
@Override
- public void onDetach() throws SpRuntimeException {
-
+ public void onPipelineStopped() {
}
}
diff --git
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/namefinder/NameFinderProcessor.java
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/namefinder/NameFinderProcessor.java
index 8120a8467f..44f405add6 100644
---
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/namefinder/NameFinderProcessor.java
+++
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/namefinder/NameFinderProcessor.java
@@ -19,25 +19,26 @@
package org.apache.streampipes.processors.textmining.jvm.processor.namefinder;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.runtime.field.ListField;
import org.apache.streampipes.model.schema.PropertyScope;
import
org.apache.streampipes.processors.textmining.jvm.processor.TextMiningUtil;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.utils.Datatypes;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
import opennlp.tools.namefind.NameFinderME;
import opennlp.tools.namefind.TokenNameFinderModel;
@@ -48,7 +49,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.List;
-public class NameFinderProcessor extends StreamPipesDataProcessor {
+public class NameFinderProcessor implements IStreamPipesDataProcessor {
private static final String MODEL = "model";
private static final String TOKENS_FIELD_KEY = "tokensField";
@@ -58,43 +59,56 @@ public class NameFinderProcessor extends
StreamPipesDataProcessor {
private NameFinderME nameFinder;
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder
- .create("org.apache.streampipes.processors.textmining.jvm.namefinder",
0)
- .category(DataProcessorType.ENRICH_TEXT)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .withLocales(Locales.EN)
- .requiredStream(StreamRequirementsBuilder
- .create()
- .requiredPropertyWithUnaryMapping(
- EpRequirements.listRequirement(Datatypes.String),
- Labels.withId(TOKENS_FIELD_KEY),
- PropertyScope.NONE)
- .build())
- .requiredFile(Labels.withId(MODEL))
- .outputStrategy(OutputStrategies.append(
- EpProperties.listStringEp(
- Labels.withId(FOUND_NAME_FIELD_KEY),
- FOUND_NAME_FIELD_KEY,
- "http://schema.org/ItemList")))
- .build();
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ NameFinderProcessor::new,
+ ProcessingElementBuilder
+
.create("org.apache.streampipes.processors.textmining.jvm.namefinder", 0)
+ .category(DataProcessorType.ENRICH_TEXT)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .withLocales(Locales.EN)
+ .requiredStream(StreamRequirementsBuilder
+ .create()
+ .requiredPropertyWithUnaryMapping(
+
EpRequirements.listRequirement(Datatypes.String),
+ Labels.withId(TOKENS_FIELD_KEY),
+ PropertyScope.NONE
+ )
+ .build())
+ .requiredFile(Labels.withId(MODEL))
+ .outputStrategy(OutputStrategies.append(
+ EpProperties.listStringEp(
+ Labels.withId(FOUND_NAME_FIELD_KEY),
+ FOUND_NAME_FIELD_KEY,
+ "http://schema.org/ItemList"
+ )))
+ .build()
+ );
}
@Override
- public void onInvocation(ProcessorParams parameters,
- SpOutputCollector spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
- String filename = parameters.extractor().selectedFilename(MODEL);
- byte[] fileContent =
runtimeContext.getStreamPipesClient().fileApi().getFileContent(filename);
- this.tokens =
parameters.extractor().mappingPropertyValue(TOKENS_FIELD_KEY);
+ public void onPipelineStarted(
+ IDataProcessorParameters params,
+ SpOutputCollector collector,
+ EventProcessorRuntimeContext runtimeContext
+ ) {
+ String filename = params.extractor()
+ .selectedFilename(MODEL);
+ byte[] fileContent = runtimeContext.getStreamPipesClient()
+ .fileApi()
+ .getFileContent(filename);
+ this.tokens = params.extractor()
+ .mappingPropertyValue(TOKENS_FIELD_KEY);
loadModel(fileContent);
}
@Override
public void onEvent(Event event, SpOutputCollector collector) throws
SpRuntimeException {
- ListField tokens = event.getFieldBySelector(this.tokens).getAsList();
+ ListField tokens = event.getFieldBySelector(this.tokens)
+ .getAsList();
- String[] tokensArray =
tokens.castItems(String.class).toArray(String[]::new);
+ String[] tokensArray = tokens.castItems(String.class)
+ .toArray(String[]::new);
Span[] spans = nameFinder.find(tokensArray);
// Generating the list of names from the found spans by the nameFinder
@@ -107,8 +121,7 @@ public class NameFinderProcessor extends
StreamPipesDataProcessor {
}
@Override
- public void onDetach() throws SpRuntimeException {
-
+ public void onPipelineStopped() {
}
private void loadModel(byte[] modelContent) {
@@ -116,7 +129,7 @@ public class NameFinderProcessor extends
StreamPipesDataProcessor {
TokenNameFinderModel model = new TokenNameFinderModel(modelIn);
nameFinder = new NameFinderME(model);
} catch (IOException e) {
- e.printStackTrace();
+ throw new SpRuntimeException("Error when loading the uploaded model.", e);
}
}
}
diff --git
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/partofspeech/PartOfSpeechProcessor.java
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/partofspeech/PartOfSpeechProcessor.java
index 4f2eddab07..12c3513647 100644
---
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/partofspeech/PartOfSpeechProcessor.java
+++
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/partofspeech/PartOfSpeechProcessor.java
@@ -19,24 +19,25 @@
package
org.apache.streampipes.processors.textmining.jvm.processor.partofspeech;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.runtime.field.ListField;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.utils.Datatypes;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
import opennlp.tools.postag.POSModel;
import opennlp.tools.postag.POSTaggerME;
@@ -45,7 +46,7 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
-public class PartOfSpeechProcessor extends StreamPipesDataProcessor {
+public class PartOfSpeechProcessor implements IStreamPipesDataProcessor {
private static final String DETECTION_FIELD_KEY = "detectionField";
static final String CONFIDENCE_KEY = "confidencePos";
@@ -56,39 +57,52 @@ public class PartOfSpeechProcessor extends
StreamPipesDataProcessor {
private POSTaggerME posTagger;
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder
-
.create("org.apache.streampipes.processors.textmining.jvm.partofspeech", 0)
- .category(DataProcessorType.ENRICH_TEXT)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .withLocales(Locales.EN)
- .requiredFile(Labels.withId(BINARY_FILE_KEY))
- .requiredStream(StreamRequirementsBuilder
- .create()
- .requiredPropertyWithUnaryMapping(
- EpRequirements.listRequirement(Datatypes.String),
- Labels.withId(DETECTION_FIELD_KEY),
- PropertyScope.NONE)
- .build())
- .outputStrategy(OutputStrategies.append(
- EpProperties.listDoubleEp(
- Labels.withId(CONFIDENCE_KEY),
- CONFIDENCE_KEY,
- "http://schema.org/ItemList"),
- EpProperties.listStringEp(
- Labels.withId(TAG_KEY),
- TAG_KEY,
- "http://schema.org/ItemList")))
- .build();
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ PartOfSpeechProcessor::new,
+ ProcessingElementBuilder
+
.create("org.apache.streampipes.processors.textmining.jvm.partofspeech", 0)
+ .category(DataProcessorType.ENRICH_TEXT)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .withLocales(Locales.EN)
+ .requiredFile(Labels.withId(BINARY_FILE_KEY))
+ .requiredStream(StreamRequirementsBuilder
+ .create()
+ .requiredPropertyWithUnaryMapping(
+
EpRequirements.listRequirement(Datatypes.String),
+ Labels.withId(DETECTION_FIELD_KEY),
+ PropertyScope.NONE
+ )
+ .build())
+ .outputStrategy(OutputStrategies.append(
+ EpProperties.listDoubleEp(
+ Labels.withId(CONFIDENCE_KEY),
+ CONFIDENCE_KEY,
+ "http://schema.org/ItemList"
+ ),
+ EpProperties.listStringEp(
+ Labels.withId(TAG_KEY),
+ TAG_KEY,
+ "http://schema.org/ItemList"
+ )
+ ))
+ .build()
+ );
}
@Override
- public void onInvocation(ProcessorParams parameters,
- SpOutputCollector spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
- String filename = parameters.extractor().selectedFilename(BINARY_FILE_KEY);
- byte[] fileContent =
runtimeContext.getStreamPipesClient().fileApi().getFileContent(filename);
- this.detection =
parameters.extractor().mappingPropertyValue(DETECTION_FIELD_KEY);
+ public void onPipelineStarted(
+ IDataProcessorParameters params,
+ SpOutputCollector collector,
+ EventProcessorRuntimeContext runtimeContext
+ ) {
+ String filename = params.extractor()
+ .selectedFilename(BINARY_FILE_KEY);
+ byte[] fileContent = runtimeContext.getStreamPipesClient()
+ .fileApi()
+ .getFileContent(filename);
+ this.detection = params.extractor()
+ .mappingPropertyValue(DETECTION_FIELD_KEY);
InputStream modelIn = new ByteArrayInputStream(fileContent);
POSModel model;
@@ -103,9 +117,11 @@ public class PartOfSpeechProcessor extends
StreamPipesDataProcessor {
@Override
public void onEvent(Event event, SpOutputCollector collector) throws
SpRuntimeException {
- ListField text = event.getFieldBySelector(detection).getAsList();
+ ListField text = event.getFieldBySelector(detection)
+ .getAsList();
- String[] tags =
posTagger.tag(text.castItems(String.class).toArray(String[]::new));
+ String[] tags = posTagger.tag(text.castItems(String.class)
+ .toArray(String[]::new));
double[] confidence = posTagger.probs();
@@ -116,6 +132,6 @@ public class PartOfSpeechProcessor extends
StreamPipesDataProcessor {
}
@Override
- public void onDetach() throws SpRuntimeException {
+ public void onPipelineStopped() {
}
}
diff --git
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/sentencedetection/SentenceDetectionProcessor.java
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/sentencedetection/SentenceDetectionProcessor.java
index 83136155d3..0a5a470426 100644
---
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/sentencedetection/SentenceDetectionProcessor.java
+++
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/sentencedetection/SentenceDetectionProcessor.java
@@ -19,21 +19,22 @@
package
org.apache.streampipes.processors.textmining.jvm.processor.sentencedetection;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
import opennlp.tools.sentdetect.SentenceDetectorME;
import opennlp.tools.sentdetect.SentenceModel;
@@ -42,7 +43,7 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
-public class SentenceDetectionProcessor extends StreamPipesDataProcessor {
+public class SentenceDetectionProcessor implements IStreamPipesDataProcessor {
private static final String DETECTION_FIELD_KEY = "detectionField";
private static final String BINARY_FILE_KEY = "binary-file";
@@ -51,31 +52,41 @@ public class SentenceDetectionProcessor extends
StreamPipesDataProcessor {
private SentenceDetectorME sentenceDetector;
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder
-
.create("org.apache.streampipes.processors.textmining.jvm.sentencedetection", 0)
- .category(DataProcessorType.ENRICH_TEXT)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .withLocales(Locales.EN)
- .requiredFile(Labels.withId(BINARY_FILE_KEY))
- .requiredStream(StreamRequirementsBuilder
- .create()
- .requiredPropertyWithUnaryMapping(
- EpRequirements.stringReq(),
- Labels.withId(DETECTION_FIELD_KEY),
- PropertyScope.NONE)
- .build())
- .outputStrategy(OutputStrategies.keep())
- .build();
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ SentenceDetectionProcessor::new,
+ ProcessingElementBuilder
+
.create("org.apache.streampipes.processors.textmining.jvm.sentencedetection", 0)
+ .category(DataProcessorType.ENRICH_TEXT)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .withLocales(Locales.EN)
+ .requiredFile(Labels.withId(BINARY_FILE_KEY))
+ .requiredStream(StreamRequirementsBuilder
+ .create()
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.stringReq(),
+ Labels.withId(DETECTION_FIELD_KEY),
+ PropertyScope.NONE
+ )
+ .build())
+ .outputStrategy(OutputStrategies.keep())
+ .build()
+ );
}
@Override
- public void onInvocation(ProcessorParams parameters,
- SpOutputCollector spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
- String filename = parameters.extractor().selectedFilename(BINARY_FILE_KEY);
- byte[] fileContent =
runtimeContext.getStreamPipesClient().fileApi().getFileContent(filename);
- this.detection =
parameters.extractor().mappingPropertyValue(DETECTION_FIELD_KEY);
+ public void onPipelineStarted(
+ IDataProcessorParameters params,
+ SpOutputCollector collector,
+ EventProcessorRuntimeContext runtimeContext
+ ) {
+ String filename = params.extractor()
+ .selectedFilename(BINARY_FILE_KEY);
+ byte[] fileContent = runtimeContext.getStreamPipesClient()
+ .fileApi()
+ .getFileContent(filename);
+ this.detection = params.extractor()
+ .mappingPropertyValue(DETECTION_FIELD_KEY);
InputStream modelIn = new ByteArrayInputStream(fileContent);
SentenceModel model;
@@ -90,7 +101,9 @@ public class SentenceDetectionProcessor extends
StreamPipesDataProcessor {
@Override
public void onEvent(Event event, SpOutputCollector collector) throws
SpRuntimeException {
- String text =
event.getFieldBySelector(detection).getAsPrimitive().getAsString();
+ String text = event.getFieldBySelector(detection)
+ .getAsPrimitive()
+ .getAsString();
String sentences[] = sentenceDetector.sentDetect(text);
@@ -101,7 +114,6 @@ public class SentenceDetectionProcessor extends
StreamPipesDataProcessor {
}
@Override
- public void onDetach() throws SpRuntimeException {
-
+ public void onPipelineStopped() {
}
}
diff --git
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/tokenizer/TokenizerProcessor.java
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/tokenizer/TokenizerProcessor.java
index a40cda4643..fab28b112f 100644
---
a/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/tokenizer/TokenizerProcessor.java
+++
b/streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/tokenizer/TokenizerProcessor.java
@@ -19,22 +19,23 @@
package org.apache.streampipes.processors.textmining.jvm.processor.tokenizer;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
import opennlp.tools.tokenize.TokenizerME;
import opennlp.tools.tokenize.TokenizerModel;
@@ -43,7 +44,7 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
-public class TokenizerProcessor extends StreamPipesDataProcessor {
+public class TokenizerProcessor implements IStreamPipesDataProcessor {
private static final String DETECTION_FIELD_KEY = "detectionField";
static final String TOKEN_LIST_FIELD_KEY = "tokenList";
@@ -52,35 +53,46 @@ public class TokenizerProcessor extends
StreamPipesDataProcessor {
private String detection;
private TokenizerME tokenizer;
-
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder
- .create("org.apache.streampipes.processors.textmining.jvm.tokenizer",
0)
- .category(DataProcessorType.ENRICH_TEXT)
- .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
- .withLocales(Locales.EN)
- .requiredFile(Labels.withId(BINARY_FILE_KEY))
- .requiredStream(StreamRequirementsBuilder
- .create()
- .requiredPropertyWithUnaryMapping(
- EpRequirements.stringReq(),
- Labels.withId(DETECTION_FIELD_KEY),
- PropertyScope.NONE)
- .build())
-
.outputStrategy(OutputStrategies.append(EpProperties.listStringEp(Labels.withId(TOKEN_LIST_FIELD_KEY),
- TOKEN_LIST_FIELD_KEY,
- "http://schema.org/ItemList")))
- .build();
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ TokenizerProcessor::new,
+ ProcessingElementBuilder
+
.create("org.apache.streampipes.processors.textmining.jvm.tokenizer", 0)
+ .category(DataProcessorType.ENRICH_TEXT)
+ .withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
+ .withLocales(Locales.EN)
+ .requiredFile(Labels.withId(BINARY_FILE_KEY))
+ .requiredStream(StreamRequirementsBuilder
+ .create()
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.stringReq(),
+ Labels.withId(DETECTION_FIELD_KEY),
+ PropertyScope.NONE
+ )
+ .build())
+ .outputStrategy(OutputStrategies.append(EpProperties.listStringEp(
+ Labels.withId(TOKEN_LIST_FIELD_KEY),
+ TOKEN_LIST_FIELD_KEY,
+ "http://schema.org/ItemList"
+ )))
+ .build()
+ );
}
@Override
- public void onInvocation(ProcessorParams parameters,
- SpOutputCollector spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
- String filename = parameters.extractor().selectedFilename(BINARY_FILE_KEY);
- byte[] fileContent =
runtimeContext.getStreamPipesClient().fileApi().getFileContent(filename);
- this.detection =
parameters.extractor().mappingPropertyValue(DETECTION_FIELD_KEY);
+ public void onPipelineStarted(
+ IDataProcessorParameters params,
+ SpOutputCollector collector,
+ EventProcessorRuntimeContext runtimeContext
+ ) {
+ String filename = params.extractor()
+ .selectedFilename(BINARY_FILE_KEY);
+ byte[] fileContent = runtimeContext.getStreamPipesClient()
+ .fileApi()
+ .getFileContent(filename);
+ this.detection = params.extractor()
+ .mappingPropertyValue(DETECTION_FIELD_KEY);
InputStream modelIn = new ByteArrayInputStream(fileContent);
TokenizerModel model;
@@ -95,7 +107,9 @@ public class TokenizerProcessor extends
StreamPipesDataProcessor {
@Override
public void onEvent(Event event, SpOutputCollector collector) throws
SpRuntimeException {
- String text =
event.getFieldBySelector(detection).getAsPrimitive().getAsString();
+ String text = event.getFieldBySelector(detection)
+ .getAsPrimitive()
+ .getAsString();
event.addField(TokenizerProcessor.TOKEN_LIST_FIELD_KEY,
tokenizer.tokenize(text));
@@ -103,7 +117,6 @@ public class TokenizerProcessor extends
StreamPipesDataProcessor {
}
@Override
- public void onDetach() throws SpRuntimeException {
-
+ public void onPipelineStopped() {
}
}