This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new b608e5a2f0 NIFI-12831: Add PutOpenSearchVector and
QueryOpenSearchVector processors
b608e5a2f0 is described below
commit b608e5a2f0002a0f9c6d6121815b54f6dae67dc4
Author: Mark Bathori <[email protected]>
AuthorDate: Wed Feb 21 15:13:47 2024 +0100
NIFI-12831: Add PutOpenSearchVector and QueryOpenSearchVector processors
Signed-off-by: Pierre Villard <[email protected]>
This closes #8441.
---
.../python/vectorstores/OpenSearchVectorUtils.py | 142 ++++++++++++
.../python/vectorstores/PutOpenSearchVector.py | 245 +++++++++++++++++++++
.../python/vectorstores/QueryOpenSearchVector.py | 219 ++++++++++++++++++
.../src/main/python/vectorstores/requirements.txt | 3 +
4 files changed, 609 insertions(+)
diff --git
a/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/OpenSearchVectorUtils.py
b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/OpenSearchVectorUtils.py
new file mode 100644
index 0000000000..a10eaba7c9
--- /dev/null
+++
b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/OpenSearchVectorUtils.py
@@ -0,0 +1,142 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from nifiapi.properties import PropertyDescriptor, StandardValidators,
ExpressionLanguageScope, PropertyDependency
+from EmbeddingUtils import OPENAI, HUGGING_FACE, EMBEDDING_MODEL
+
# Space types shared by the OpenSearch vector store processors.
# Each entry is a (display name, OpenSearch space_type value) pair; the
# display name is shown to the user while the second element is sent to OpenSearch.
L2 = ("L2 (Euclidean distance)", "l2")
L1 = ("L1 (Manhattan distance)", "l1")
LINF = ("L-infinity (chessboard) distance", "linf")
COSINESIMIL = ("Cosine similarity", "cosinesimil")

# Property descriptors shared by PutOpenSearchVector and QueryOpenSearchVector.
HUGGING_FACE_API_KEY = PropertyDescriptor(
    name="HuggingFace API Key",
    description="The API Key for interacting with HuggingFace",
    required=True,
    sensitive=True,
    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
    dependencies=[PropertyDependency(EMBEDDING_MODEL, HUGGING_FACE)]
)
HUGGING_FACE_MODEL = PropertyDescriptor(
    name="HuggingFace Model",
    description="The name of the HuggingFace model to use",
    default_value="sentence-transformers/all-MiniLM-L6-v2",
    required=True,
    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
    dependencies=[PropertyDependency(EMBEDDING_MODEL, HUGGING_FACE)]
)
OPENAI_API_KEY = PropertyDescriptor(
    name="OpenAI API Key",
    description="The API Key for OpenAI in order to create embeddings",
    required=True,
    sensitive=True,
    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
    dependencies=[PropertyDependency(EMBEDDING_MODEL, OPENAI)]
)
OPENAI_API_MODEL = PropertyDescriptor(
    name="OpenAI Model",
    # Fixed copy-paste defect: this property selects the embedding model,
    # it is not an API key (the previous description duplicated OPENAI_API_KEY's).
    description="The name of the OpenAI model to use for creating embeddings",
    default_value="text-embedding-ada-002",
    required=True,
    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
    dependencies=[PropertyDependency(EMBEDDING_MODEL, OPENAI)]
)
HTTP_HOST = PropertyDescriptor(
    name="HTTP Host",
    description="URL where OpenSearch is hosted.",
    default_value="http://localhost:9200",
    required=True,
    validators=[StandardValidators.URL_VALIDATOR]
)
USERNAME = PropertyDescriptor(
    name="Username",
    description="The username to use for authenticating to OpenSearch server",
    required=False,
    validators=[StandardValidators.NON_EMPTY_VALIDATOR]
)
PASSWORD = PropertyDescriptor(
    name="Password",
    description="The password to use for authenticating to OpenSearch server",
    required=False,
    sensitive=True,
    validators=[StandardValidators.NON_EMPTY_VALIDATOR]
)
INDEX_NAME = PropertyDescriptor(
    name="Index Name",
    description="The name of the OpenSearch index.",
    sensitive=False,
    required=True,
    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
    expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
)
VECTOR_FIELD = PropertyDescriptor(
    name="Vector Field Name",
    description="The name of the field in the document where the embeddings are stored. "
                "This field needs to be a 'knn_vector' typed field.",
    default_value="vector_field",
    required=True,
    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
    expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
)
TEXT_FIELD = PropertyDescriptor(
    name="Text Field Name",
    description="The name of the field in the document where the text is stored.",
    default_value="text",
    required=True,
    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
    expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
)
+
+
def create_authentication_params(context):
    """Build the authentication keyword arguments for the OpenSearch client.

    Reads the optional Username/Password properties from the processor
    context; HTTP basic auth is only configured when both are present.
    """
    auth_params = {"verify_certs": "true"}

    user = context.getProperty(USERNAME).getValue()
    pwd = context.getProperty(PASSWORD).getValue()
    if user is not None and pwd is not None:
        auth_params["http_auth"] = (user, pwd)

    return auth_params
