This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new ac4ea4dd9b feat: Qdrant Sink (#3648)
ac4ea4dd9b is described below

commit ac4ea4dd9b314f586e7f618b15c093b00081957e
Author: Anush <[email protected]>
AuthorDate: Tue Jun 10 13:01:21 2025 +0530

    feat: Qdrant Sink (#3648)
    
    * feat: Qdrant Sink
    
    Signed-off-by: Anush008 <[email protected]>
    
    * chore: Updated deps
    
    Signed-off-by: Anush008 <[email protected]>
    
    * chore: Formatting
    
    Signed-off-by: Anush008 <[email protected]>
    
    ---------
    
    Signed-off-by: Anush008 <[email protected]>
---
 pom.xml                                            |  27 +-
 .../streampipes-sinks-databases-jvm/pom.xml        |   4 +
 .../jvm/DatabaseSinksExtensionModuleExport.java    |   5 +-
 .../sinks/databases/jvm/qdrant/QdrantSink.java     | 325 +++++++++++++++++++++
 .../databases/jvm/qdrant/QdrantValueFactory.java   | 112 +++++++
 .../documentation.md                               |  88 ++++++
 .../icon.png                                       | Bin 0 -> 3443 bytes
 .../strings.en                                     |  43 +++
 8 files changed, 599 insertions(+), 5 deletions(-)

diff --git a/pom.xml b/pom.xml
index 8565eb2524..10c6fd5999 100644
--- a/pom.xml
+++ b/pom.xml
@@ -67,7 +67,7 @@
         <graalvm.js.version>23.0.0</graalvm.js.version>
         <gson.version>2.12.1</gson.version>
         <guava.version>33.4.0-jre</guava.version>
-        <grpc-context.version>1.59.1</grpc-context.version>
+        <grpc.version>1.59.1</grpc.version>
         <httpclient.version>4.5.13</httpclient.version>
         <httpcore.version>4.4.9</httpcore.version>
         <hadoop.version>3.4.1</hadoop.version>
@@ -104,7 +104,7 @@
         <micrometer-prometheus.version>1.14.3</micrometer-prometheus.version>
         <micrometer-observation.version>1.14.3</micrometer-observation.version>
         <mqtt-client.version>1.12</mqtt-client.version>
-        <milvus-sdk-java.version>2.5.5</milvus-sdk-java.version>
+        <milvus-sdk-java.version>2.5.10</milvus-sdk-java.version>
         <nats.version>2.19.1</nats.version>
         <netty.version>4.2.0.Final</netty.version>
         <okhttp.version>3.13.1</okhttp.version>
@@ -139,6 +139,7 @@
         <hawtbuf.version>1.11</hawtbuf.version>
         <woodstox.version>7.0.0</woodstox.version>
         <xz.version>1.10</xz.version>
+        <qdrant-java-client.version>1.14.0</qdrant-java-client.version>
 
         <!-- Test dependencies -->
         <junit.version>5.11.4</junit.version>
@@ -286,6 +287,11 @@
                 <artifactId>milvus-sdk-java</artifactId>
                 <version>${milvus-sdk-java.version}</version>
             </dependency>
+            <dependency>
+                <groupId>io.qdrant</groupId>
+                <artifactId>client</artifactId>
+                <version>${qdrant-java-client.version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.apache.tsfile</groupId>
                 <artifactId>tsfile</artifactId>
@@ -324,7 +330,22 @@
             <dependency>
                 <groupId>io.grpc</groupId>
                 <artifactId>grpc-context</artifactId>
-                <version>${grpc-context.version}</version>
+                <version>${grpc.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>io.grpc</groupId>
+                <artifactId>grpc-stub</artifactId>
+                <version>${grpc.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>io.grpc</groupId>
+                <artifactId>grpc-protobuf</artifactId>
+                <version>${grpc.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>io.grpc</groupId>
+                <artifactId>grpc-netty-shaded</artifactId>
+                <version>${grpc.version}</version>
             </dependency>
             <dependency>
                 <groupId>io.nats</groupId>
diff --git a/streampipes-extensions/streampipes-sinks-databases-jvm/pom.xml 
b/streampipes-extensions/streampipes-sinks-databases-jvm/pom.xml
index b5216cdf91..557c0de036 100644
--- a/streampipes-extensions/streampipes-sinks-databases-jvm/pom.xml
+++ b/streampipes-extensions/streampipes-sinks-databases-jvm/pom.xml
@@ -98,6 +98,10 @@
             <groupId>io.milvus</groupId>
             <artifactId>milvus-sdk-java</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.qdrant</groupId>
+            <artifactId>client</artifactId>
+        </dependency>
         <dependency>
             <groupId>com.google.errorprone</groupId>
             <artifactId>error_prone_annotations</artifactId>
diff --git 
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/DatabaseSinksExtensionModuleExport.java
 
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/DatabaseSinksExtensionModuleExport.java
index cb33173b85..5285759baa 100644
--- 
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/DatabaseSinksExtensionModuleExport.java
+++ 
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/DatabaseSinksExtensionModuleExport.java
@@ -28,6 +28,7 @@ import 
org.apache.streampipes.sinks.databases.jvm.iotdb.IotDbSink;
 import org.apache.streampipes.sinks.databases.jvm.milvus.MilvusSink;
 import org.apache.streampipes.sinks.databases.jvm.parquet.ParquetSink;
 import org.apache.streampipes.sinks.databases.jvm.postgresql.PostgreSqlSink;
+import org.apache.streampipes.sinks.databases.jvm.qdrant.QdrantSink;
 import org.apache.streampipes.sinks.databases.jvm.redis.RedisSink;
 import org.apache.streampipes.sinks.databases.jvm.tsfile.TsFileSink;
 
@@ -49,9 +50,9 @@ public class DatabaseSinksExtensionModuleExport implements 
IExtensionModuleExpor
         new DittoSink(),
         new RedisSink(),
         new MilvusSink(),
+        new QdrantSink(),
         new TsFileSink(),
-        new ParquetSink()
-    );
+        new ParquetSink());
   }
 
   @Override
diff --git 
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/qdrant/QdrantSink.java
 
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/qdrant/QdrantSink.java
new file mode 100644
index 0000000000..b8bb05d40c
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/qdrant/QdrantSink.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.databases.jvm.qdrant;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import 
org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor;
+import 
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.model.DataSinkType;
+import org.apache.streampipes.model.extensions.ExtensionAssetType;
+import org.apache.streampipes.model.graph.DataSinkDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.sdk.builder.DataSinkBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.helpers.Options;
+import org.apache.streampipes.vocabulary.XSD;
+import org.apache.streampipes.wrapper.params.compat.SinkParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
+
+import io.qdrant.client.PointIdFactory;
+import io.qdrant.client.QdrantClient;
+import io.qdrant.client.QdrantGrpcClient;
+import io.qdrant.client.VectorFactory;
+import io.qdrant.client.grpc.Collections.Distance;
+import io.qdrant.client.grpc.Collections.VectorParams;
+import io.qdrant.client.grpc.JsonWithInt.Value;
+import io.qdrant.client.grpc.Points.NamedVectors;
+import io.qdrant.client.grpc.Points.PointStruct;
+import io.qdrant.client.grpc.Points.Vectors;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+public class QdrantSink extends StreamPipesDataSink {
+  public static final String QDRANT_HOST_KEY = "qdrant_host";
+  public static final String QDRANT_PORT_KEY = "qdrant_port";
+  public static final String QDRANT_API_KEY_KEY = "qdrant_api_key";
+  public static final String COLLECTION_NAME_KEY = "qdrant_collection_name";
+  public static final String VECTOR_NAME_KEY = "qdrant_vector_field";
+  public static final String VECTOR_DIMENSION_KEY = "qdrant_vector_dimension";
+  public static final String VECTOR_DISTANCE_KEY = "qdrant_distance_metric";
+  public static final String ID_KEY = "qdrant_id";
+
+  private QdrantClient client;
+  private String vector;
+  private String id;
+  private String collectionName;
+  private Integer dimension;
+  private Distance distanceType;
+
+  public static final String BYTE = XSD.BYTE.toString();
+  public static final String SHORT = XSD.SHORT.toString();
+  public static final String LONG = XSD.LONG.toString();
+  public static final String INT = XSD.INT.toString();
+  public static final String FLOAT = XSD.FLOAT.toString();
+  public static final String DOUBLE = XSD.DOUBLE.toString();
+  public static final String BOOLEAN = XSD.BOOLEAN.toString();
+  public static final String STRING = XSD.STRING.toString();
+
+  private static final Map<String, Distance> DISTANCE_TYPE_MAP =
+      new HashMap<>() {
+        {
+          put("Cosine", Distance.Cosine);
+          put("Euclid", Distance.Euclid);
+          put("Dot", Distance.Dot);
+          put("Manhattan", Distance.Manhattan);
+        }
+      };
+
+  @Override
+  public DataSinkDescription declareModel() {
+    return 
DataSinkBuilder.create("org.apache.streampipes.sinks.databases.jvm.qdrant", 0)
+        .withLocales(Locales.EN)
+        .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
+        .category(DataSinkType.DATABASE)
+        .requiredTextParameter(Labels.withId(QDRANT_HOST_KEY), "localhost")
+        .requiredIntegerParameter(Labels.withId(QDRANT_PORT_KEY), 6334)
+        .requiredTextParameter(Labels.withId(QDRANT_API_KEY_KEY), 
"<optional-api-key>")
+        .requiredTextParameter(Labels.withId(COLLECTION_NAME_KEY))
+        .requiredTextParameter(Labels.withId(ID_KEY))
+        .requiredIntegerParameter(Labels.withId(VECTOR_DIMENSION_KEY))
+        .requiredStream(
+            StreamRequirementsBuilder.create()
+                .requiredPropertyWithUnaryMapping(
+                    EpRequirements.anyProperty(),
+                    Labels.withId(VECTOR_NAME_KEY),
+                    PropertyScope.NONE)
+                .build())
+        .requiredSingleValueSelection(
+            Labels.withId(VECTOR_DISTANCE_KEY),
+            Options.from(DISTANCE_TYPE_MAP.keySet().toArray(new String[0])))
+        .build();
+  }
+
+  @Override
+  public void onInvocation(SinkParams parameters, EventSinkRuntimeContext 
runtimeContext)
+      throws SpRuntimeException {
+    var extractor = parameters.extractor();
+
+    final String host = validateAndExtractHost(extractor);
+    final Integer port = validateAndExtractPort(extractor);
+    final String apiKey = validateAndExtractApiKey(extractor);
+    this.collectionName = validateAndExtractCollectionName(extractor);
+    this.id = validateAndExtractId(extractor);
+    this.vector = validateAndExtractVectorField(extractor);
+    this.dimension = validateAndExtractDimension(extractor);
+    this.distanceType = validateAndExtractDistanceType(extractor);
+
+    try {
+      client = new QdrantClient(QdrantGrpcClient.newBuilder(host, 
port).withApiKey(apiKey).build());
+
+      createOrValidateCollection();
+
+    } catch (Exception e) {
+      if (client != null) {
+        client.close();
+      }
+      throw new SpRuntimeException("Failed to initialize Qdrant connection: " 
+ e.getMessage());
+    }
+  }
+
+  private String validateAndExtractHost(IDataSinkParameterExtractor extractor)
+      throws SpRuntimeException {
+    String host = extractor.singleValueParameter(QDRANT_HOST_KEY, 
String.class);
+    if (host == null || host.trim().isEmpty()) {
+      throw new SpRuntimeException("Host cannot be empty");
+    }
+    return host;
+  }
+
+  private Integer validateAndExtractPort(IDataSinkParameterExtractor extractor)
+      throws SpRuntimeException {
+    Integer port = extractor.singleValueParameter(QDRANT_PORT_KEY, 
Integer.class);
+    if (port == null || port < 1 || port > 65535) {
+      throw new SpRuntimeException("Port must be between 1 and 65535");
+    }
+    return port;
+  }
+
+  private String validateAndExtractApiKey(IDataSinkParameterExtractor 
extractor)
+      throws SpRuntimeException {
+    String apiKey = extractor.singleValueParameter(QDRANT_API_KEY_KEY, 
String.class);
+    if (apiKey == null || apiKey.trim().isEmpty()) {
+      throw new SpRuntimeException("API key cannot be empty");
+    }
+    return apiKey;
+  }
+
+  private String validateAndExtractCollectionName(IDataSinkParameterExtractor 
extractor)
+      throws SpRuntimeException {
+    String collectionName = 
extractor.singleValueParameter(COLLECTION_NAME_KEY, String.class);
+    if (collectionName == null || collectionName.trim().isEmpty()) {
+      throw new SpRuntimeException("Collection name cannot be empty");
+    }
+    return collectionName;
+  }
+
+  private String validateAndExtractId(IDataSinkParameterExtractor extractor)
+      throws SpRuntimeException {
+    String id = extractor.singleValueParameter(ID_KEY, String.class);
+    if (id == null || id.trim().isEmpty()) {
+      throw new SpRuntimeException("ID field cannot be empty");
+    }
+    try {
+      UUID.fromString(id);
+    } catch (IllegalArgumentException e) {
+      throw new SpRuntimeException("Invalid ID format. The ID must be a valid 
UUID string.");
+    }
+    return id;
+  }
+
+  private String validateAndExtractVectorField(IDataSinkParameterExtractor 
extractor)
+      throws SpRuntimeException {
+    String vectorField = extractor.mappingPropertyValue(VECTOR_NAME_KEY);
+    if (vectorField == null || vectorField.trim().isEmpty()) {
+      throw new SpRuntimeException("Vector field cannot be empty");
+    }
+    return vectorField.substring(4);
+  }
+
+  private Integer validateAndExtractDimension(IDataSinkParameterExtractor 
extractor)
+      throws SpRuntimeException {
+    Integer dimension =
+        Integer.valueOf(extractor.singleValueParameter(VECTOR_DIMENSION_KEY, 
String.class));
+    if (dimension == null || dimension <= 0) {
+      throw new SpRuntimeException("Vector dimension must be a positive 
number");
+    }
+    return dimension;
+  }
+
+  private Distance validateAndExtractDistanceType(IDataSinkParameterExtractor 
extractor)
+      throws SpRuntimeException {
+    String distanceTypeStr = 
extractor.selectedSingleValue(VECTOR_DISTANCE_KEY, String.class);
+    Distance distanceType = DISTANCE_TYPE_MAP.get(distanceTypeStr);
+    if (distanceType == null) {
+      throw new SpRuntimeException("Invalid distance type: " + 
distanceTypeStr);
+    }
+    return distanceType;
+  }
+
+  private void createOrValidateCollection() throws SpRuntimeException {
+    try {
+      var collectionExists = 
client.collectionExistsAsync(collectionName).get();
+      if (!collectionExists) {
+
+        client
+            .createCollectionAsync(
+                collectionName,
+                Map.of(
+                    vector,
+                    
VectorParams.newBuilder().setSize(dimension).setDistance(distanceType).build()))
+            .get();
+      }
+    } catch (Exception e) {
+      throw new SpRuntimeException("Failed to create or validate collection: " 
+ e.getMessage());
+    }
+  }
+
+  @Override
+  public void onDetach() {
+    if (client != null) {
+      client.close();
+    }
+  }
+
+  @Override
+  public void onEvent(Event event) {
+    if (event == null) {
+      return;
+    }
+
+    final Map<String, Object> measurementValuePairs = event.getRaw();
+    if (measurementValuePairs.size() <= 1) {
+      return;
+    }
+
+    try {
+      Map<String, Value> payload = new HashMap<>();
+      List<Float> vectorValues = null;
+
+      for (Map.Entry<String, Object> entry : measurementValuePairs.entrySet()) 
{
+        final String name = entry.getKey();
+        final Object value = entry.getValue();
+
+        if (name.equals(vector)) {
+          vectorValues = validateAndExtractVectorValues(value);
+        } else if (value != null) {
+          payload.put(name, QdrantValueFactory.value(value));
+        }
+      }
+
+      if (vectorValues != null) {
+        if (vectorValues.size() != dimension) {
+          throw new SpRuntimeException(
+              String.format(
+                  "Vector dimension mismatch. Expected %d but got %d",
+                  dimension, vectorValues.size()));
+        }
+
+        PointStruct point =
+            PointStruct.newBuilder()
+                .setId(PointIdFactory.id(UUID.fromString(id)))
+                .setVectors(
+                    Vectors.newBuilder()
+                        .setVectors(
+                            NamedVectors.newBuilder()
+                                .putAllVectors(Map.of(vector, 
VectorFactory.vector(vectorValues)))
+                                .build())
+                        .build())
+                .putAllPayload(payload)
+                .build();
+
+        client.upsertAsync(collectionName, 
Collections.singletonList(point)).get();
+      } else {
+        throw new SpRuntimeException("No vector values found in the event");
+      }
+    } catch (Exception e) {
+      throw new SpRuntimeException("Error processing event: " + 
e.getMessage());
+    }
+  }
+
+  private List<Float> validateAndExtractVectorValues(Object value) throws 
SpRuntimeException {
+    if (!(value instanceof List)) {
+      throw new SpRuntimeException("Vector field must be a list of numbers");
+    }
+
+    List<?> list = (List<?>) value;
+    try {
+      return list.stream()
+          .map(
+              item -> {
+                if (item instanceof Number) {
+                  return ((Number) item).floatValue();
+                }
+                throw new IllegalArgumentException("Vector must contain only 
numbers");
+              })
+          .collect(Collectors.toList());
+    } catch (IllegalArgumentException e) {
+      throw new SpRuntimeException("Invalid vector values: " + e.getMessage());
+    }
+  }
+}
diff --git 
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/qdrant/QdrantValueFactory.java
 
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/qdrant/QdrantValueFactory.java
new file mode 100644
index 0000000000..5557e1e1c3
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/qdrant/QdrantValueFactory.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.databases.jvm.qdrant;
+
+import io.qdrant.client.ValueFactory;
+import io.qdrant.client.grpc.JsonWithInt.Struct;
+import io.qdrant.client.grpc.JsonWithInt.Value;
+import org.springframework.util.Assert;
+
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** Utility methods for building io.qdrant.client.grpc.JsonWithInt.Value from 
Java objects. */
+final class QdrantValueFactory {
+
+  private QdrantValueFactory() {}
+
+  @SuppressWarnings("unchecked")
+  public static Value value(Object value) {
+
+    if (value == null) {
+      return ValueFactory.nullValue();
+    }
+
+    if (value.getClass().isArray()) {
+      int length = Array.getLength(value);
+      Object[] objectArray = new Object[length];
+      for (int i = 0; i < length; i++) {
+        objectArray[i] = Array.get(value, i);
+      }
+      return value(objectArray);
+    }
+
+    if (value instanceof Map) {
+      return value((Map<String, Object>) value);
+    }
+
+    if (value instanceof List) {
+      return value((List<Object>) value);
+    }
+
+    switch (value.getClass().getSimpleName()) {
+      case "String":
+        return ValueFactory.value((String) value);
+      case "Integer":
+        return ValueFactory.value((Integer) value);
+      case "Long":
+        return ValueFactory.value(String.valueOf(value));
+      case "Double":
+        return ValueFactory.value((Double) value);
+      case "Float":
+        return ValueFactory.value((Float) value);
+      case "Boolean":
+        return ValueFactory.value((Boolean) value);
+      default:
+        throw new IllegalArgumentException("Unsupported Qdrant value type: " + 
value.getClass());
+    }
+  }
+
+  private static Value value(List<Object> elements) {
+    List<Value> values = new ArrayList<Value>(elements.size());
+
+    for (Object element : elements) {
+      values.add(value(element));
+    }
+
+    return ValueFactory.list(values);
+  }
+
+  private static Value value(Object[] elements) {
+    List<Value> values = new ArrayList<Value>(elements.length);
+
+    for (Object element : elements) {
+      values.add(value(element));
+    }
+
+    return ValueFactory.list(values);
+  }
+
+  private static Value value(Map<String, Object> inputMap) {
+    Struct.Builder structBuilder = Struct.newBuilder();
+    Map<String, Value> map = toValueMap(inputMap);
+    structBuilder.putAllFields(map);
+    return Value.newBuilder().setStructValue(structBuilder).build();
+  }
+
+  private static Map<String, Value> toValueMap(Map<String, Object> inputMap) {
+    Assert.notNull(inputMap, "Input map must not be null");
+
+    return inputMap.entrySet().stream()
+        .collect(Collectors.toMap(e -> e.getKey(), e -> value(e.getValue())));
+  }
+}
diff --git 
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/resources/org.apache.streampipes.sinks.databases.jvm.qdrant/documentation.md
 
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/resources/org.apache.streampipes.sinks.databases.jvm.qdrant/documentation.md
new file mode 100644
index 0000000000..974d959ce2
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/resources/org.apache.streampipes.sinks.databases.jvm.qdrant/documentation.md
@@ -0,0 +1,88 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+## Qdrant
+
+<p align="center">
+    <img src="icon.png" width="150px;" class="pe-image-documentation"/>
+</p>
+
+***
+
+## Description
+
+Stores events in a Qdrant vector database. This sink connector allows you to 
store vector data along with associated metadata in a Qdrant collection.
+
+***
+
+## Required input
+
+This sink requires an input stream that contains a vector field. The vector 
field should be a list of float values.
+
+***
+
+## Configuration
+
+### Host
+
+The host address of the Qdrant instance (e.g., "localhost" or 
"xyz-example.cloud-region.cloud-provider.cloud.qdrant.io").
+
+### Port
+
+The port number of the Qdrant instance (default is 6334 for gRPC).
+
+### API Key
+
+The API key for authentication with Qdrant. This is required for secure access 
to the Qdrant instance.
+
+### Collection Name
+
+The name of the collection where the data will be stored. If the collection 
doesn't exist, it will be created automatically.
+
+### ID Field
+
+The field name that will be used as the unique identifier for each point in 
the collection. This should be a UUID string.
+
+### Vector Field
+
+The name of the field containing the vector data. This field should contain a 
list of float values.
+
+### Vector Dimension
+
+The dimension of the vectors to be stored (default is 384). This must match 
the dimension of your input vectors.
+
+### Distance Metric
+
+The distance metric to use for vector similarity search. Available options are:
+
+- Cosine
+- Euclid
+- Dot
+- Manhattan
+
+## Output
+
+(not applicable for data sinks)
+
+## Notes
+
+- The sink automatically creates the collection if it doesn't exist
+- All non-vector fields from the input event are stored as payload
+- The sink uses [gRPC for 
communication](https://qdrant.tech/documentation/interfaces/#grpc-interface) 
with Qdrant
+- Vector data must be provided as a list of float values
+- The ID fmust be a valid UUID string
diff --git 
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/resources/org.apache.streampipes.sinks.databases.jvm.qdrant/icon.png
 
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/resources/org.apache.streampipes.sinks.databases.jvm.qdrant/icon.png
new file mode 100644
index 0000000000..a66b10cba2
Binary files /dev/null and 
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/resources/org.apache.streampipes.sinks.databases.jvm.qdrant/icon.png
 differ
diff --git 
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/resources/org.apache.streampipes.sinks.databases.jvm.qdrant/strings.en
 
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/resources/org.apache.streampipes.sinks.databases.jvm.qdrant/strings.en
new file mode 100644
index 0000000000..e63878964d
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/resources/org.apache.streampipes.sinks.databases.jvm.qdrant/strings.en
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.streampipes.sinks.databases.jvm.qdrant.title=Qdrant
+org.apache.streampipes.sinks.databases.jvm.qdrant.description=Stores events in 
a Qdrant vector database. This sink connector allows you to store vector data 
along with associated metadata in a Qdrant collection.
+
+qdrant_host.title=Host
+qdrant_host.description=The host address of the Qdrant instance
+
+qdrant_port.title=Port
+qdrant_port.description=The port number of the Qdrant instance (default is 
6334 for gRPC)
+
+qdrant_api_key.title=API Key
+qdrant_api_key.description=The API key for authentication with Qdrant. This is 
required for secure access to the Qdrant instance.
+
+qdrant_collection_name.title=Collection Name
+qdrant_collection_name.description=The name of the collection where the data 
will be stored. If the collection doesn't exist, it will be created 
automatically.
+
+qdrant_id.title=ID Field
+qdrant_id.description=The field name that will be used as the unique 
identifier for each point in the collection. This should be a UUID string.
+
+qdrant_vector_field.title=Vector Field
+qdrant_vector_field.description=The name of the field containing the vector 
data. This field should contain a list of float values.
+
+qdrant_vector_dimension.title=Vector Dimension
+qdrant_vector_dimension.description=The dimension of the vectors to be stored. 
This must match the dimension of your input vectors.
+
+qdrant_distance_metric.title=Distance Metric
+qdrant_distance_metric.description=The distance metric to use for vector 
similarity search. Available options are: Cosine, Euclid, Dot, and Manhattan.

Reply via email to