This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new bfabae9a28 refactor: Update QdrantSink to use IStreamPipesDataSink 
instead of StreamPipesDataSink (#3650)
bfabae9a28 is described below

commit bfabae9a28469ed5c7a6f07a0c8aec6abdb1f647
Author: Anush <[email protected]>
AuthorDate: Wed Jun 11 11:49:37 2025 +0530

    refactor: Update QdrantSink to use IStreamPipesDataSink instead of 
StreamPipesDataSink (#3650)
    
    Signed-off-by: Anush008 <[email protected]>
---
 .../sinks/databases/jvm/qdrant/QdrantSink.java     | 34 +++++++++++++---------
 1 file changed, 20 insertions(+), 14 deletions(-)

diff --git 
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/qdrant/QdrantSink.java
 
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/qdrant/QdrantSink.java
index b8bb05d40c..6fea2bb1e1 100644
--- 
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/qdrant/QdrantSink.java
+++ 
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/qdrant/QdrantSink.java
@@ -20,7 +20,10 @@ package org.apache.streampipes.sinks.databases.jvm.qdrant;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import 
org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
+import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
 import 
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
 import org.apache.streampipes.model.DataSinkType;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
 import org.apache.streampipes.model.graph.DataSinkDescription;
@@ -28,13 +31,12 @@ import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.sdk.builder.DataSinkBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.Options;
 import org.apache.streampipes.vocabulary.XSD;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
 
 import io.qdrant.client.PointIdFactory;
 import io.qdrant.client.QdrantClient;
@@ -54,7 +56,7 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
-public class QdrantSink extends StreamPipesDataSink {
+public class QdrantSink implements IStreamPipesDataSink {
   public static final String QDRANT_HOST_KEY = "qdrant_host";
   public static final String QDRANT_PORT_KEY = "qdrant_port";
   public static final String QDRANT_API_KEY_KEY = "qdrant_api_key";
@@ -90,7 +92,6 @@ public class QdrantSink extends StreamPipesDataSink {
         }
       };
 
-  @Override
   public DataSinkDescription declareModel() {
     return 
DataSinkBuilder.create("org.apache.streampipes.sinks.databases.jvm.qdrant", 0)
         .withLocales(Locales.EN)
@@ -116,9 +117,9 @@ public class QdrantSink extends StreamPipesDataSink {
   }
 
   @Override
-  public void onInvocation(SinkParams parameters, EventSinkRuntimeContext 
runtimeContext)
-      throws SpRuntimeException {
-    var extractor = parameters.extractor();
+  public void onPipelineStarted(
+      IDataSinkParameters params, EventSinkRuntimeContext runtimeContext) {
+    var extractor = params.extractor();
 
     final String host = validateAndExtractHost(extractor);
     final Integer port = validateAndExtractPort(extractor);
@@ -142,6 +143,18 @@ public class QdrantSink extends StreamPipesDataSink {
     }
   }
 
+  @Override
+  public IDataSinkConfiguration declareConfig() {
+    return DataSinkConfiguration.create(QdrantSink::new, declareModel());
+  }
+
+  @Override
+  public void onPipelineStopped() {
+    if (client != null) {
+      client.close();
+    }
+  }
+
   private String validateAndExtractHost(IDataSinkParameterExtractor extractor)
       throws SpRuntimeException {
     String host = extractor.singleValueParameter(QDRANT_HOST_KEY, 
String.class);
@@ -239,13 +252,6 @@ public class QdrantSink extends StreamPipesDataSink {
     }
   }
 
-  @Override
-  public void onDetach() {
-    if (client != null) {
-      client.close();
-    }
-  }
-
   @Override
   public void onEvent(Event event) {
     if (event == null) {

Reply via email to