+
+
def parse_documents(json_lines, id_field_name, file_name):
    """Parse JSON-lines content into parallel lists of texts, metadata and IDs.

    Each non-blank input line must be a JSON object with a 'text' key and an
    optional 'metadata' object. Null metadata values are dropped (they would
    cause the embedding to fail) and a missing 'metadata' element is treated
    as empty. The document ID is taken from metadata[id_field_name] when
    configured and present; otherwise it is generated from the FlowFile's
    filename and the 1-based line number.

    Raises ValueError (with the offending line number) when a line is not
    valid JSON. Blank lines — e.g. a trailing newline at the end of the
    content — are skipped instead of failing.
    """
    import json

    texts = []
    metadatas = []
    ids = []
    for i, line in enumerate(json_lines.split("\n"), start=1):
        # Skip blank lines so content ending with a newline does not raise.
        if not line.strip():
            continue

        try:
            doc = json.loads(line)
        except Exception as e:
            raise ValueError(f"Could not parse line {i} as JSON") from e

        texts.append(doc.get('text'))

        # Treat a missing 'metadata' element as empty instead of crashing on None.
        metadata = doc.get('metadata') or {}
        # Remove any null values, or it will cause the embedding to fail
        filtered_metadata = {key: value for key, value in metadata.items() if value is not None}
        metadatas.append(filtered_metadata)

        doc_id = None
        if id_field_name is not None:
            doc_id = metadata.get(id_field_name)
        if doc_id is None:
            doc_id = file_name + "-" + str(i)
        ids.append(doc_id)

    return {"texts": texts, "metadatas": metadatas, "ids": ids}
diff --git
a/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/PutOpenSearchVector.py
b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/PutOpenSearchVector.py
new file mode 100644
index 0000000000..c0ff29bdb7
--- /dev/null
+++
b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/PutOpenSearchVector.py
@@ -0,0 +1,245 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from langchain.vectorstores import OpenSearchVectorSearch
+from nifiapi.flowfiletransform import FlowFileTransform,
FlowFileTransformResult
+from nifiapi.properties import PropertyDescriptor, StandardValidators,
ExpressionLanguageScope, PropertyDependency
+from OpenSearchVectorUtils import (L2, L1, LINF, COSINESIMIL, OPENAI_API_KEY,
OPENAI_API_MODEL, HUGGING_FACE_API_KEY,
+ HUGGING_FACE_MODEL, HTTP_HOST, USERNAME,
PASSWORD, INDEX_NAME, VECTOR_FIELD,
+ TEXT_FIELD, create_authentication_params,
parse_documents)
+from EmbeddingUtils import EMBEDDING_MODEL, create_embedding_service
+from nifiapi.documentation import use_case, ProcessorConfiguration
+
+
@use_case(description="Create vectors/embeddings that represent text content and send the vectors to OpenSearch",
          notes="This use case assumes that the data has already been formatted in JSONL format with the text to store in OpenSearch provided in the 'text' field.",
          keywords=["opensearch", "embedding", "vector", "text", "vectorstore", "insert"],
          configuration="""
                Configure the 'HTTP Host' to an appropriate URL where OpenSearch is accessible.
                Configure 'Embedding Model' to indicate whether OpenAI embeddings should be used or a HuggingFace embedding model should be used: 'Hugging Face Model' or 'OpenAI Model'
                Configure the 'OpenAI API Key' or 'HuggingFace API Key', depending on the chosen Embedding Model.
                Set 'Index Name' to the name of your OpenSearch Index.
                Set 'Vector Field Name' to the name of the field in the document which will store the vector data.
                Set 'Text Field Name' to the name of the field in the document which will store the text data.

                If the documents to send to OpenSearch contain a unique identifier, set the 'Document ID Field Name' property to the name of the field that contains the document ID.
                This property can be left blank, in which case a unique ID will be generated based on the FlowFile's filename.

                If the provided index does not exists in OpenSearch then the processor is capable to create it. The 'New Index Strategy' property defines
                that the index needs to be created from the default template or it should be configured with custom values.
                """)
