This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new a56572d3b2 NIFI-12983 Qdrant vector store support
a56572d3b2 is described below
commit a56572d3b20222ebae632d35fd132686d536a3d8
Author: Anush008 <[email protected]>
AuthorDate: Mon Apr 1 11:25:13 2024 +0530
NIFI-12983 Qdrant vector store support
Co-authored-by: Pierre Villard <[email protected]>
Signed-off-by: Pierre Villard <[email protected]>
This closes #8590.
---
.../src/main/python/vectorstores/PutQdrant.py | 174 +++++++++++++++++++
.../src/main/python/vectorstores/QdrantUtils.py | 125 ++++++++++++++
.../src/main/python/vectorstores/QueryQdrant.py | 192 +++++++++++++++++++++
.../src/main/python/vectorstores/requirements.txt | 5 +
4 files changed, 496 insertions(+)
diff --git
a/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/PutQdrant.py
b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/PutQdrant.py
new file mode 100644
index 0000000000..f7e2de1ea5
--- /dev/null
+++
b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/PutQdrant.py
@@ -0,0 +1,174 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from langchain.vectorstores.qdrant import Qdrant
+from nifiapi.flowfiletransform import FlowFileTransform,
FlowFileTransformResult
+from nifiapi.properties import (
+ PropertyDescriptor,
+ StandardValidators,
+ ExpressionLanguageScope,
+)
+import json
+from EmbeddingUtils import (
+ create_embedding_service,
+)
+from nifiapi.documentation import use_case
+
+from qdrant_client.models import Distance
+
+import QdrantUtils
+
+
@use_case(
    description="Create embeddings that semantically represent text content and upload to Qdrant - https://qdrant.tech/",
    notes="This processor assumes that the data has already been formatted in JSONL format with the text to store in Qdrant provided in the 'text' field.",
    keywords=["qdrant", "embedding", "vector", "text", "vectorstore", "insert"],
    configuration="""
                Configure 'Collection Name' to the name of the Qdrant collection to use.
                Configure 'Qdrant URL' to the fully qualified URL of the Qdrant instance.
                Configure 'Qdrant API Key' to the API Key to use in order to authenticate with Qdrant.
                Configure 'Prefer gRPC' to True if you want to use gRPC for interfacing with Qdrant.
                Configure 'Use HTTPS' to True if you want to use TLS(HTTPS) while interfacing with Qdrant.
                Configure 'Embedding Model' to indicate whether OpenAI embeddings should be used or a HuggingFace embedding model should be used: 'Hugging Face Model' or 'OpenAI Model'
                Configure 'HuggingFace API Key' or 'OpenAI API Key', depending on the chosen Embedding Model.
                Configure 'HuggingFace Model' or 'OpenAI Model' to the name of the model to use.
                Configure 'Force Recreate Collection' to True if you want to recreate the collection if it already exists.
                Configure 'Similarity Metric' to the similarity metric to use when creating the collection.

                If the documents to send to Qdrant contain a unique identifier (UUID), set the 'Document ID Field Name' property to the name of the field that contains the document ID.
                This property can be left blank, in which case a UUID will be generated based on the FlowFile's filename and the line number of each document.
                """,
)
class PutQdrant(FlowFileTransform):
    """Embed JSON-lines documents and upload them to a Qdrant collection.

    Each non-blank line of the incoming FlowFile is parsed as a JSON object whose
    'text' value is embedded and whose 'metadata' value is stored alongside it.
    """

    class Java:
        implements = ["org.apache.nifi.python.processor.FlowFileTransform"]

    class ProcessorDetails:
        version = "@project.version@"
        description = """Publishes JSON data to Qdrant. The Incoming data must be in single JSON per Line format, each with two keys: 'text' and 'metadata'.
                       The text must be a string, while metadata must be a map with strings for values. Any additional fields will be ignored."""
        tags = [
            "qdrant",
            "vector",
            "vectordb",
            "vectorstore",
            "embeddings",
            "ai",
            "artificial intelligence",
            "ml",
            "machine learning",
            "text",
            "LLM",
        ]

    DOC_ID_FIELD_NAME = PropertyDescriptor(
        name="Document ID Field Name",
        description="""Specifies the name of the field in the 'metadata' element of each document where the document's ID can be found.
                    If not specified, a UUID will be generated based on the FlowFile's filename and an incremental number.""",
        required=False,
        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
    )
    FORCE_RECREATE_COLLECTION = PropertyDescriptor(
        name="Force Recreate Collection",
        description="Specifies whether to recreate the collection if it already exists. Essentially clearing the existing data.",
        required=True,
        # A string default, so it is exactly one of the allowable values below.
        default_value="False",
        allowable_values=["True", "False"],
        validators=[StandardValidators.BOOLEAN_VALIDATOR],
    )
    SIMILARITY_METRIC = PropertyDescriptor(
        name="Similarity Metric",
        description="Specifies the similarity metric when creating the collection.",
        required=True,
        # Use the enum's plain string values so the descriptor never depends on
        # how the Distance enum happens to be stringified.
        default_value=Distance.COSINE.value,
        allowable_values=[
            Distance.COSINE.value,
            Distance.EUCLID.value,
            Distance.DOT.value,
            Distance.MANHATTAN.value,
        ],
        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
    )

    properties = (
        QdrantUtils.QDRANT_PROPERTIES
        + QdrantUtils.EMBEDDING_MODEL_PROPERTIES
        + [
            FORCE_RECREATE_COLLECTION,
            SIMILARITY_METRIC,
            DOC_ID_FIELD_NAME,
        ]
    )

    # Set in onScheduled(); used by transform() to upload documents.
    vector_store = None

    def __init__(self, **kwargs):
        pass

    def getPropertyDescriptors(self):
        return self.properties

    def onScheduled(self, context):
        """Connect to Qdrant and make sure the target collection exists."""
        # Qdrant.construct_instance() internally checks whether the collection exists
        # and creates it, if it doesn't, with the appropriate dimensions and configuration.
        # The sample text is embedded only so the vector dimension can be determined.
        # NOTE(review): 'Collection Name' declares FlowFile-attribute EL support, but it is
        # read here without evaluateAttributeExpressions(), so any EL is not evaluated.
        self.vector_store = Qdrant.construct_instance(
            texts=[
                "Some text to obtain the embeddings dimension when creating the collection"
            ],
            embedding=create_embedding_service(context),
            collection_name=context.getProperty(QdrantUtils.COLLECTION_NAME).getValue(),
            url=context.getProperty(QdrantUtils.QDRANT_URL).getValue(),
            api_key=context.getProperty(QdrantUtils.QDRANT_API_KEY).getValue(),
            prefer_grpc=context.getProperty(QdrantUtils.PREFER_GRPC).asBoolean(),
            https=context.getProperty(QdrantUtils.HTTPS).asBoolean(),
            force_recreate=context.getProperty(
                self.FORCE_RECREATE_COLLECTION
            ).asBoolean(),
            distance_func=context.getProperty(self.SIMILARITY_METRIC).getValue(),
        )

    def transform(self, context, flowfile):
        """Parse the FlowFile as JSON lines and upload every document to Qdrant.

        Blank lines, including the trailing newline most JSONL writers emit, are
        skipped. Each document's point ID is taken from its metadata field named by
        'Document ID Field Name' when present. Otherwise it is derived
        deterministically from the FlowFile's filename and the line number.

        Raises:
            ValueError: if a non-blank line is not valid JSON or is not a JSON object.
        """
        id_field_name = (
            context.getProperty(self.DOC_ID_FIELD_NAME)
            .evaluateAttributeExpressions(flowfile)
            .getValue()
        )
        filename = flowfile.getAttribute("filename")

        # Read the FlowFile content as "json-lines".
        json_lines = flowfile.getContentsAsBytes().decode()
        texts, metadatas, ids = [], [], []
        # Split on "\n" only: str.splitlines() would also split on characters such as
        # U+2028 that may legally appear unescaped inside JSON strings.
        for line_number, line in enumerate(json_lines.split("\n"), start=1):
            if not line.strip():
                continue
            try:
                doc = json.loads(line)
            except ValueError as e:
                raise ValueError(f"Could not parse line {line_number} as JSON") from e
            if not isinstance(doc, dict):
                raise ValueError(f"Line {line_number} is not a JSON object")

            metadata = doc.get("metadata")
            texts.append(doc.get("text"))
            metadatas.append(metadata)

            doc_id = None
            # Guard against documents with no (or a non-object) 'metadata' value.
            if id_field_name is not None and isinstance(metadata, dict):
                # Passed to Qdrant unchanged: it must already be a valid point ID.
                doc_id = metadata.get(id_field_name)
            if doc_id is None:
                doc_id = QdrantUtils.convert_id(f"{filename}-{line_number}")
            ids.append(doc_id)

        # Nothing to embed for an empty or all-blank FlowFile.
        if texts:
            self.vector_store.add_texts(texts=texts, metadatas=metadatas, ids=ids)
        return FlowFileTransformResult(relationship="success")
