This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new ac4ea4dd9b feat: Qdrant Sink (#3648)
ac4ea4dd9b is described below
commit ac4ea4dd9b314f586e7f618b15c093b00081957e
Author: Anush <[email protected]>
AuthorDate: Tue Jun 10 13:01:21 2025 +0530
feat: Qdrant Sink (#3648)
* feat: Qdrant Sink
Signed-off-by: Anush008 <[email protected]>
* chore: Updated deps
Signed-off-by: Anush008 <[email protected]>
* chore: Formatting
Signed-off-by: Anush008 <[email protected]>
---------
Signed-off-by: Anush008 <[email protected]>
---
pom.xml | 27 +-
.../streampipes-sinks-databases-jvm/pom.xml | 4 +
.../jvm/DatabaseSinksExtensionModuleExport.java | 5 +-
.../sinks/databases/jvm/qdrant/QdrantSink.java | 325 +++++++++++++++++++++
.../databases/jvm/qdrant/QdrantValueFactory.java | 112 +++++++
.../documentation.md | 88 ++++++
.../icon.png | Bin 0 -> 3443 bytes
.../strings.en | 43 +++
8 files changed, 599 insertions(+), 5 deletions(-)
diff --git a/pom.xml b/pom.xml
index 8565eb2524..10c6fd5999 100644
--- a/pom.xml
+++ b/pom.xml
@@ -67,7 +67,7 @@
<graalvm.js.version>23.0.0</graalvm.js.version>
<gson.version>2.12.1</gson.version>
<guava.version>33.4.0-jre</guava.version>
- <grpc-context.version>1.59.1</grpc-context.version>
+ <grpc.version>1.59.1</grpc.version>
<httpclient.version>4.5.13</httpclient.version>
<httpcore.version>4.4.9</httpcore.version>
<hadoop.version>3.4.1</hadoop.version>
@@ -104,7 +104,7 @@
<micrometer-prometheus.version>1.14.3</micrometer-prometheus.version>
<micrometer-observation.version>1.14.3</micrometer-observation.version>
<mqtt-client.version>1.12</mqtt-client.version>
- <milvus-sdk-java.version>2.5.5</milvus-sdk-java.version>
+ <milvus-sdk-java.version>2.5.10</milvus-sdk-java.version>
<nats.version>2.19.1</nats.version>
<netty.version>4.2.0.Final</netty.version>
<okhttp.version>3.13.1</okhttp.version>
@@ -139,6 +139,7 @@
<hawtbuf.version>1.11</hawtbuf.version>
<woodstox.version>7.0.0</woodstox.version>
<xz.version>1.10</xz.version>
+ <qdrant-java-client.version>1.14.0</qdrant-java-client.version>
<!-- Test dependencies -->
<junit.version>5.11.4</junit.version>
@@ -286,6 +287,11 @@
<artifactId>milvus-sdk-java</artifactId>
<version>${milvus-sdk-java.version}</version>
</dependency>
+ <dependency>
+ <groupId>io.qdrant</groupId>
+ <artifactId>client</artifactId>
+ <version>${qdrant-java-client.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.tsfile</groupId>
<artifactId>tsfile</artifactId>
@@ -324,7 +330,22 @@
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-context</artifactId>
- <version>${grpc-context.version}</version>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty-shaded</artifactId>
+ <version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.nats</groupId>
diff --git a/streampipes-extensions/streampipes-sinks-databases-jvm/pom.xml
b/streampipes-extensions/streampipes-sinks-databases-jvm/pom.xml
index b5216cdf91..557c0de036 100644
--- a/streampipes-extensions/streampipes-sinks-databases-jvm/pom.xml
+++ b/streampipes-extensions/streampipes-sinks-databases-jvm/pom.xml
@@ -98,6 +98,10 @@
<groupId>io.milvus</groupId>
<artifactId>milvus-sdk-java</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.qdrant</groupId>
+ <artifactId>client</artifactId>
+ </dependency>
<dependency>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
diff --git
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/DatabaseSinksExtensionModuleExport.java
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/DatabaseSinksExtensionModuleExport.java
index cb33173b85..5285759baa 100644
---
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/DatabaseSinksExtensionModuleExport.java
+++
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/DatabaseSinksExtensionModuleExport.java
@@ -28,6 +28,7 @@ import
org.apache.streampipes.sinks.databases.jvm.iotdb.IotDbSink;
import org.apache.streampipes.sinks.databases.jvm.milvus.MilvusSink;
import org.apache.streampipes.sinks.databases.jvm.parquet.ParquetSink;
import org.apache.streampipes.sinks.databases.jvm.postgresql.PostgreSqlSink;
+import org.apache.streampipes.sinks.databases.jvm.qdrant.QdrantSink;
import org.apache.streampipes.sinks.databases.jvm.redis.RedisSink;
import org.apache.streampipes.sinks.databases.jvm.tsfile.TsFileSink;
@@ -49,9 +50,9 @@ public class DatabaseSinksExtensionModuleExport implements
IExtensionModuleExpor
new DittoSink(),
new RedisSink(),
new MilvusSink(),
+ new QdrantSink(),
new TsFileSink(),
- new ParquetSink()
- );
+ new ParquetSink());
}
@Override
diff --git
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/qdrant/QdrantSink.java
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/qdrant/QdrantSink.java
new file mode 100644
index 0000000000..b8bb05d40c
--- /dev/null
+++
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/qdrant/QdrantSink.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.databases.jvm.qdrant;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import
org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor;
+import
org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.model.DataSinkType;
+import org.apache.streampipes.model.extensions.ExtensionAssetType;
+import org.apache.streampipes.model.graph.DataSinkDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.sdk.builder.DataSinkBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.helpers.Options;
+import org.apache.streampipes.vocabulary.XSD;
+import org.apache.streampipes.wrapper.params.compat.SinkParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
+
+import io.qdrant.client.PointIdFactory;
+import io.qdrant.client.QdrantClient;
+import io.qdrant.client.QdrantGrpcClient;
+import io.qdrant.client.VectorFactory;
+import io.qdrant.client.grpc.Collections.Distance;
+import io.qdrant.client.grpc.Collections.VectorParams;
+import io.qdrant.client.grpc.JsonWithInt.Value;
+import io.qdrant.client.grpc.Points.NamedVectors;
+import io.qdrant.client.grpc.Points.PointStruct;
+import io.qdrant.client.grpc.Points.Vectors;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+public class QdrantSink extends StreamPipesDataSink {
+ public static final String QDRANT_HOST_KEY = "qdrant_host";
+ public static final String QDRANT_PORT_KEY = "qdrant_port";
+ public static final String QDRANT_API_KEY_KEY = "qdrant_api_key";
+ public static final String COLLECTION_NAME_KEY = "qdrant_collection_name";
+ public static final String VECTOR_NAME_KEY = "qdrant_vector_field";
+ public static final String VECTOR_DIMENSION_KEY = "qdrant_vector_dimension";
+ public static final String VECTOR_DISTANCE_KEY = "qdrant_distance_metric";
+ public static final String ID_KEY = "qdrant_id";
+
+ private QdrantClient client;
+ private String vector;
+ private String id;
+ private String collectionName;
+ private Integer dimension;
+ private Distance distanceType;
+
+ public static final String BYTE = XSD.BYTE.toString();
+ public static final String SHORT = XSD.SHORT.toString();
+ public static final String LONG = XSD.LONG.toString();
+ public static final String INT = XSD.INT.toString();
+ public static final String FLOAT = XSD.FLOAT.toString();
+ public static final String DOUBLE = XSD.DOUBLE.toString();
+ public static final String BOOLEAN = XSD.BOOLEAN.toString();
+ public static final String STRING = XSD.STRING.toString();
+
+ private static final Map<String, Distance> DISTANCE_TYPE_MAP =
+ new HashMap<>() {
+ {
+ put("Cosine", Distance.Cosine);
+ put("Euclid", Distance.Euclid);
+ put("Dot", Distance.Dot);
+ put("Manhattan", Distance.Manhattan);
+ }
+ };
+
+ @Override
+ public DataSinkDescription declareModel() {
+ return
DataSinkBuilder.create("org.apache.streampipes.sinks.databases.jvm.qdrant", 0)
+ .withLocales(Locales.EN)
+ .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
+ .category(DataSinkType.DATABASE)
+ .requiredTextParameter(Labels.withId(QDRANT_HOST_KEY), "localhost")
+ .requiredIntegerParameter(Labels.withId(QDRANT_PORT_KEY), 6334)
+ .requiredTextParameter(Labels.withId(QDRANT_API_KEY_KEY),
"<optional-api-key>")
+ .requiredTextParameter(Labels.withId(COLLECTION_NAME_KEY))
+ .requiredTextParameter(Labels.withId(ID_KEY))
+ .requiredIntegerParameter(Labels.withId(VECTOR_DIMENSION_KEY))
+ .requiredStream(
+ StreamRequirementsBuilder.create()
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.anyProperty(),
+ Labels.withId(VECTOR_NAME_KEY),
+ PropertyScope.NONE)
+ .build())
+ .requiredSingleValueSelection(
+ Labels.withId(VECTOR_DISTANCE_KEY),
+ Options.from(DISTANCE_TYPE_MAP.keySet().toArray(new String[0])))
+ .build();
+ }
+
+ @Override
+ public void onInvocation(SinkParams parameters, EventSinkRuntimeContext
runtimeContext)
+ throws SpRuntimeException {
+ var extractor = parameters.extractor();
+
+ final String host = validateAndExtractHost(extractor);
+ final Integer port = validateAndExtractPort(extractor);
+ final String apiKey = validateAndExtractApiKey(extractor);
+ this.collectionName = validateAndExtractCollectionName(extractor);
+ this.id = validateAndExtractId(extractor);
+ this.vector = validateAndExtractVectorField(extractor);
+ this.dimension = validateAndExtractDimension(extractor);
+ this.distanceType = validateAndExtractDistanceType(extractor);
+
+ try {
+ client = new QdrantClient(QdrantGrpcClient.newBuilder(host,
port).withApiKey(apiKey).build());
+
+ createOrValidateCollection();
+
+ } catch (Exception e) {
+ if (client != null) {
+ client.close();
+ }
+ throw new SpRuntimeException("Failed to initialize Qdrant connection: "
+ e.getMessage());
+ }
+ }
+
+ private String validateAndExtractHost(IDataSinkParameterExtractor extractor)
+ throws SpRuntimeException {
+ String host = extractor.singleValueParameter(QDRANT_HOST_KEY,
String.class);
+ if (host == null || host.trim().isEmpty()) {
+ throw new SpRuntimeException("Host cannot be empty");
+ }
+ return host;
+ }
+
+ private Integer validateAndExtractPort(IDataSinkParameterExtractor extractor)
+ throws SpRuntimeException {
+ Integer port = extractor.singleValueParameter(QDRANT_PORT_KEY,
Integer.class);
+ if (port == null || port < 1 || port > 65535) {
+ throw new SpRuntimeException("Port must be between 1 and 65535");
+ }
+ return port;
+ }
+
+ private String validateAndExtractApiKey(IDataSinkParameterExtractor
extractor)
+ throws SpRuntimeException {
+ String apiKey = extractor.singleValueParameter(QDRANT_API_KEY_KEY,
String.class);
+ if (apiKey == null || apiKey.trim().isEmpty()) {
+ throw new SpRuntimeException("API key cannot be empty");
+ }
+ return apiKey;
+ }
+
+ private String validateAndExtractCollectionName(IDataSinkParameterExtractor
extractor)
+ throws SpRuntimeException {
+ String collectionName =
extractor.singleValueParameter(COLLECTION_NAME_KEY, String.class);
+ if (collectionName == null || collectionName.trim().isEmpty()) {
+ throw new SpRuntimeException("Collection name cannot be empty");
+ }
+ return collectionName;
+ }
+
+ private String validateAndExtractId(IDataSinkParameterExtractor extractor)
+ throws SpRuntimeException {
+ String id = extractor.singleValueParameter(ID_KEY, String.class);
+ if (id == null || id.trim().isEmpty()) {
+ throw new SpRuntimeException("ID field cannot be empty");
+ }
+ try {
+ UUID.fromString(id);
+ } catch (IllegalArgumentException e) {
+ throw new SpRuntimeException("Invalid ID format. The ID must be a valid
UUID string.");
+ }
+ return id;
+ }
+
+ private String validateAndExtractVectorField(IDataSinkParameterExtractor
extractor)
+ throws SpRuntimeException {
+ String vectorField = extractor.mappingPropertyValue(VECTOR_NAME_KEY);
+ if (vectorField == null || vectorField.trim().isEmpty()) {
+ throw new SpRuntimeException("Vector field cannot be empty");
+ }
+ return vectorField.substring(4);
+ }
+
+ private Integer validateAndExtractDimension(IDataSinkParameterExtractor
extractor)
+ throws SpRuntimeException {
+ Integer dimension =
+ Integer.valueOf(extractor.singleValueParameter(VECTOR_DIMENSION_KEY,
String.class));
+ if (dimension == null || dimension <= 0) {
+ throw new SpRuntimeException("Vector dimension must be a positive
number");
+ }
+ return dimension;
+ }
+
+ private Distance validateAndExtractDistanceType(IDataSinkParameterExtractor
extractor)
+ throws SpRuntimeException {
+ String distanceTypeStr =
extractor.selectedSingleValue(VECTOR_DISTANCE_KEY, String.class);
+ Distance distanceType = DISTANCE_TYPE_MAP.get(distanceTypeStr);
+ if (distanceType == null) {
+ throw new SpRuntimeException("Invalid distance type: " +
distanceTypeStr);
+ }
+ return distanceType;
+ }
+
+ private void createOrValidateCollection() throws SpRuntimeException {
+ try {
+ var collectionExists =
client.collectionExistsAsync(collectionName).get();
+ if (!collectionExists) {
+
+ client
+ .createCollectionAsync(
+ collectionName,
+ Map.of(
+ vector,
+
VectorParams.newBuilder().setSize(dimension).setDistance(distanceType).build()))
+ .get();
+ }
+ } catch (Exception e) {
+ throw new SpRuntimeException("Failed to create or validate collection: "
+ e.getMessage());
+ }
+ }
+
+ @Override
+ public void onDetach() {
+ if (client != null) {
+ client.close();
+ }
+ }
+
+ @Override
+ public void onEvent(Event event) {
+ if (event == null) {
+ return;
+ }
+
+ final Map<String, Object> measurementValuePairs = event.getRaw();
+ if (measurementValuePairs.size() <= 1) {
+ return;
+ }
+
+ try {
+ Map<String, Value> payload = new HashMap<>();
+ List<Float> vectorValues = null;
+
+ for (Map.Entry<String, Object> entry : measurementValuePairs.entrySet())
{
+ final String name = entry.getKey();
+ final Object value = entry.getValue();
+
+ if (name.equals(vector)) {
+ vectorValues = validateAndExtractVectorValues(value);
+ } else if (value != null) {
+ payload.put(name, QdrantValueFactory.value(value));
+ }
+ }
+
+ if (vectorValues != null) {
+ if (vectorValues.size() != dimension) {
+ throw new SpRuntimeException(
+ String.format(
+ "Vector dimension mismatch. Expected %d but got %d",
+ dimension, vectorValues.size()));
+ }
+
+ PointStruct point =
+ PointStruct.newBuilder()
+ .setId(PointIdFactory.id(UUID.fromString(id)))
+ .setVectors(
+ Vectors.newBuilder()
+ .setVectors(
+ NamedVectors.newBuilder()
+ .putAllVectors(Map.of(vector,
VectorFactory.vector(vectorValues)))
+ .build())
+ .build())
+ .putAllPayload(payload)
+ .build();
+
+ client.upsertAsync(collectionName,
Collections.singletonList(point)).get();
+ } else {
+ throw new SpRuntimeException("No vector values found in the event");
+ }
+ } catch (Exception e) {
+ throw new SpRuntimeException("Error processing event: " +
e.getMessage());
+ }
+ }
+
+ private List<Float> validateAndExtractVectorValues(Object value) throws
SpRuntimeException {
+ if (!(value instanceof List)) {
+ throw new SpRuntimeException("Vector field must be a list of numbers");
+ }
+
+ List<?> list = (List<?>) value;
+ try {
+ return list.stream()
+ .map(
+ item -> {
+ if (item instanceof Number) {
+ return ((Number) item).floatValue();
+ }
+ throw new IllegalArgumentException("Vector must contain only
numbers");
+ })
+ .collect(Collectors.toList());
+ } catch (IllegalArgumentException e) {
+ throw new SpRuntimeException("Invalid vector values: " + e.getMessage());
+ }
+ }
+}
diff --git
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/qdrant/QdrantValueFactory.java
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/qdrant/QdrantValueFactory.java
new file mode 100644
index 0000000000..5557e1e1c3
--- /dev/null
+++
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/qdrant/QdrantValueFactory.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.databases.jvm.qdrant;
+
+import io.qdrant.client.ValueFactory;
+import io.qdrant.client.grpc.JsonWithInt.Struct;
+import io.qdrant.client.grpc.JsonWithInt.Value;
+import org.springframework.util.Assert;
+
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** Utility methods for building io.qdrant.client.grpc.JsonWithInt.Value from
Java objects. */
+final class QdrantValueFactory {
+
+ private QdrantValueFactory() {}
+
+ @SuppressWarnings("unchecked")
+ public static Value value(Object value) {
+
+ if (value == null) {
+ return ValueFactory.nullValue();
+ }
+
+ if (value.getClass().isArray()) {
+ int length = Array.getLength(value);
+ Object[] objectArray = new Object[length];
+ for (int i = 0; i < length; i++) {
+ objectArray[i] = Array.get(value, i);
+ }
+ return value(objectArray);
+ }
+
+ if (value instanceof Map) {
+ return value((Map<String, Object>) value);
+ }
+
+ if (value instanceof List) {
+ return value((List<Object>) value);
+ }
+
+ switch (value.getClass().getSimpleName()) {
+ case "String":
+ return ValueFactory.value((String) value);
+ case "Integer":
+ return ValueFactory.value((Integer) value);
+ case "Long":
+ return ValueFactory.value(String.valueOf(value));
+ case "Double":
+ return ValueFactory.value((Double) value);
+ case "Float":
+ return ValueFactory.value((Float) value);
+ case "Boolean":
+ return ValueFactory.value((Boolean) value);
+ default:
+ throw new IllegalArgumentException("Unsupported Qdrant value type: " +
value.getClass());
+ }
+ }
+
+ private static Value value(List<Object> elements) {
+ List<Value> values = new ArrayList<Value>(elements.size());
+
+ for (Object element : elements) {
+ values.add(value(element));
+ }
+
+ return ValueFactory.list(values);
+ }
+
+ private static Value value(Object[] elements) {
+ List<Value> values = new ArrayList<Value>(elements.length);
+
+ for (Object element : elements) {
+ values.add(value(element));
+ }
+
+ return ValueFactory.list(values);
+ }
+
+ private static Value value(Map<String, Object> inputMap) {
+ Struct.Builder structBuilder = Struct.newBuilder();
+ Map<String, Value> map = toValueMap(inputMap);
+ structBuilder.putAllFields(map);
+ return Value.newBuilder().setStructValue(structBuilder).build();
+ }
+
+ private static Map<String, Value> toValueMap(Map<String, Object> inputMap) {
+ Assert.notNull(inputMap, "Input map must not be null");
+
+ return inputMap.entrySet().stream()
+ .collect(Collectors.toMap(e -> e.getKey(), e -> value(e.getValue())));
+ }
+}
diff --git
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/resources/org.apache.streampipes.sinks.databases.jvm.qdrant/documentation.md
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/resources/org.apache.streampipes.sinks.databases.jvm.qdrant/documentation.md
new file mode 100644
index 0000000000..974d959ce2
--- /dev/null
+++
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/resources/org.apache.streampipes.sinks.databases.jvm.qdrant/documentation.md
@@ -0,0 +1,88 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+
+## Qdrant
+
+<p align="center">
+ <img src="icon.png" width="150px;" class="pe-image-documentation"/>
+</p>
+
+***
+
+## Description
+
+Stores events in a Qdrant vector database. This sink connector allows you to
store vector data along with associated metadata in a Qdrant collection.
+
+***
+
+## Required input
+
+This sink requires an input stream that contains a vector field. The vector
field should be a list of float values.
+
+***
+
+## Configuration
+
+### Host
+
+The host address of the Qdrant instance (e.g., "localhost" or
"xyz-example.cloud-region.cloud-provider.cloud.qdrant.io").
+
+### Port
+
+The port number of the Qdrant instance (default is 6334 for gRPC).
+
+### API Key
+
+The API key for authentication with Qdrant. This is required for secure access
to the Qdrant instance.
+
+### Collection Name
+
+The name of the collection where the data will be stored. If the collection
doesn't exist, it will be created automatically.
+
+### ID Field
+
+The field name that will be used as the unique identifier for each point in
the collection. This should be a UUID string.
+
+### Vector Field
+
+The name of the field containing the vector data. This field should contain a
list of float values.
+
+### Vector Dimension
+
+The dimension of the vectors to be stored (default is 384). This must match
the dimension of your input vectors.
+
+### Distance Metric
+
+The distance metric to use for vector similarity search. Available options are:
+
+- Cosine
+- Euclid
+- Dot
+- Manhattan
+
+## Output
+
+(not applicable for data sinks)
+
+## Notes
+
+- The sink automatically creates the collection if it doesn't exist
+- All non-vector fields from the input event are stored as payload
+- The sink uses [gRPC for
communication](https://qdrant.tech/documentation/interfaces/#grpc-interface)
with Qdrant
+- Vector data must be provided as a list of float values
+- The ID fmust be a valid UUID string
diff --git
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/resources/org.apache.streampipes.sinks.databases.jvm.qdrant/icon.png
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/resources/org.apache.streampipes.sinks.databases.jvm.qdrant/icon.png
new file mode 100644
index 0000000000..a66b10cba2
Binary files /dev/null and
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/resources/org.apache.streampipes.sinks.databases.jvm.qdrant/icon.png
differ
diff --git
a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/resources/org.apache.streampipes.sinks.databases.jvm.qdrant/strings.en
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/resources/org.apache.streampipes.sinks.databases.jvm.qdrant/strings.en
new file mode 100644
index 0000000000..e63878964d
--- /dev/null
+++
b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/resources/org.apache.streampipes.sinks.databases.jvm.qdrant/strings.en
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.streampipes.sinks.databases.jvm.qdrant.title=Qdrant
+org.apache.streampipes.sinks.databases.jvm.qdrant.description=Stores events in
a Qdrant vector database. This sink connector allows you to store vector data
along with associated metadata in a Qdrant collection.
+
+qdrant_host.title=Host
+qdrant_host.description=The host address of the Qdrant instance
+
+qdrant_port.title=Port
+qdrant_port.description=The port number of the Qdrant instance (default is
6334 for gRPC)
+
+qdrant_api_key.title=API Key
+qdrant_api_key.description=The API key for authentication with Qdrant. This is
required for secure access to the Qdrant instance.
+
+qdrant_collection_name.title=Collection Name
+qdrant_collection_name.description=The name of the collection where the data
will be stored. If the collection doesn't exist, it will be created
automatically.
+
+qdrant_id.title=ID Field
+qdrant_id.description=The field name that will be used as the unique
identifier for each point in the collection. This should be a UUID string.
+
+qdrant_vector_field.title=Vector Field
+qdrant_vector_field.description=The name of the field containing the vector
data. This field should contain a list of float values.
+
+qdrant_vector_dimension.title=Vector Dimension
+qdrant_vector_dimension.description=The dimension of the vectors to be stored.
This must match the dimension of your input vectors.
+
+qdrant_distance_metric.title=Distance Metric
+qdrant_distance_metric.description=The distance metric to use for vector
similarity search. Available options are: Cosine, Euclid, Dot, and Manhattan.