@use_case(description="Update vectors/embeddings in OpenSearch",
          notes="This use case assumes that the data has already been formatted in JSONL format with the text to store in OpenSearch provided in the 'text' field.",
          keywords=["opensearch", "embedding", "vector", "text", "vectorstore", "update", "upsert"],
          configuration="""
                Configure the 'HTTP Host' to an appropriate URL where OpenSearch is accessible.
                Configure 'Embedding Model' to indicate whether OpenAI embeddings should be used or a HuggingFace embedding model should be used: 'Hugging Face Model' or 'OpenAI Model'
                Configure the 'OpenAI API Key' or 'HuggingFace API Key', depending on the chosen Embedding Model.
                Set 'Index Name' to the name of your OpenSearch Index.
                Set 'Vector Field Name' to the name of the field in the document which will store the vector data.
                Set 'Text Field Name' to the name of the field in the document which will store the text data.
                Set the 'Document ID Field Name' property to the name of the field that contains the identifier of the document in OpenSearch to update.
                """)
class PutOpenSearchVector(FlowFileTransform):
    """NiFi processor that embeds JSONL text content and inserts/updates the
    resulting vectors in an OpenSearch index via langchain's OpenSearchVectorSearch."""

    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']

    class ProcessorDetails:
        version = '@project.version@'
        description = """Publishes JSON data to OpenSearch. The Incoming data must be in single JSON per Line format, each with two keys: 'text' and 'metadata'.
                       The text must be a string, while metadata must be a map with strings for values. Any additional fields will be ignored."""
        tags = ["opensearch", "vector", "vectordb", "vectorstore", "embeddings", "ai", "artificial intelligence", "ml",
                "machine learning", "text", "LLM"]

    # Engine types: (display name, OpenSearch k-NN engine value) pairs.
    NMSLIB = ("nmslib (Non-Metric Space Library)", "nmslib")
    FAISS = ("faiss (Facebook AI Similarity Search)", "faiss")
    LUCENE = ("lucene", "lucene")

    # Maps each engine's display name (shown in the UI) to the value sent to OpenSearch.
    ENGINE_VALUES = dict([NMSLIB, FAISS, LUCENE])

    # Space types
    INNERPRODUCT = ("Inner product", "innerproduct")

    # Each engine supports a different subset of space types, hence three maps.
    NMSLIB_SPACE_TYPE_VALUES = dict([L2, L1, LINF, COSINESIMIL, INNERPRODUCT])
    FAISS_SPACE_TYPE_VALUES = dict([L2, INNERPRODUCT])
    LUCENE_SPACE_TYPE_VALUES = dict([L2, COSINESIMIL])

    # New Index Mapping Strategy
    DEFAULT_INDEX_MAPPING = "Default index mapping"
    CUSTOM_INDEX_MAPPING = "Custom index mapping"

    DOC_ID_FIELD_NAME = PropertyDescriptor(
        name="Document ID Field Name",
        description="""Specifies the name of the field in the 'metadata' element of each document where the document's ID can be found.
                    If not specified, an ID will be generated based on the FlowFile's filename and a one-up number.""",
        required=False,
        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
    )
    NEW_INDEX_STRATEGY = PropertyDescriptor(
        name="New Index Strategy",
        description="Specifies the Mapping strategy to use for new index creation. The default template values are the following: "
                    "{engine: nmslib, space_type: l2, ef_search: 512, ef_construction: 512, m: 16}",
        allowable_values=[DEFAULT_INDEX_MAPPING, CUSTOM_INDEX_MAPPING],
        default_value=DEFAULT_INDEX_MAPPING,
        required=False,
    )
    # The engine/space-type/ef/m properties below only apply when a custom
    # index mapping is chosen; PropertyDependency hides them otherwise.
    ENGINE = PropertyDescriptor(
        name="Engine",
        description="The approximate k-NN library to use for indexing and search.",
        allowable_values=ENGINE_VALUES.keys(),
        default_value=NMSLIB[0],
        required=False,
        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, CUSTOM_INDEX_MAPPING)]
    )
    NMSLIB_SPACE_TYPE = PropertyDescriptor(
        name="NMSLIB Space Type",
        description="The vector space used to calculate the distance between vectors.",
        allowable_values=NMSLIB_SPACE_TYPE_VALUES.keys(),
        default_value=L2[0],
        required=False,
        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, CUSTOM_INDEX_MAPPING),
                      PropertyDependency(ENGINE, NMSLIB[0])]
    )
    FAISS_SPACE_TYPE = PropertyDescriptor(
        name="FAISS Space Type",
        description="The vector space used to calculate the distance between vectors.",
        allowable_values=FAISS_SPACE_TYPE_VALUES.keys(),
        default_value=L2[0],
        required=False,
        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, CUSTOM_INDEX_MAPPING),
                      PropertyDependency(ENGINE, FAISS[0])]
    )
    LUCENE_SPACE_TYPE = PropertyDescriptor(
        name="Lucene Space Type",
        description="The vector space used to calculate the distance between vectors.",
        allowable_values=LUCENE_SPACE_TYPE_VALUES.keys(),
        default_value=L2[0],
        required=False,
        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, CUSTOM_INDEX_MAPPING),
                      PropertyDependency(ENGINE, LUCENE[0])]
    )
    EF_SEARCH = PropertyDescriptor(
        name="EF Search",
        description="The size of the dynamic list used during k-NN searches. Higher values lead to more accurate but slower searches.",
        default_value="512",
        required=False,
        validators=[StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR],
        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, CUSTOM_INDEX_MAPPING)]
    )
    EF_CONSTRUCTION = PropertyDescriptor(
        name="EF Construction",
        description="The size of the dynamic list used during k-NN graph creation. Higher values lead to a more accurate graph but slower indexing speed.",
        default_value="512",
        required=False,
        validators=[StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR],
        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, CUSTOM_INDEX_MAPPING)]
    )
    M = PropertyDescriptor(
        name="M",
        description="The number of bidirectional links that the plugin creates for each new element. Increasing and "
                    "decreasing this value can have a large impact on memory consumption. Keep this value between 2 and 100.",
        default_value="16",
        required=False,
        validators=[StandardValidators._standard_validators.createLongValidator(2, 100, True)],
        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, CUSTOM_INDEX_MAPPING)]
    )

    properties = [EMBEDDING_MODEL,
                  OPENAI_API_KEY,
                  OPENAI_API_MODEL,
                  HUGGING_FACE_API_KEY,
                  HUGGING_FACE_MODEL,
                  HTTP_HOST,
                  USERNAME,
                  PASSWORD,
                  INDEX_NAME,
                  DOC_ID_FIELD_NAME,
                  VECTOR_FIELD,
                  TEXT_FIELD,
                  NEW_INDEX_STRATEGY,
                  ENGINE,
                  NMSLIB_SPACE_TYPE,
                  FAISS_SPACE_TYPE,
                  LUCENE_SPACE_TYPE,
                  EF_SEARCH,
                  EF_CONSTRUCTION,
                  M]

    # Embedding service (OpenAI or HuggingFace); created in onScheduled.
    embeddings = None

    def __init__(self, **kwargs):
        pass

    def getPropertyDescriptors(self):
        """Return the list of property descriptors supported by this processor."""
        return self.properties

    def onScheduled(self, context):
        """Create the embedding service once, when the processor is scheduled."""
        self.embeddings = create_embedding_service(context)

    def transform(self, context, flowfile):
        """Embed the FlowFile's JSONL content and store the documents in OpenSearch.

        Reads connection/index properties (FlowFile-attribute expression language
        is supported where declared), builds the vector store parameters, parses
        the content with parse_documents, and bulk-inserts texts + metadata + ids.
        Routes to 'success'; any exception propagates and routes to failure.
        """
        file_name = flowfile.getAttribute("filename")
        http_host = context.getProperty(HTTP_HOST).evaluateAttributeExpressions(flowfile).getValue()
        index_name = context.getProperty(INDEX_NAME).evaluateAttributeExpressions(flowfile).getValue()
        id_field_name = context.getProperty(self.DOC_ID_FIELD_NAME).evaluateAttributeExpressions(flowfile).getValue()
        vector_field = context.getProperty(VECTOR_FIELD).evaluateAttributeExpressions(flowfile).getValue()
        text_field = context.getProperty(TEXT_FIELD).evaluateAttributeExpressions(flowfile).getValue()
        new_index_strategy = context.getProperty(self.NEW_INDEX_STRATEGY).evaluateAttributeExpressions().getValue()

        params = {"vector_field": vector_field, "text_field": text_field}
        params.update(create_authentication_params(context))

        if new_index_strategy == self.CUSTOM_INDEX_MAPPING:
            # Custom mapping: translate the user-facing display values into the
            # engine/space_type/ef/m settings used when a new index is created.
            engine = context.getProperty(self.ENGINE).evaluateAttributeExpressions().getValue()
            params["engine"] = self.ENGINE_VALUES.get(engine)

            # Only the space-type property matching the selected engine is populated
            # (the others are hidden via PropertyDependency above).
            if engine == self.NMSLIB[0]:
                space_type = context.getProperty(self.NMSLIB_SPACE_TYPE).evaluateAttributeExpressions().getValue()
                params["space_type"] = self.NMSLIB_SPACE_TYPE_VALUES.get(space_type)
            if engine == self.FAISS[0]:
                space_type = context.getProperty(self.FAISS_SPACE_TYPE).evaluateAttributeExpressions().getValue()
                params["space_type"] = self.FAISS_SPACE_TYPE_VALUES.get(space_type)
            if engine == self.LUCENE[0]:
                space_type = context.getProperty(self.LUCENE_SPACE_TYPE).evaluateAttributeExpressions().getValue()
                params["space_type"] = self.LUCENE_SPACE_TYPE_VALUES.get(space_type)

            ef_search = context.getProperty(self.EF_SEARCH).evaluateAttributeExpressions().asInteger()
            params["ef_search"] = ef_search

            ef_construction = context.getProperty(self.EF_CONSTRUCTION).evaluateAttributeExpressions().asInteger()
            params["ef_construction"] = ef_construction

            m = context.getProperty(self.M).evaluateAttributeExpressions().asInteger()
            params["m"] = m

        # Read the FlowFile content as "json-lines".
        json_lines = flowfile.getContentsAsBytes().decode()
        parsed_documents = parse_documents(json_lines, id_field_name, file_name)

        vectorstore = OpenSearchVectorSearch(
            opensearch_url=http_host,
            index_name=index_name,
            embedding_function=self.embeddings,
            **params
        )
        # NOTE(review): params is deliberately forwarded to add_texts as well —
        # langchain appears to read the index-creation settings (engine,
        # space_type, ef_*, m) from the add_texts kwargs when the index does
        # not yet exist. Confirm against the langchain OpenSearch integration.
        vectorstore.add_texts(texts=parsed_documents["texts"],
                              metadatas=parsed_documents["metadatas"],
                              ids=parsed_documents["ids"],
                              **params
                              )

        return FlowFileTransformResult(relationship="success")