diff --git
a/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/QdrantUtils.py
b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/QdrantUtils.py
new file mode 100644
index 0000000000..9b4214b32c
--- /dev/null
+++
b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/QdrantUtils.py
@@ -0,0 +1,125 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from nifiapi.properties import (
+ PropertyDescriptor,
+ StandardValidators,
+ ExpressionLanguageScope,
+ PropertyDependency,
+)
+from EmbeddingUtils import (
+ OPENAI,
+ HUGGING_FACE,
+ EMBEDDING_MODEL,
+)
+
+import uuid
+
DEFAULT_COLLECTION_NAME = "apache-nifi"


# Connection properties shared by PutQdrant and QueryQdrant.
COLLECTION_NAME = PropertyDescriptor(
    name="Collection Name",
    description="The name of the Qdrant collection to use.",
    sensitive=False,
    required=True,
    default_value=DEFAULT_COLLECTION_NAME,
    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
    expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
)
QDRANT_URL = PropertyDescriptor(
    name="Qdrant URL",
    description="The fully qualified URL to the Qdrant instance.",
    sensitive=False,
    required=True,
    default_value="http://localhost:6333",
    validators=[StandardValidators.URL_VALIDATOR],
)
QDRANT_API_KEY = PropertyDescriptor(
    name="Qdrant API Key",
    description="The API Key to use in order to authenticate with Qdrant. Can be empty.",
    sensitive=True,
    # Optional, so the property can be left unset as the description promises;
    # an unset key reaches the Qdrant client as None.
    required=False,
)

