This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new bfabae9a28 refactor: Update QdrantSink to use IStreamPipesDataSink
instead of StreamPipesDataSink (#3650)
bfabae9a28 is described below
commit bfabae9a28469ed5c7a6f07a0c8aec6abdb1f647
Author: Anush <[email protected]>
AuthorDate: Wed Jun 11 11:49:37 2025 +0530
refactor: Update QdrantSink to use IStreamPipesDataSink instead of
StreamPipesDataSink (#3650)
Signed-off-by: Anush008 <[email protected]>
---
.../sinks/databases/jvm/qdrant/QdrantSink.java | 34 +++++++++++++---------
1 file changed, 20 insertions(+), 14 deletions(-)
diff --git
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/qdrant/QdrantSink.java
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/qdrant/QdrantSink.java
index b8bb05d40c..6fea2bb1e1 100644
---
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/qdrant/QdrantSink.java
+++
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/qdrant/QdrantSink.java
@@ -20,7 +20,10 @@ package org.apache.streampipes.sinks.databases.jvm.qdrant;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import
org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
+import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
import
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
import org.apache.streampipes.model.graph.DataSinkDescription;
@@ -28,13 +31,12 @@ import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.Options;
import org.apache.streampipes.vocabulary.XSD;
-import org.apache.streampipes.wrapper.params.compat.SinkParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
import io.qdrant.client.PointIdFactory;
import io.qdrant.client.QdrantClient;
@@ -54,7 +56,7 @@ import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
-public class QdrantSink extends StreamPipesDataSink {
+public class QdrantSink implements IStreamPipesDataSink {
public static final String QDRANT_HOST_KEY = "qdrant_host";
public static final String QDRANT_PORT_KEY = "qdrant_port";
public static final String QDRANT_API_KEY_KEY = "qdrant_api_key";
@@ -90,7 +92,6 @@ public class QdrantSink extends StreamPipesDataSink {
}
};
- @Override
public DataSinkDescription declareModel() {
return
DataSinkBuilder.create("org.apache.streampipes.sinks.databases.jvm.qdrant", 0)
.withLocales(Locales.EN)
@@ -116,9 +117,9 @@ public class QdrantSink extends StreamPipesDataSink {
}
@Override
- public void onInvocation(SinkParams parameters, EventSinkRuntimeContext
runtimeContext)
- throws SpRuntimeException {
- var extractor = parameters.extractor();
+ public void onPipelineStarted(
+ IDataSinkParameters params, EventSinkRuntimeContext runtimeContext) {
+ var extractor = params.extractor();
final String host = validateAndExtractHost(extractor);
final Integer port = validateAndExtractPort(extractor);
@@ -142,6 +143,18 @@ public class QdrantSink extends StreamPipesDataSink {
}
}
+ @Override
+ public IDataSinkConfiguration declareConfig() {
+ return DataSinkConfiguration.create(QdrantSink::new, declareModel());
+ }
+
+ @Override
+ public void onPipelineStopped() {
+ if (client != null) {
+ client.close();
+ }
+ }
+
private String validateAndExtractHost(IDataSinkParameterExtractor extractor)
throws SpRuntimeException {
String host = extractor.singleValueParameter(QDRANT_HOST_KEY,
String.class);
@@ -239,13 +252,6 @@ public class QdrantSink extends StreamPipesDataSink {
}
}
- @Override
- public void onDetach() {
- if (client != null) {
- client.close();
- }
- }
-
@Override
public void onEvent(Event event) {
if (event == null) {