+
diff --git
a/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/QueryOpenSearchVector.py
b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/QueryOpenSearchVector.py
new file mode 100644
index 0000000000..488c01d197
--- /dev/null
+++
b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/QueryOpenSearchVector.py
@@ -0,0 +1,219 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from langchain.vectorstores import OpenSearchVectorSearch
+from nifiapi.flowfiletransform import FlowFileTransform,
FlowFileTransformResult
+from nifiapi.properties import PropertyDescriptor, StandardValidators,
ExpressionLanguageScope, PropertyDependency
+from OpenSearchVectorUtils import (L2, L1, LINF, COSINESIMIL, OPENAI_API_KEY,
OPENAI_API_MODEL, HUGGING_FACE_API_KEY, HUGGING_FACE_MODEL, HTTP_HOST,
+ USERNAME, PASSWORD, INDEX_NAME,
VECTOR_FIELD, TEXT_FIELD, create_authentication_params)
+from QueryUtils import OUTPUT_STRATEGY, RESULTS_FIELD, INCLUDE_METADATAS,
INCLUDE_DISTANCES, QueryUtils
+import json
+from EmbeddingUtils import EMBEDDING_MODEL, create_embedding_service
+
class QueryOpenSearchVector(FlowFileTransform):
    """NiFi processor that performs a similarity search against an OpenSearch
    vector index and writes the matching documents (optionally with metadata
    and distances) to the outgoing FlowFile."""

    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']

    class ProcessorDetails:
        version = '@project.version@'
        description = "Queries OpenSearch in order to gather a specified number of documents that are most closely related to the given query."
        tags = ["opensearch", "vector", "vectordb", "vectorstore", "embeddings", "ai", "artificial intelligence", "ml",
                "machine learning", "text", "LLM"]

    # Search types: (display name, langchain search_type value) pairs.
    APPROXIMATE_SEARCH = ("Approximate Search", "approximate_search")
    SCRIPT_SCORING_SEARCH = ("Script Scoring Search", "script_scoring")
    PAINLESS_SCRIPTING_SEARCH = ("Painless Scripting Search", "painless_scripting")

    SEARCH_TYPE_VALUES = dict([APPROXIMATE_SEARCH, SCRIPT_SCORING_SEARCH, PAINLESS_SCRIPTING_SEARCH])

    # Script Scoring Search space types
    HAMMINGBIT = ("Hamming distance", "hammingbit")

    SCRIPT_SCORING_SPACE_TYPE_VALUES = dict([L2, L1, LINF, COSINESIMIL, HAMMINGBIT])

    # Painless Scripting Search space types
    L2_SQUARED = ("L2 (Euclidean distance)", "l2Squared")
    L1_NORM = ("L1 (Manhattan distance)", "l1Norm")
    COSINE_SIMILARITY = ("Cosine similarity", "cosineSimilarity")

    PAINLESS_SCRIPTING_SPACE_TYPE_VALUES = dict([L2_SQUARED, L1_NORM, COSINE_SIMILARITY])

    QUERY = PropertyDescriptor(
        name="Query",
        description="The text of the query to send to OpenSearch.",
        required=True,
        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
    )
    NUMBER_OF_RESULTS = PropertyDescriptor(
        name="Number of Results",
        description="The number of results to return from OpenSearch",
        default_value="10",
        required=True,
        validators=[StandardValidators.POSITIVE_INTEGER_VALIDATOR],
        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
    )
    SEARCH_TYPE = PropertyDescriptor(
        name="Search Type",
        description="Specifies the type of the search to be performed.",
        allowable_values=SEARCH_TYPE_VALUES.keys(),
        default_value=APPROXIMATE_SEARCH[0],
        required=True
    )
    SCRIPT_SCORING_SPACE_TYPE = PropertyDescriptor(
        name="Script Scoring Space Type",
        description="Used to measure the distance between two points in order to determine the k-nearest neighbors.",
        allowable_values=SCRIPT_SCORING_SPACE_TYPE_VALUES.keys(),
        default_value=L2[0],
        required=False,
        dependencies=[PropertyDependency(SEARCH_TYPE, SCRIPT_SCORING_SEARCH[0])]
    )
    PAINLESS_SCRIPTING_SPACE_TYPE = PropertyDescriptor(
        name="Painless Scripting Space Type",
        description="Used to measure the distance between two points in order to determine the k-nearest neighbors.",
        allowable_values=PAINLESS_SCRIPTING_SPACE_TYPE_VALUES.keys(),
        default_value=L2_SQUARED[0],
        required=False,
        dependencies=[PropertyDependency(SEARCH_TYPE, PAINLESS_SCRIPTING_SEARCH[0])]
    )
    BOOLEAN_FILTER = PropertyDescriptor(
        name="Boolean Filter",
        description="A Boolean filter is a post filter consists of a Boolean query that contains a k-NN query and a filter. "
                    "The value of the field must be a JSON representation of the filter.",
        required=False,
        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
        dependencies=[PropertyDependency(SEARCH_TYPE, APPROXIMATE_SEARCH[0])]
    )
    EFFICIENT_FILTER = PropertyDescriptor(
        name="Efficient Filter",
        description="The Lucene Engine or Faiss Engine decides whether to perform an exact k-NN search with "
                    "pre-filtering or an approximate search with modified post-filtering. The value of the field must "
                    "be a JSON representation of the filter.",
        required=False,
        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
        dependencies=[PropertyDependency(SEARCH_TYPE, APPROXIMATE_SEARCH[0])]
    )
    PRE_FILTER = PropertyDescriptor(
        name="Pre Filter",
        description="Script Score query to pre-filter documents before identifying nearest neighbors. The value of "
                    "the field must be a JSON representation of the filter.",
        default_value="{\"match_all\": {}}",
        required=False,
        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
        dependencies=[PropertyDependency(SEARCH_TYPE, SCRIPT_SCORING_SEARCH[0], PAINLESS_SCRIPTING_SEARCH[0])]
    )

    properties = [EMBEDDING_MODEL,
                  OPENAI_API_KEY,
                  OPENAI_API_MODEL,
                  HUGGING_FACE_API_KEY,
                  HUGGING_FACE_MODEL,
                  HTTP_HOST,
                  USERNAME,
                  PASSWORD,
                  INDEX_NAME,
                  QUERY,
                  VECTOR_FIELD,
                  TEXT_FIELD,
                  NUMBER_OF_RESULTS,
                  SEARCH_TYPE,
                  SCRIPT_SCORING_SPACE_TYPE,
                  PAINLESS_SCRIPTING_SPACE_TYPE,
                  BOOLEAN_FILTER,
                  EFFICIENT_FILTER,
                  PRE_FILTER,
                  OUTPUT_STRATEGY,
                  RESULTS_FIELD,
                  INCLUDE_METADATAS,
                  INCLUDE_DISTANCES]

    # Created in onScheduled: the embedding service and the shared JSON output helper.
    embeddings = None
    query_utils = None

    def __init__(self, **kwargs):
        pass

    def getPropertyDescriptors(self):
        """Return the list of property descriptors supported by this processor."""
        return self.properties

    def onScheduled(self, context):
        """Initialize the embedding service and output-formatting helper once."""
        self.embeddings = create_embedding_service(context)
        self.query_utils = QueryUtils(context)

    def transform(self, context, flowfile):
        """Run a similarity search for the configured query and emit the results as JSON.

        Builds the langchain search parameters from the selected search type
        (approximate / script scoring / painless scripting) and its filters,
        then delegates output formatting to QueryUtils. Routes to 'success'.
        """
        http_host = context.getProperty(HTTP_HOST).evaluateAttributeExpressions(flowfile).getValue()
        index_name = context.getProperty(INDEX_NAME).evaluateAttributeExpressions(flowfile).getValue()
        query = context.getProperty(self.QUERY).evaluateAttributeExpressions(flowfile).getValue()
        num_results = context.getProperty(self.NUMBER_OF_RESULTS).evaluateAttributeExpressions(flowfile).asInteger()
        vector_field = context.getProperty(VECTOR_FIELD).evaluateAttributeExpressions(flowfile).getValue()
        text_field = context.getProperty(TEXT_FIELD).evaluateAttributeExpressions(flowfile).getValue()
        search_type = context.getProperty(self.SEARCH_TYPE).evaluateAttributeExpressions().getValue()

        params = {"vector_field": vector_field,
                  "text_field": text_field,
                  "search_type": self.SEARCH_TYPE_VALUES.get(search_type)}
        params.update(create_authentication_params(context))

        if search_type == self.APPROXIMATE_SEARCH[0]:
            # Approximate search supports a boolean (post) filter and/or an efficient filter.
            boolean_filter = context.getProperty(self.BOOLEAN_FILTER).evaluateAttributeExpressions().getValue()
            if boolean_filter is not None:
                params["boolean_filter"] = json.loads(boolean_filter)

            efficient_filter = context.getProperty(self.EFFICIENT_FILTER).evaluateAttributeExpressions().getValue()
            if efficient_filter is not None:
                params["efficient_filter"] = json.loads(efficient_filter)
        else:
            # Script scoring and painless scripting share the pre-filter but
            # use different space-type vocabularies.
            pre_filter = context.getProperty(self.PRE_FILTER).evaluateAttributeExpressions().getValue()
            if pre_filter is not None:
                params["pre_filter"] = json.loads(pre_filter)
            if search_type == self.SCRIPT_SCORING_SEARCH[0]:
                space_type = context.getProperty(self.SCRIPT_SCORING_SPACE_TYPE).evaluateAttributeExpressions().getValue()
                params["space_type"] = self.SCRIPT_SCORING_SPACE_TYPE_VALUES.get(space_type)
            elif search_type == self.PAINLESS_SCRIPTING_SEARCH[0]:
                space_type = context.getProperty(self.PAINLESS_SCRIPTING_SPACE_TYPE).evaluateAttributeExpressions().getValue()
                params["space_type"] = self.PAINLESS_SCRIPTING_SPACE_TYPE_VALUES.get(space_type)

        vectorstore = OpenSearchVectorSearch(index_name=index_name,
                                             embedding_function=self.embeddings,
                                             opensearch_url=http_host,
                                             **params
                                             )

        results = vectorstore.similarity_search_with_score(query=query, k=num_results, **params)

        documents = [result[0].page_content for result in results]

        # Fix: getProperty() returns a PropertyValue object, which is always
        # truthy — the configured boolean must be read with asBoolean(),
        # otherwise metadata/distances would be included unconditionally.
        if context.getProperty(INCLUDE_METADATAS).asBoolean():
            metadatas = [result[0].metadata for result in results]
        else:
            metadatas = None

        if context.getProperty(INCLUDE_DISTANCES).asBoolean():
            distances = [result[1] for result in results]
        else:
            distances = None

        (output_contents, mime_type) = self.query_utils.create_json(flowfile, documents, metadatas, None, distances, None)
        attributes = {"mime.type": mime_type}

        return FlowFileTransformResult(relationship="success", contents=output_contents, attributes=attributes)
diff --git
a/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/requirements.txt
b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/requirements.txt
index ad78d29d03..fbefc24508 100644
---
a/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/requirements.txt
+++
b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/requirements.txt
@@ -27,3 +27,6 @@ requests
pinecone-client==3.0.1
tiktoken
langchain==0.1.11
+
+# OpenSearch requirements
+opensearch-py==2.5.0