# Boolean properties use string defaults so the default is exactly one of the
# allowable values.
PREFER_GRPC = PropertyDescriptor(
    name="Prefer gRPC",
    description="Specifies whether to use gRPC for interfacing with Qdrant.",
    required=True,
    default_value="False",
    allowable_values=["True", "False"],
    validators=[StandardValidators.BOOLEAN_VALIDATOR],
)
HTTPS = PropertyDescriptor(
    name="Use HTTPS",
    description="Specifies whether to use TLS (HTTPS) while interfacing with Qdrant.",
    required=True,
    default_value="False",
    allowable_values=["True", "False"],
    validators=[StandardValidators.BOOLEAN_VALIDATOR],
)

QDRANT_PROPERTIES = [COLLECTION_NAME, QDRANT_URL, QDRANT_API_KEY, PREFER_GRPC, HTTPS]

# Embedding-model properties. Each provider-specific property is only shown when
# EMBEDDING_MODEL is set to that provider.
HUGGING_FACE_API_KEY = PropertyDescriptor(
    name="HuggingFace API Key",
    description="The API Key for interacting with HuggingFace",
    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
    required=True,
    sensitive=True,
    dependencies=[PropertyDependency(EMBEDDING_MODEL, HUGGING_FACE)],
)
HUGGING_FACE_MODEL = PropertyDescriptor(
    name="HuggingFace Model",
    description="The name of the HuggingFace model to use.",
    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
    required=True,
    default_value="sentence-transformers/all-MiniLM-L6-v2",
    dependencies=[PropertyDependency(EMBEDDING_MODEL, HUGGING_FACE)],
)
OPENAI_API_KEY = PropertyDescriptor(
    name="OpenAI API Key",
    description="The API Key for OpenAI in order to create embeddings.",
    sensitive=True,
    required=True,
    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
    dependencies=[PropertyDependency(EMBEDDING_MODEL, OPENAI)],
)
OPENAI_API_MODEL = PropertyDescriptor(
    name="OpenAI Model",
    description="The name of the OpenAI model to use.",
    required=True,
    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
    default_value="text-embedding-ada-002",
    dependencies=[PropertyDependency(EMBEDDING_MODEL, OPENAI)],
)

