This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new d7fa8f18d0 refactor(#3654): Update MultiModelPromptProcessor to
implement IStreamPipesDataProcessor interface (#3657)
d7fa8f18d0 is described below
commit d7fa8f18d0dfb3b294e1d4526ecd2260b1980624
Author: Philipp Zehnder <[email protected]>
AuthorDate: Wed Jun 11 08:20:18 2025 +0200
refactor(#3654): Update MultiModelPromptProcessor to implement
IStreamPipesDataProcessor interface (#3657)
---
.../multimodel/MultiModelPromptProcessor.java | 67 +++++++++++-----------
1 file changed, 34 insertions(+), 33 deletions(-)
diff --git a/streampipes-extensions/streampipes-processors-llm-jvm/src/main/java/org/apache/streampipes/processors/llm/jvm/processor/multimodel/MultiModelPromptProcessor.java b/streampipes-extensions/streampipes-processors-llm-jvm/src/main/java/org/apache/streampipes/processors/llm/jvm/processor/multimodel/MultiModelPromptProcessor.java
index 57f8dc03bf..4c8da88f7d 100644
--- a/streampipes-extensions/streampipes-processors-llm-jvm/src/main/java/org/apache/streampipes/processors/llm/jvm/processor/multimodel/MultiModelPromptProcessor.java
+++ b/streampipes-extensions/streampipes-processors-llm-jvm/src/main/java/org/apache/streampipes/processors/llm/jvm/processor/multimodel/MultiModelPromptProcessor.java
@@ -18,11 +18,13 @@
package org.apache.streampipes.processors.llm.jvm.processor.multimodel;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import
org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import
org.apache.streampipes.processors.llm.jvm.processor.multimodel.context.ChatContext;
@@ -32,6 +34,7 @@ import
org.apache.streampipes.processors.llm.jvm.processor.multimodel.context.Wi
import org.apache.streampipes.sdk.StaticProperties;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.Alternatives;
import org.apache.streampipes.sdk.helpers.CodeLanguage;
import org.apache.streampipes.sdk.helpers.EpProperties;
@@ -41,8 +44,6 @@ import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.Options;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.vocabulary.SO;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
import dev.langchain4j.data.message.AiMessage;
import dev.langchain4j.data.message.ChatMessage;
@@ -69,7 +70,7 @@ import java.util.stream.Collectors;
* model’s answer as a new event field. History behaviour (stateless, N‑window,
* or full conversation) is configurable per pipeline instance.
*/
-public class MultiModelPromptProcessor extends StreamPipesDataProcessor {
+public class MultiModelPromptProcessor implements IStreamPipesDataProcessor {
// UI config IDs
public static final String MODEL_PROVIDER_ID = "modelProvider";
@@ -96,19 +97,11 @@ public class MultiModelPromptProcessor extends
StreamPipesDataProcessor {
private ChatContext chatContext;
private Set<String> inputFieldSelectors;
- /* Simple null / blank guard */
- private static void requireNonBlank(String value, String message) throws
SpRuntimeException {
- if (value == null || value.isBlank()) {
- throw new SpRuntimeException(message);
- }
- }
-
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create(
- "org.apache.streampipes.processors.llm.jvm.multimodel",
- 0
- )
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ MultiModelPromptProcessor::new,
+
ProcessingElementBuilder.create("org.apache.streampipes.processors.llm.jvm.multimodel",
0)
.category(DataProcessorType.TRANSFORM)
.withLocales(Locales.EN)
.withAssets(ExtensionAssetType.DOCUMENTATION,
ExtensionAssetType.ICON)
@@ -142,13 +135,14 @@ public class MultiModelPromptProcessor extends
StreamPipesDataProcessor {
// Output mapping
.outputStrategy(OutputStrategies.append(
EpProperties.stringEp(Labels.empty(), OUTPUT_FIELD_ID,
SO.TEXT)))
- .build();
+ .build()
+ );
}
@Override
- public void onInvocation(ProcessorParams params,
- SpOutputCollector collector,
- EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
+ public void onPipelineStarted(IDataProcessorParameters params,
+ SpOutputCollector collector,
+ EventProcessorRuntimeContext runtimeContext)
throws SpRuntimeException {
var extractor = params.extractor();
String provider =
extractor.selectedAlternativeInternalId(MODEL_PROVIDER_ID);
@@ -214,7 +208,7 @@ public class MultiModelPromptProcessor extends
StreamPipesDataProcessor {
}
@Override
- public void onDetach() {
+ public void onPipelineStopped() {
LOG.info("MultiModelPromptProcessor detached – history cleared");
}
@@ -235,28 +229,35 @@ public class MultiModelPromptProcessor extends
StreamPipesDataProcessor {
case PROVIDER_OPENAI -> {
requireNonBlank(openApiKey, "API key is required for OpenAI");
yield OpenAiChatModel.builder()
- .apiKey(openApiKey)
- .modelName(modelName)
- .temperature(temperature)
- .build();
+ .apiKey(openApiKey)
+ .modelName(modelName)
+ .temperature(temperature)
+ .build();
}
case PROVIDER_ANTHROPIC -> {
requireNonBlank(anthropicKey, "API key is required for Anthropic");
yield AnthropicChatModel.builder()
- .apiKey(anthropicKey)
- .modelName(modelName)
- .temperature(temperature)
- .build();
+ .apiKey(anthropicKey)
+ .modelName(modelName)
+ .temperature(temperature)
+ .build();
}
case PROVIDER_OLLAMA -> {
requireNonBlank(ollamaUrl, "Base URL is required for Ollama");
yield OllamaChatModel.builder()
- .baseUrl(ollamaUrl)
- .modelName(modelName)
- .temperature(temperature)
- .build();
+ .baseUrl(ollamaUrl)
+ .modelName(modelName)
+ .temperature(temperature)
+ .build();
}
default -> throw new SpRuntimeException("Unknown model provider: " +
provider);
};
}
+
+ /* Simple null / blank guard */
+ private static void requireNonBlank(String value, String message) throws
SpRuntimeException {
+ if (value == null || value.isBlank()) {
+ throw new SpRuntimeException(message);
+ }
+ }
}