This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new a56572d3b2 NIFI-12983 Qdrant vector store support
a56572d3b2 is described below

commit a56572d3b20222ebae632d35fd132686d536a3d8
Author: Anush008 <[email protected]>
AuthorDate: Mon Apr 1 11:25:13 2024 +0530

    NIFI-12983 Qdrant vector store support
    
    Co-authored-by: Pierre Villard <[email protected]>
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #8590.
---
 .../src/main/python/vectorstores/PutQdrant.py      | 174 +++++++++++++++++++
 .../src/main/python/vectorstores/QdrantUtils.py    | 125 ++++++++++++++
 .../src/main/python/vectorstores/QueryQdrant.py    | 192 +++++++++++++++++++++
 .../src/main/python/vectorstores/requirements.txt  |   5 +
 4 files changed, 496 insertions(+)

diff --git a/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/PutQdrant.py b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/PutQdrant.py
new file mode 100644
index 0000000000..f7e2de1ea5
--- /dev/null
+++ b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/PutQdrant.py
@@ -0,0 +1,174 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from langchain.vectorstores.qdrant import Qdrant
+from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
+from nifiapi.properties import (
+    PropertyDescriptor,
+    StandardValidators,
+    ExpressionLanguageScope,
+)
+import json
+from EmbeddingUtils import (
+    create_embedding_service,
+)
+from nifiapi.documentation import use_case
+
+from qdrant_client.models import Distance
+
+import QdrantUtils
+
+
+@use_case(
+    description="Create embeddings that semantically represent text content and upload to Qdrant - https://qdrant.tech/",
+    notes="This processor assumes that the data has already been formatted in JSONL format with the text to store in Qdrant provided in the 'text' field.",
+    keywords=["qdrant", "embedding", "vector", "text", "vectorstore", "insert"],
+    configuration="""
+                Configure 'Collection Name' to the name of the Qdrant collection to use.
+                Configure 'Qdrant URL' to the fully qualified URL of the Qdrant instance.
+                Configure 'Qdrant API Key' to the API Key to use in order to authenticate with Qdrant.
+                Configure 'Prefer gRPC' to True if you want to use gRPC for interfacing with Qdrant.
+                Configure 'Use HTTPS' to True if you want to use TLS(HTTPS) while interfacing with Qdrant.
+                Configure 'Embedding Model' to indicate whether OpenAI embeddings should be used or a HuggingFace embedding model should be used: 'Hugging Face Model' or 'OpenAI Model'
+                Configure 'HuggingFace API Key' or 'OpenAI API Key', depending on the chosen Embedding Model.
+                Configure 'HuggingFace Model' or 'OpenAI Model' to the name of the model to use.
+                Configure 'Force Recreate Collection' to True if you want to recreate the collection if it already exists.
+                Configure 'Similarity Metric' to the similarity metric to use when querying Qdrant.
+
+                If the documents to send to Qdrant contain a unique identifier(UUID), set the 'Document ID Field Name' property to the name of the field that contains the document ID.
+                This property can be left blank, in which case a UUID will be generated based on the FlowFile's filename.
+                """,
+)
+class PutQdrant(FlowFileTransform):
+    class Java:
+        implements = ["org.apache.nifi.python.processor.FlowFileTransform"]
+
+    class ProcessorDetails:
+        version = "@project.version@"
+        description = """Publishes JSON data to Qdrant. The Incoming data must be in single JSON per Line format, each with two keys: 'text' and 'metadata'.
+                       The text must be a string, while metadata must be a map with strings for values. Any additional fields will be ignored."""
+        tags = [
+            "qdrant",
+            "vector",
+            "vectordb",
+            "vectorstore",
+            "embeddings",
+            "ai",
+            "artificial intelligence",
+            "ml",
+            "machine learning",
+            "text",
+            "LLM",
+        ]
+
+    DOC_ID_FIELD_NAME = PropertyDescriptor(
+        name="Document ID Field Name",
+        description="""Specifies the name of the field in the 'metadata' element of each document where the document's ID can be found.  
+                    If not specified, a UUID will be generated based on the FlowFile's filename and an incremental number.""",
+        required=False,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+    )
+    FORCE_RECREATE_COLLECTION = PropertyDescriptor(
+        name="Force Recreate Collection",
+        description="Specifies whether to recreate the collection if it already exists. Essentially clearing the existing data.",
+        required=True,
+        default_value=False,
+        allowable_values=["True", "False"],
+        validators=[StandardValidators.BOOLEAN_VALIDATOR],
+    )
+    SIMILARITY_METRIC = PropertyDescriptor(
+        name="Similarity Metric",
+        description="Specifies the similarity metric when creating the collection.",
+        required=True,
+        default_value=Distance.COSINE,
+        allowable_values=[
+            Distance.COSINE,
+            Distance.EUCLID,
+            Distance.DOT,
+            Distance.MANHATTAN,
+        ],
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    )
+
+    properties = (
+        QdrantUtils.QDRANT_PROPERTIES
+        + QdrantUtils.EMBEDDING_MODEL_PROPERTIES
+        + [
+            FORCE_RECREATE_COLLECTION,
+            SIMILARITY_METRIC,
+            DOC_ID_FIELD_NAME,
+        ]
+    )
+
+    def __init__(self, **kwargs):
+        pass
+
+    def getPropertyDescriptors(self):
+        return self.properties
+
+    def onScheduled(self, context):
+        # The Qdrant#construct_instance() internally checks if the collection exists
+        # and creates it if it doesn't with the appropriate dimesions and configurations.
+        self.vector_store = Qdrant.construct_instance(
+            texts=[
+                "Some text to obtain the embeddings dimension when creating the collection"
+            ],
+            embedding=create_embedding_service(context),
+            collection_name=context.getProperty(QdrantUtils.COLLECTION_NAME).getValue(),
+            url=context.getProperty(QdrantUtils.QDRANT_URL).getValue(),
+            api_key=context.getProperty(QdrantUtils.QDRANT_API_KEY).getValue(),
+            prefer_grpc=context.getProperty(QdrantUtils.PREFER_GRPC).asBoolean(),
+            https=context.getProperty(QdrantUtils.HTTPS).asBoolean(),
+            force_recreate=context.getProperty(
+                self.FORCE_RECREATE_COLLECTION
+            ).asBoolean(),
+            distance_func=context.getProperty(self.SIMILARITY_METRIC).getValue(),
+        )
+
+    def transform(self, context, flowfile):
+        id_field_name = (
+            context.getProperty(self.DOC_ID_FIELD_NAME)
+            .evaluateAttributeExpressions(flowfile)
+            .getValue()
+        )
+
+        # Read the FlowFile content as "json-lines".
+        json_lines = flowfile.getContentsAsBytes().decode()
+        i = 1
+        texts, metadatas, ids = [], [], []
+        for line in json_lines.split("\n"):
+            try:
+                doc = json.loads(line)
+            except Exception as e:
+                raise ValueError(f"Could not parse line {i} as JSON") from e
+
+            metadata = doc.get("metadata")
+            texts.append(doc.get("text"))
+            metadatas.append(metadata)
+
+            doc_id = None
+            if id_field_name is not None:
+                doc_id = metadata.get(id_field_name)
+            if doc_id is None:
+                doc_id = QdrantUtils.convert_id(
+                    flowfile.getAttribute("filename") + "-" + str(i)
+                )
+            ids.append(doc_id)
+
+            i += 1
+
+        self.vector_store.add_texts(texts=texts, metadatas=metadatas, ids=ids)
+        return FlowFileTransformResult(relationship="success")
diff --git a/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/QdrantUtils.py b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/QdrantUtils.py
new file mode 100644
index 0000000000..9b4214b32c
--- /dev/null
+++ b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/QdrantUtils.py
@@ -0,0 +1,125 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from nifiapi.properties import (
+    PropertyDescriptor,
+    StandardValidators,
+    ExpressionLanguageScope,
+    PropertyDependency,
+)
+from EmbeddingUtils import (
+    OPENAI,
+    HUGGING_FACE,
+    EMBEDDING_MODEL,
+)
+
+import uuid
+
+DEFAULT_COLLECTION_NAME = "apache-nifi"
+
+
+COLLECTION_NAME = PropertyDescriptor(
+    name="Collection Name",
+    description="The name of the Qdrant collection to use.",
+    sensitive=False,
+    required=True,
+    default_value=DEFAULT_COLLECTION_NAME,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+)
+QDRANT_URL = PropertyDescriptor(
+    name="Qdrant URL",
+    description="The fully qualified URL to the Qdrant instance.",
+    sensitive=False,
+    required=True,
+    default_value="http://localhost:6333",
+    validators=[StandardValidators.URL_VALIDATOR],
+)
+QDRANT_API_KEY = PropertyDescriptor(
+    name="Qdrant API Key",
+    description="The API Key to use in order to authentication with Qdrant. Can be empty.",
+    sensitive=True,
+    required=True,
+)
+
+PREFER_GRPC = PropertyDescriptor(
+    name="Prefer gRPC",
+    description="Specifies whether to use gRPC for interfacing with Qdrant.",
+    required=True,
+    default_value=False,
+    allowable_values=["True", "False"],
+    validators=[StandardValidators.BOOLEAN_VALIDATOR],
+)
+HTTPS = PropertyDescriptor(
+    name="Use HTTPS",
+    description="Specifies whether to TLS(HTTPS) while interfacing with Qdrant.",
+    required=True,
+    default_value=False,
+    allowable_values=["True", "False"],
+    validators=[StandardValidators.BOOLEAN_VALIDATOR],
+)
+
+QDRANT_PROPERTIES = [COLLECTION_NAME, QDRANT_URL, QDRANT_API_KEY, PREFER_GRPC, HTTPS]
+
+HUGGING_FACE_API_KEY = PropertyDescriptor(
+    name="HuggingFace API Key",
+    description="The API Key for interacting with HuggingFace",
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    required=True,
+    sensitive=True,
+    dependencies=[PropertyDependency(EMBEDDING_MODEL, HUGGING_FACE)],
+)
+HUGGING_FACE_MODEL = PropertyDescriptor(
+    name="HuggingFace Model",
+    description="The name of the HuggingFace model to use.",
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    required=True,
+    default_value="sentence-transformers/all-MiniLM-L6-v2",
+    dependencies=[PropertyDependency(EMBEDDING_MODEL, HUGGING_FACE)],
+)
+OPENAI_API_KEY = PropertyDescriptor(
+    name="OpenAI API Key",
+    description="The API Key for OpenAI in order to create embeddings.",
+    sensitive=True,
+    required=True,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    dependencies=[PropertyDependency(EMBEDDING_MODEL, OPENAI)],
+)
+OPENAI_API_MODEL = PropertyDescriptor(
+    name="OpenAI Model",
+    description="The name of the OpenAI model to use.",
+    required=True,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    default_value="text-embedding-ada-002",
+    dependencies=[PropertyDependency(EMBEDDING_MODEL, OPENAI)],
+)
+
+EMBEDDING_MODEL_PROPERTIES = [
+    EMBEDDING_MODEL,
+    HUGGING_FACE_API_KEY,
+    HUGGING_FACE_MODEL,
+    OPENAI_API_KEY,
+    OPENAI_API_MODEL,
+]
+
+
+def convert_id(_id: str) -> str:
+    """
+    Converts any string into a UUID string deterministically.
+
+    Qdrant accepts UUID strings and unsigned integers as point ID.
+    This allows us to overwrite the same point with the original ID.
+    """
+    return str(uuid.uuid5(uuid.NAMESPACE_DNS, _id))
diff --git a/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/QueryQdrant.py b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/QueryQdrant.py
new file mode 100644
index 0000000000..7e1b0fca49
--- /dev/null
+++ b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/QueryQdrant.py
@@ -0,0 +1,192 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from langchain.vectorstores.qdrant import Qdrant
+from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
+from nifiapi.properties import (
+    PropertyDescriptor,
+    StandardValidators,
+    ExpressionLanguageScope,
+)
+import QueryUtils
+import json
+from EmbeddingUtils import (
+    create_embedding_service,
+)
+
+from nifiapi.documentation import use_case
+
+from qdrant_client import QdrantClient
+
+import QdrantUtils
+
+
+@use_case(
+    description="Semantically search for documents stored in Qdrant - https://qdrant.tech/",
+    keywords=["qdrant", "embedding", "vector", "text", "vectorstore", "search"],
+    configuration="""
+                Configure 'Collection Name' to the name of the Qdrant collection to use.
+                Configure 'Qdrant URL' to the fully qualified URL of the Qdrant instance.
+                Configure 'Qdrant API Key' to the API Key to use in order to authenticate with Qdrant.
+                Configure 'Prefer gRPC' to True if you want to use gRPC for interfacing with Qdrant.
+                Configure 'Use HTTPS' to True if you want to use TLS(HTTPS) while interfacing with Qdrant.
+                Configure 'Embedding Model' to indicate whether OpenAI embeddings should be used or a HuggingFace embedding model should be used: 'Hugging Face Model' or 'OpenAI Model'
+                Configure 'HuggingFace API Key' or 'OpenAI API Key', depending on the chosen Embedding Model.
+                Configure 'HuggingFace Model' or 'OpenAI Model' to the name of the model to use.
+                Configure 'Query' to the text of the query to send to Qdrant.
+                Configure 'Number of Results' to the number of results to return from Qdrant.
+                Configure 'Metadata Filter' to apply an optional metadata filter with the query. For example: { "author": "john.doe" }
+                Configure 'Output Strategy' to indicate how the output should be formatted: 'Row-Oriented', 'Text', or 'Column-Oriented'.
+                Configure 'Results Field' to the name of the field to insert the results, if the input FlowFile is JSON Formatted,.
+                Configure 'Include Metadatas' to True if metadata should be included in the output.
+                Configure 'Include Distances' to True if distances should be included in the output.
+                """,
+)
+class QueryQdrant(FlowFileTransform):
+    class Java:
+        implements = ["org.apache.nifi.python.processor.FlowFileTransform"]
+
+    class ProcessorDetails:
+        version = "@project.version@"
+        description = "Queries Qdrant in order to gather a specified number of documents that are most closely related to the given query."
+        tags = [
+            "qdrant",
+            "vector",
+            "vectordb",
+            "vectorstore",
+            "embeddings",
+            "ai",
+            "artificial intelligence",
+            "ml",
+            "machine learning",
+            "text",
+            "LLM",
+        ]
+
+    QUERY = PropertyDescriptor(
+        name="Query",
+        description="The text of the query to send to Qdrant.",
+        required=True,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+    )
+    NUMBER_OF_RESULTS = PropertyDescriptor(
+        name="Number of Results",
+        description="The number of results to return from Qdrant.",
+        required=True,
+        validators=[StandardValidators.POSITIVE_INTEGER_VALIDATOR],
+        default_value="10",
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+    )
+    FILTER = PropertyDescriptor(
+        name="Metadata Filter",
+        description='Optional metadata filter to apply with the query. For example: { "author": "john.doe" }',
+        required=False,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+    )
+
+    properties = (
+        QdrantUtils.QDRANT_PROPERTIES
+        + QdrantUtils.EMBEDDING_MODEL_PROPERTIES
+        + [
+            QUERY,
+            FILTER,
+            NUMBER_OF_RESULTS,
+            QueryUtils.OUTPUT_STRATEGY,
+            QueryUtils.RESULTS_FIELD,
+            QueryUtils.INCLUDE_METADATAS,
+            QueryUtils.INCLUDE_DISTANCES,
+        ]
+    )
+
+    embeddings = None
+    query_utils = None
+    client = None
+
+    def __init__(self, **kwargs):
+        pass
+
+    def getPropertyDescriptors(self):
+        return self.properties
+
+    def onScheduled(self, context):
+        self.client = QdrantClient(
+            url=context.getProperty(QdrantUtils.QDRANT_URL).getValue(),
+            api_key=context.getProperty(QdrantUtils.QDRANT_API_KEY).getValue(),
+            prefer_grpc=context.getProperty(QdrantUtils.PREFER_GRPC).asBoolean(),
+            https=context.getProperty(QdrantUtils.HTTPS).asBoolean(),
+        )
+        self.embeddings = create_embedding_service(context)
+        self.query_utils = QueryUtils.QueryUtils(context)
+
+    def transform(self, context, flowfile):
+        collection_name = (
+            context.getProperty(QdrantUtils.COLLECTION_NAME)
+            .evaluateAttributeExpressions(flowfile)
+            .getValue()
+        )
+        query = (
+            context.getProperty(self.QUERY)
+            .evaluateAttributeExpressions(flowfile)
+            .getValue()
+        )
+        num_results = (
+            context.getProperty(self.NUMBER_OF_RESULTS)
+            .evaluateAttributeExpressions(flowfile)
+            .asInteger()
+        )
+        filter = (
+            context.getProperty(self.FILTER)
+            .evaluateAttributeExpressions(flowfile)
+            .getValue()
+        )
+        vector_store = Qdrant(
+            client=self.client,
+            collection_name=collection_name,
+            embeddings=self.embeddings,
+        )
+        results = vector_store.similarity_search_with_score(
+            query=query,
+            k=num_results,
+            filter=None if filter is None else json.loads(filter),
+        )
+
+        documents = []
+        for result in results:
+            documents.append(result[0].page_content)
+
+        if context.getProperty(QueryUtils.INCLUDE_METADATAS).asBoolean():
+            metadatas = []
+            for result in results:
+                metadatas.append(result[0].metadata)
+        else:
+            metadatas = None
+
+        if context.getProperty(QueryUtils.INCLUDE_DISTANCES).asBoolean():
+            distances = []
+            for result in results:
+                distances.append(result[1])
+        else:
+            distances = None
+
+        (output_contents, mime_type) = self.query_utils.create_json(
+            flowfile, documents, metadatas, None, distances, None
+        )
+        attributes = {"mime.type": mime_type}
+
+        return FlowFileTransformResult(
+            relationship="success", contents=output_contents, attributes=attributes
+        )
diff --git a/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/requirements.txt b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/requirements.txt
index fbefc24508..fcaf25b839 100644
--- a/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/requirements.txt
+++ b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/requirements.txt
@@ -15,6 +15,8 @@
 
 # Shared requirements
 openai==1.9.0
+tiktoken
+langchain==0.1.11
 
 # Chroma requirements
 chromadb==0.4.22
@@ -30,3 +32,6 @@ langchain==0.1.11
 
 # OpenSearch requirements
 opensearch-py==2.5.0
+
+# Qdrant requirements
+qdrant-client==1.9.1

Reply via email to