EMBEDDING_MODEL_PROPERTIES = [
    EMBEDDING_MODEL,
    HUGGING_FACE_API_KEY,
    HUGGING_FACE_MODEL,
    OPENAI_API_KEY,
    OPENAI_API_MODEL,
]
+
+
def convert_id(_id: str) -> str:
    """
    Derive a deterministic UUID string from an arbitrary string.

    Qdrant point IDs must be UUID strings or unsigned integers, so free-form
    identifiers are hashed with UUIDv5 in the DNS namespace. The same input
    always maps to the same UUID, so re-uploading a document overwrites the
    point previously written under that ID.
    """
    derived = uuid.uuid5(uuid.NAMESPACE_DNS, _id)
    return f"{derived}"
diff --git
a/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/QueryQdrant.py
b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/QueryQdrant.py
new file mode 100644
index 0000000000..7e1b0fca49
--- /dev/null
+++
b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/QueryQdrant.py
@@ -0,0 +1,192 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from langchain.vectorstores.qdrant import Qdrant
+from nifiapi.flowfiletransform import FlowFileTransform,
FlowFileTransformResult
+from nifiapi.properties import (
+ PropertyDescriptor,
+ StandardValidators,
+ ExpressionLanguageScope,
+)
+import QueryUtils
+import json
+from EmbeddingUtils import (
+ create_embedding_service,
+)
+
+from nifiapi.documentation import use_case
+
+from qdrant_client import QdrantClient
+
+import QdrantUtils
+
+
@use_case(
    description="Semantically search for documents stored in Qdrant - https://qdrant.tech/",
    keywords=["qdrant", "embedding", "vector", "text", "vectorstore", "search"],
    configuration="""
                Configure 'Collection Name' to the name of the Qdrant collection to use.
                Configure 'Qdrant URL' to the fully qualified URL of the Qdrant instance.
                Configure 'Qdrant API Key' to the API Key to use in order to authenticate with Qdrant.
                Configure 'Prefer gRPC' to True if you want to use gRPC for interfacing with Qdrant.
                Configure 'Use HTTPS' to True if you want to use TLS(HTTPS) while interfacing with Qdrant.
                Configure 'Embedding Model' to indicate whether OpenAI embeddings should be used or a HuggingFace embedding model should be used: 'Hugging Face Model' or 'OpenAI Model'
                Configure 'HuggingFace API Key' or 'OpenAI API Key', depending on the chosen Embedding Model.
                Configure 'HuggingFace Model' or 'OpenAI Model' to the name of the model to use.
                Configure 'Query' to the text of the query to send to Qdrant.
                Configure 'Number of Results' to the number of results to return from Qdrant.
                Configure 'Metadata Filter' to apply an optional metadata filter with the query. For example: { "author": "john.doe" }
                Configure 'Output Strategy' to indicate how the output should be formatted: 'Row-Oriented', 'Text', or 'Column-Oriented'.
                Configure 'Results Field' to the name of the field to insert the results, if the input FlowFile is JSON Formatted.
                Configure 'Include Metadatas' to True if metadata should be included in the output.
                Configure 'Include Distances' to True if distances should be included in the output.
                """,
)
class QueryQdrant(FlowFileTransform):
    """Run a similarity search against a Qdrant collection and emit the matches."""

    class Java:
        implements = ["org.apache.nifi.python.processor.FlowFileTransform"]

    class ProcessorDetails:
        version = "@project.version@"
        description = "Queries Qdrant in order to gather a specified number of documents that are most closely related to the given query."
        tags = [
            "qdrant",
            "vector",
            "vectordb",
            "vectorstore",
            "embeddings",
            "ai",
            "artificial intelligence",
            "ml",
            "machine learning",
            "text",
            "LLM",
        ]

    QUERY = PropertyDescriptor(
        name="Query",
        description="The text of the query to send to Qdrant.",
        required=True,
        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
    )
    NUMBER_OF_RESULTS = PropertyDescriptor(
        name="Number of Results",
        description="The number of results to return from Qdrant.",
        required=True,
        validators=[StandardValidators.POSITIVE_INTEGER_VALIDATOR],
        default_value="10",
        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
    )
    FILTER = PropertyDescriptor(
        name="Metadata Filter",
        description='Optional metadata filter to apply with the query. For example: { "author": "john.doe" }',
        required=False,
        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
    )

    properties = (
        QdrantUtils.QDRANT_PROPERTIES
        + QdrantUtils.EMBEDDING_MODEL_PROPERTIES
        + [
            QUERY,
            FILTER,
            NUMBER_OF_RESULTS,
            QueryUtils.OUTPUT_STRATEGY,
            QueryUtils.RESULTS_FIELD,
            QueryUtils.INCLUDE_METADATAS,
            QueryUtils.INCLUDE_DISTANCES,
        ]
    )

    # Populated in onScheduled() and reused for every FlowFile.
    embeddings = None
    query_utils = None
    client = None

    def __init__(self, **kwargs):
        pass

    def getPropertyDescriptors(self):
        return self.properties

    def onScheduled(self, context):
        """Create the Qdrant client, embedding service and output helper once per schedule."""
        self.client = QdrantClient(
            url=context.getProperty(QdrantUtils.QDRANT_URL).getValue(),
            api_key=context.getProperty(QdrantUtils.QDRANT_API_KEY).getValue(),
            prefer_grpc=context.getProperty(QdrantUtils.PREFER_GRPC).asBoolean(),
            https=context.getProperty(QdrantUtils.HTTPS).asBoolean(),
        )
        self.embeddings = create_embedding_service(context)
        self.query_utils = QueryUtils.QueryUtils(context)

    @staticmethod
    def _parse_metadata_filter(metadata_filter):
        """Return the configured Metadata Filter parsed from JSON, or None when unset.

        Raises:
            ValueError: if the filter is not valid JSON.
        """
        if metadata_filter is None:
            return None
        try:
            return json.loads(metadata_filter)
        except ValueError as e:
            raise ValueError("Could not parse 'Metadata Filter' as JSON") from e

    def transform(self, context, flowfile):
        """Query Qdrant with the evaluated properties and write the results as JSON.

        The output content and its mime.type come from QueryUtils.create_json().
        Metadata and distances are included only when the corresponding
        'Include Metadatas' / 'Include Distances' properties are true.
        """
        collection_name = (
            context.getProperty(QdrantUtils.COLLECTION_NAME)
            .evaluateAttributeExpressions(flowfile)
            .getValue()
        )
        query = (
            context.getProperty(self.QUERY)
            .evaluateAttributeExpressions(flowfile)
            .getValue()
        )
        num_results = (
            context.getProperty(self.NUMBER_OF_RESULTS)
            .evaluateAttributeExpressions(flowfile)
            .asInteger()
        )
        # Named to avoid shadowing the built-in filter().
        metadata_filter = (
            context.getProperty(self.FILTER)
            .evaluateAttributeExpressions(flowfile)
            .getValue()
        )

        # Cheap wrapper around the shared client; built per FlowFile because the
        # collection name may come from FlowFile attributes.
        vector_store = Qdrant(
            client=self.client,
            collection_name=collection_name,
            embeddings=self.embeddings,
        )
        # Each result is a (Document, score) pair.
        results = vector_store.similarity_search_with_score(
            query=query,
            k=num_results,
            filter=self._parse_metadata_filter(metadata_filter),
        )

        documents = [doc.page_content for doc, _ in results]

        if context.getProperty(QueryUtils.INCLUDE_METADATAS).asBoolean():
            metadatas = [doc.metadata for doc, _ in results]
        else:
            metadatas = None

        if context.getProperty(QueryUtils.INCLUDE_DISTANCES).asBoolean():
            distances = [score for _, score in results]
        else:
            distances = None

        (output_contents, mime_type) = self.query_utils.create_json(
            flowfile, documents, metadatas, None, distances, None
        )
        attributes = {"mime.type": mime_type}

        return FlowFileTransformResult(
            relationship="success", contents=output_contents, attributes=attributes
        )
diff --git
a/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/requirements.txt
b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/requirements.txt
index fbefc24508..fcaf25b839 100644
---
a/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/requirements.txt
+++
b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/requirements.txt
@@ -15,6 +15,8 @@
# Shared requirements
openai==1.9.0
+tiktoken
+langchain==0.1.11
# Chroma requirements
chromadb==0.4.22
@@ -30,3 +32,6 @@ langchain==0.1.11
# OpenSearch requirements
opensearch-py==2.5.0
+
+# Qdrant requirements
+qdrant-client==1.9.1