This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch 3654-update-processors-in-processors-llm-jvm
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/3654-update-processors-in-processors-llm-jvm by this push:
     new a160ba84bb refactor(#3654): Update MultiModelPromptProcessor to implement IStreamPipesDataProcessor interface
a160ba84bb is described below

commit a160ba84bb0636b211b2a30fb450c0514bae7c09
Author: Philipp Zehnder <[email protected]>
AuthorDate: Tue Jun 10 11:32:43 2025 +0200

    refactor(#3654): Update MultiModelPromptProcessor to implement IStreamPipesDataProcessor interface
---
 .../multimodel/MultiModelPromptProcessor.java      | 67 +++++++++++-----------
 1 file changed, 34 insertions(+), 33 deletions(-)

diff --git a/streampipes-extensions/streampipes-processors-llm-jvm/src/main/java/org/apache/streampipes/processors/llm/jvm/processor/multimodel/MultiModelPromptProcessor.java b/streampipes-extensions/streampipes-processors-llm-jvm/src/main/java/org/apache/streampipes/processors/llm/jvm/processor/multimodel/MultiModelPromptProcessor.java
index 57f8dc03bf..4c8da88f7d 100644
--- a/streampipes-extensions/streampipes-processors-llm-jvm/src/main/java/org/apache/streampipes/processors/llm/jvm/processor/multimodel/MultiModelPromptProcessor.java
+++ b/streampipes-extensions/streampipes-processors-llm-jvm/src/main/java/org/apache/streampipes/processors/llm/jvm/processor/multimodel/MultiModelPromptProcessor.java
@@ -18,11 +18,13 @@
 package org.apache.streampipes.processors.llm.jvm.processor.multimodel;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
 import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
 import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.processors.llm.jvm.processor.multimodel.context.ChatContext;
@@ -32,6 +34,7 @@ import org.apache.streampipes.processors.llm.jvm.processor.multimodel.context.Wi
 import org.apache.streampipes.sdk.StaticProperties;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
 import org.apache.streampipes.sdk.helpers.Alternatives;
 import org.apache.streampipes.sdk.helpers.CodeLanguage;
 import org.apache.streampipes.sdk.helpers.EpProperties;
@@ -41,8 +44,6 @@ import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.Options;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
 import org.apache.streampipes.vocabulary.SO;
-import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
 import dev.langchain4j.data.message.AiMessage;
 import dev.langchain4j.data.message.ChatMessage;
@@ -69,7 +70,7 @@ import java.util.stream.Collectors;
  * model’s answer as a new event field. History behaviour (stateless, N‑window,
  * or full conversation) is configurable per pipeline instance.
  */
-public class MultiModelPromptProcessor extends StreamPipesDataProcessor {
+public class MultiModelPromptProcessor implements IStreamPipesDataProcessor {
 
   // UI config IDs
   public static final String MODEL_PROVIDER_ID = "modelProvider";
@@ -96,19 +97,11 @@ public class MultiModelPromptProcessor extends StreamPipesDataProcessor {
   private ChatContext chatContext;
   private Set<String> inputFieldSelectors;
 
-  /* Simple null / blank guard */
-  private static void requireNonBlank(String value, String message) throws SpRuntimeException {
-    if (value == null || value.isBlank()) {
-      throw new SpRuntimeException(message);
-    }
-  }
-
   @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder.create(
-                    "org.apache.streampipes.processors.llm.jvm.multimodel",
-                    0
-            )
+  public IDataProcessorConfiguration declareConfig() {
+    return DataProcessorConfiguration.create(
+        MultiModelPromptProcessor::new,
+        ProcessingElementBuilder.create("org.apache.streampipes.processors.llm.jvm.multimodel", 0)
             .category(DataProcessorType.TRANSFORM)
             .withLocales(Locales.EN)
             .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
@@ -142,13 +135,14 @@ public class MultiModelPromptProcessor extends StreamPipesDataProcessor {
             // Output mapping
             .outputStrategy(OutputStrategies.append(
                     EpProperties.stringEp(Labels.empty(), OUTPUT_FIELD_ID, SO.TEXT)))
-            .build();
+            .build()
+    );
   }
 
   @Override
-  public void onInvocation(ProcessorParams params,
-                           SpOutputCollector collector,
-                           EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
+  public void onPipelineStarted(IDataProcessorParameters params,
+                                SpOutputCollector collector,
+                                EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
 
     var extractor = params.extractor();
     String provider = extractor.selectedAlternativeInternalId(MODEL_PROVIDER_ID);
@@ -214,7 +208,7 @@ public class MultiModelPromptProcessor extends StreamPipesDataProcessor {
   }
 
   @Override
-  public void onDetach() {
+  public void onPipelineStopped() {
     LOG.info("MultiModelPromptProcessor detached – history cleared");
   }
 
@@ -235,28 +229,35 @@ public class MultiModelPromptProcessor extends StreamPipesDataProcessor {
       case PROVIDER_OPENAI -> {
         requireNonBlank(openApiKey, "API key is required for OpenAI");
         yield OpenAiChatModel.builder()
-                .apiKey(openApiKey)
-                .modelName(modelName)
-                .temperature(temperature)
-                .build();
+                             .apiKey(openApiKey)
+                             .modelName(modelName)
+                             .temperature(temperature)
+                             .build();
       }
       case PROVIDER_ANTHROPIC -> {
         requireNonBlank(anthropicKey, "API key is required for Anthropic");
         yield AnthropicChatModel.builder()
-                .apiKey(anthropicKey)
-                .modelName(modelName)
-                .temperature(temperature)
-                .build();
+                                .apiKey(anthropicKey)
+                                .modelName(modelName)
+                                .temperature(temperature)
+                                .build();
       }
       case PROVIDER_OLLAMA -> {
         requireNonBlank(ollamaUrl, "Base URL is required for Ollama");
         yield OllamaChatModel.builder()
-                .baseUrl(ollamaUrl)
-                .modelName(modelName)
-                .temperature(temperature)
-                .build();
+                             .baseUrl(ollamaUrl)
+                             .modelName(modelName)
+                             .temperature(temperature)
+                             .build();
       }
       default -> throw new SpRuntimeException("Unknown model provider: " + provider);
     };
   }
+
+  /* Simple null / blank guard */
+  private static void requireNonBlank(String value, String message) throws SpRuntimeException {
+    if (value == null || value.isBlank()) {
+      throw new SpRuntimeException(message);
+    }
+  }
 }

Reply via email to