dan-s1 commented on code in PR #8441:
URL: https://github.com/apache/nifi/pull/8441#discussion_r1499899239


##########
nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/PutOpenSearchVector.py:
##########
@@ -0,0 +1,251 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from langchain.vectorstores import OpenSearchVectorSearch
+from nifiapi.flowfiletransform import FlowFileTransform, 
FlowFileTransformResult
+from nifiapi.properties import PropertyDescriptor, StandardValidators, 
ExpressionLanguageScope, PropertyDependency
+from OpenSearchVectorUtils import (OPENAI_API_KEY, OPENAI_API_MODEL, 
HUGGING_FACE_API_KEY, HUGGING_FACE_MODEL,
+                                   HTTP_HOST,
+                                   USERNAME, PASSWORD, VERIFY_CERTIFICATES, 
INDEX_NAME, VECTOR_FIELD, TEXT_FIELD,
+                                   create_authentication_params, 
parse_documents)
+from EmbeddingUtils import EMBEDDING_MODEL, create_embedding_service
+from nifiapi.documentation import use_case, multi_processor_use_case, 
ProcessorConfiguration
+
+
+@use_case(description="Create vectors/embeddings that represent text content 
and send the vectors to OpenSearch",
+          notes="This use case assumes that the data has already been 
formatted in JSONL format with the text to store in OpenSearch provided in the 
'text' field.",
+          keywords=["opensearch", "embedding", "vector", "text", 
"vectorstore", "insert"],
+          configuration="""
+                Configure the 'HTTP Host' to an appropriate URL where 
OpenSearch is accessible.
+                Configure 'Embedding Model' to indicate whether OpenAI 
embeddings should be used or a HuggingFace embedding model should be used: 
'Hugging Face Model' or 'OpenAI Model'
+                Configure the 'OpenAI API Key' or 'HuggingFace API Key', 
depending on the chosen Embedding Model.
+                Set 'Index Name' to the name of your OpenSearch Index.
+                Set 'Vector Field Name' to the name of the field in the 
Document which will store the vector data.
+                Set 'Text Field Name' to the name of the field in the Document 
which will store the text data.
+
+                If the documents to send to OpenSearch contain a unique 
identifier, set the 'Document ID Field Name' property to the name of the field 
that contains the document ID.
+                This property can be left blank, in which case a unique ID 
will be generated based on the FlowFile's filename.
+
+                If the provided index does not exists in OpenSearch then the 
processor is capable to create it. The 'New Index Strategy' property defines 
+                that the index needs to be created from the default template 
or it should be configured with custom values.
+                """)
+@use_case(description="Update vectors/embeddings in OpenSearch",
+          notes="This use case assumes that the data has already been 
formatted in JSONL format with the text to store in OpenSearch provided in the 
'text' field.",
+          keywords=["opensearch", "embedding", "vector", "text", 
"vectorstore", "update", "upsert"],
+          configuration="""
+                Configure the 'HTTP Host' to an appropriate URL where 
OpenSearch is accessible.
+                Configure 'Embedding Model' to indicate whether OpenAI 
embeddings should be used or a HuggingFace embedding model should be used: 
'Hugging Face Model' or 'OpenAI Model'
+                Configure the 'OpenAI API Key' or 'HuggingFace API Key', 
depending on the chosen Embedding Model.
+                Set 'Index Name' to the name of your OpenSearch Index.
+                Set 'Vector Field Name' to the name of the field in the 
Document which will store the vector data.
+                Set 'Text Field Name' to the name of the field in the Document 
which will store the text data.
+                Set the 'Document ID Field Name' property to the name of the 
field that contains the identifier of the document in OpenSearch to update.
+                """)
+class PutOpenSearchVector(FlowFileTransform):
+    class Java:
+        implements = ['org.apache.nifi.python.processor.FlowFileTransform']
+
+    class ProcessorDetails:
+        version = '2.0.0-SNAPSHOT'
+        description = """Publishes JSON data to OpenSearch. The Incoming data 
must be in single JSON per Line format, each with two keys: 'text' and 
'metadata'.
+                       The text must be a string, while metadata must be a map 
with strings for values. Any additional fields will be ignored."""
+        tags = ["opensearch", "vector", "vectordb", "vectorstore", 
"embeddings", "ai", "artificial intelligence", "ml",
+                "machine learning", "text", "LLM"]
+
+    # Engine types
+    NMSLIB = ("nmslib (Non-Metric Space Library)", "nmslib")
+    FAISS = ("faiss (Facebook AI Similarity Search)", "faiss")
+    LUCENE = ("lucene", "lucene")
+
+    ENGINE_VALUES = dict([NMSLIB, FAISS, LUCENE])
+
+    # Space types
+    L2 = ("L2 (Euclidean distance)", "l2")
+    L1 = ("L1 (Manhattan distance)", "l1")
+    LINF = ("L-infinity (chessboard) distance", "linf")
+    COSINESIMIL = ("Cosine similarity", "cosinesimil")
+    INNERPRODUCT = ("Inner product", "innerproduct")
+
+    NMSLIB_SPACE_TYPE_VALUES = dict([L2, L1, LINF, COSINESIMIL, INNERPRODUCT])
+    FAISS_SPACE_TYPE_VALUES = dict([L2, INNERPRODUCT])
+    LUCENE_SPACE_TYPE_VALUES = dict([L2, COSINESIMIL])
+
+    # New Index Mapping Strategy
+    DEFAULT_INDEX_MAPPING = "Default index mapping"
+    CUSTOM_INDEX_MAPPING = "Custom index mapping"
+
+    DOC_ID_FIELD_NAME = PropertyDescriptor(
+        name="Document ID Field Name",
+        description="""Specifies the name of the field in the 'metadata' 
element of each document where the document's ID can be found.  
+                    If not specified, an ID will be generated based on the 
FlowFile's filename and a one-up number.""",
+        required=False,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
+    )
+    NEW_INDEX_STRATEGY = PropertyDescriptor(
+        name="New Index Strategy",
+        description="Specifies the Mapping strategy to use for new index 
creation. The default template values are the following: "
+                    "{engine: nmslib, space_type: l2, ef_search: 512, 
ef_construction: 512, m: 16}",
+        allowable_values=[DEFAULT_INDEX_MAPPING, CUSTOM_INDEX_MAPPING],
+        default_value=DEFAULT_INDEX_MAPPING,
+        required=False,
+    )
+    ENGINE = PropertyDescriptor(
+        name="Engine",
+        description="The approximate k-NN library to use for indexing and 
search.",
+        allowable_values=ENGINE_VALUES.keys(),
+        default_value=NMSLIB[0],
+        required=False,
+        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, 
CUSTOM_INDEX_MAPPING)]
+    )
+    NMSLIB_SPACE_TYPE = PropertyDescriptor(
+        name="NMSLIB Space Type",
+        description="The vector space used to calculate the distance between 
vectors.",
+        allowable_values=NMSLIB_SPACE_TYPE_VALUES.keys(),
+        default_value=L2[0],
+        required=False,
+        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, 
CUSTOM_INDEX_MAPPING),
+                      PropertyDependency(ENGINE, NMSLIB[0])]
+    )
+    FAISS_SPACE_TYPE = PropertyDescriptor(
+        name="FAISS Space Type",
+        description="The vector space used to calculate the distance between 
vectors.",
+        allowable_values=FAISS_SPACE_TYPE_VALUES.keys(),
+        default_value=L2[0],
+        required=False,
+        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, 
CUSTOM_INDEX_MAPPING),
+                      PropertyDependency(ENGINE, FAISS[0])]
+    )
+    LUCENE_SPACE_TYPE = PropertyDescriptor(
+        name="Lucene Space Type",
+        description="The vector space used to calculate the distance between 
vectors.",
+        allowable_values=LUCENE_SPACE_TYPE_VALUES.keys(),
+        default_value=L2[0],
+        required=False,
+        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, 
CUSTOM_INDEX_MAPPING),
+                      PropertyDependency(ENGINE, LUCENE[0])]
+    )
+    EF_SEARCH = PropertyDescriptor(
+        name="EF Search",
+        description="The size of the dynamic list used during k-NN searches. 
Higher values lead to more accurate but slower searches.",
+        default_value="512",
+        required=False,
+        validators=[StandardValidators.NUMBER_VALIDATOR],

Review Comment:
   ```suggestion
           validators=[StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR],
   ```



##########
nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/PutOpenSearchVector.py:
##########
@@ -0,0 +1,251 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from langchain.vectorstores import OpenSearchVectorSearch
+from nifiapi.flowfiletransform import FlowFileTransform, 
FlowFileTransformResult
+from nifiapi.properties import PropertyDescriptor, StandardValidators, 
ExpressionLanguageScope, PropertyDependency
+from OpenSearchVectorUtils import (OPENAI_API_KEY, OPENAI_API_MODEL, 
HUGGING_FACE_API_KEY, HUGGING_FACE_MODEL,
+                                   HTTP_HOST,
+                                   USERNAME, PASSWORD, VERIFY_CERTIFICATES, 
INDEX_NAME, VECTOR_FIELD, TEXT_FIELD,
+                                   create_authentication_params, 
parse_documents)
+from EmbeddingUtils import EMBEDDING_MODEL, create_embedding_service
+from nifiapi.documentation import use_case, multi_processor_use_case, 
ProcessorConfiguration
+
+
+@use_case(description="Create vectors/embeddings that represent text content 
and send the vectors to OpenSearch",
+          notes="This use case assumes that the data has already been 
formatted in JSONL format with the text to store in OpenSearch provided in the 
'text' field.",
+          keywords=["opensearch", "embedding", "vector", "text", 
"vectorstore", "insert"],
+          configuration="""
+                Configure the 'HTTP Host' to an appropriate URL where 
OpenSearch is accessible.
+                Configure 'Embedding Model' to indicate whether OpenAI 
embeddings should be used or a HuggingFace embedding model should be used: 
'Hugging Face Model' or 'OpenAI Model'
+                Configure the 'OpenAI API Key' or 'HuggingFace API Key', 
depending on the chosen Embedding Model.
+                Set 'Index Name' to the name of your OpenSearch Index.
+                Set 'Vector Field Name' to the name of the field in the 
Document which will store the vector data.
+                Set 'Text Field Name' to the name of the field in the Document 
which will store the text data.
+
+                If the documents to send to OpenSearch contain a unique 
identifier, set the 'Document ID Field Name' property to the name of the field 
that contains the document ID.
+                This property can be left blank, in which case a unique ID 
will be generated based on the FlowFile's filename.
+
+                If the provided index does not exists in OpenSearch then the 
processor is capable to create it. The 'New Index Strategy' property defines 
+                that the index needs to be created from the default template 
or it should be configured with custom values.
+                """)
+@use_case(description="Update vectors/embeddings in OpenSearch",
+          notes="This use case assumes that the data has already been 
formatted in JSONL format with the text to store in OpenSearch provided in the 
'text' field.",
+          keywords=["opensearch", "embedding", "vector", "text", 
"vectorstore", "update", "upsert"],
+          configuration="""
+                Configure the 'HTTP Host' to an appropriate URL where 
OpenSearch is accessible.
+                Configure 'Embedding Model' to indicate whether OpenAI 
embeddings should be used or a HuggingFace embedding model should be used: 
'Hugging Face Model' or 'OpenAI Model'
+                Configure the 'OpenAI API Key' or 'HuggingFace API Key', 
depending on the chosen Embedding Model.
+                Set 'Index Name' to the name of your OpenSearch Index.
+                Set 'Vector Field Name' to the name of the field in the 
Document which will store the vector data.
+                Set 'Text Field Name' to the name of the field in the Document 
which will store the text data.
+                Set the 'Document ID Field Name' property to the name of the 
field that contains the identifier of the document in OpenSearch to update.
+                """)
+class PutOpenSearchVector(FlowFileTransform):
+    class Java:
+        implements = ['org.apache.nifi.python.processor.FlowFileTransform']
+
+    class ProcessorDetails:
+        version = '2.0.0-SNAPSHOT'
+        description = """Publishes JSON data to OpenSearch. The Incoming data 
must be in single JSON per Line format, each with two keys: 'text' and 
'metadata'.
+                       The text must be a string, while metadata must be a map 
with strings for values. Any additional fields will be ignored."""
+        tags = ["opensearch", "vector", "vectordb", "vectorstore", 
"embeddings", "ai", "artificial intelligence", "ml",
+                "machine learning", "text", "LLM"]
+
+    # Engine types
+    NMSLIB = ("nmslib (Non-Metric Space Library)", "nmslib")
+    FAISS = ("faiss (Facebook AI Similarity Search)", "faiss")
+    LUCENE = ("lucene", "lucene")
+
+    ENGINE_VALUES = dict([NMSLIB, FAISS, LUCENE])
+
+    # Space types
+    L2 = ("L2 (Euclidean distance)", "l2")
+    L1 = ("L1 (Manhattan distance)", "l1")
+    LINF = ("L-infinity (chessboard) distance", "linf")
+    COSINESIMIL = ("Cosine similarity", "cosinesimil")
+    INNERPRODUCT = ("Inner product", "innerproduct")
+
+    NMSLIB_SPACE_TYPE_VALUES = dict([L2, L1, LINF, COSINESIMIL, INNERPRODUCT])
+    FAISS_SPACE_TYPE_VALUES = dict([L2, INNERPRODUCT])
+    LUCENE_SPACE_TYPE_VALUES = dict([L2, COSINESIMIL])
+
+    # New Index Mapping Strategy
+    DEFAULT_INDEX_MAPPING = "Default index mapping"
+    CUSTOM_INDEX_MAPPING = "Custom index mapping"
+
+    DOC_ID_FIELD_NAME = PropertyDescriptor(
+        name="Document ID Field Name",
+        description="""Specifies the name of the field in the 'metadata' 
element of each document where the document's ID can be found.  
+                    If not specified, an ID will be generated based on the 
FlowFile's filename and a one-up number.""",
+        required=False,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
+    )
+    NEW_INDEX_STRATEGY = PropertyDescriptor(
+        name="New Index Strategy",
+        description="Specifies the Mapping strategy to use for new index 
creation. The default template values are the following: "
+                    "{engine: nmslib, space_type: l2, ef_search: 512, 
ef_construction: 512, m: 16}",
+        allowable_values=[DEFAULT_INDEX_MAPPING, CUSTOM_INDEX_MAPPING],
+        default_value=DEFAULT_INDEX_MAPPING,
+        required=False,
+    )
+    ENGINE = PropertyDescriptor(
+        name="Engine",
+        description="The approximate k-NN library to use for indexing and 
search.",
+        allowable_values=ENGINE_VALUES.keys(),
+        default_value=NMSLIB[0],
+        required=False,
+        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, 
CUSTOM_INDEX_MAPPING)]
+    )
+    NMSLIB_SPACE_TYPE = PropertyDescriptor(
+        name="NMSLIB Space Type",
+        description="The vector space used to calculate the distance between 
vectors.",
+        allowable_values=NMSLIB_SPACE_TYPE_VALUES.keys(),
+        default_value=L2[0],
+        required=False,
+        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, 
CUSTOM_INDEX_MAPPING),
+                      PropertyDependency(ENGINE, NMSLIB[0])]
+    )
+    FAISS_SPACE_TYPE = PropertyDescriptor(
+        name="FAISS Space Type",
+        description="The vector space used to calculate the distance between 
vectors.",
+        allowable_values=FAISS_SPACE_TYPE_VALUES.keys(),
+        default_value=L2[0],
+        required=False,
+        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, 
CUSTOM_INDEX_MAPPING),
+                      PropertyDependency(ENGINE, FAISS[0])]
+    )
+    LUCENE_SPACE_TYPE = PropertyDescriptor(
+        name="Lucene Space Type",
+        description="The vector space used to calculate the distance between 
vectors.",
+        allowable_values=LUCENE_SPACE_TYPE_VALUES.keys(),
+        default_value=L2[0],
+        required=False,
+        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, 
CUSTOM_INDEX_MAPPING),
+                      PropertyDependency(ENGINE, LUCENE[0])]
+    )
+    EF_SEARCH = PropertyDescriptor(
+        name="EF Search",
+        description="The size of the dynamic list used during k-NN searches. 
Higher values lead to more accurate but slower searches.",
+        default_value="512",
+        required=False,
+        validators=[StandardValidators.NUMBER_VALIDATOR],
+        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, 
CUSTOM_INDEX_MAPPING)]
+    )
+    EF_CONSTRUCTION = PropertyDescriptor(
+        name="EF Construction",
+        description="The size of the dynamic list used during k-NN graph 
creation. Higher values lead to a more accurate graph but slower indexing 
speed.",
+        default_value="512",
+        required=False,
+        validators=[StandardValidators.NUMBER_VALIDATOR],

Review Comment:
   ```suggestion
           validators=[StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR],
   ```



##########
nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/OpenSearchVectorUtils.py:
##########
@@ -0,0 +1,149 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from nifiapi.properties import PropertyDescriptor, StandardValidators, 
ExpressionLanguageScope, PropertyDependency
+from EmbeddingUtils import OPENAI, HUGGING_FACE, EMBEDDING_MODEL
+
+HUGGING_FACE_API_KEY = PropertyDescriptor(
+    name="HuggingFace API Key",
+    description="The API Key for interacting with HuggingFace",
+    required=True,
+    sensitive=True,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    dependencies=[PropertyDependency(EMBEDDING_MODEL, HUGGING_FACE)]
+)
+HUGGING_FACE_MODEL = PropertyDescriptor(
+    name="HuggingFace Model",
+    description="The name of the HuggingFace model to use",
+    default_value="sentence-transformers/all-MiniLM-L6-v2",
+    required=True,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    dependencies=[PropertyDependency(EMBEDDING_MODEL, HUGGING_FACE)]
+)
+OPENAI_API_KEY = PropertyDescriptor(
+    name="OpenAI API Key",
+    description="The API Key for OpenAI in order to create embeddings",
+    required=True,
+    sensitive=True,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    dependencies=[PropertyDependency(EMBEDDING_MODEL, OPENAI)]
+)
+OPENAI_API_MODEL = PropertyDescriptor(
+    name="OpenAI Model",
+    description="The API Key for OpenAI in order to create embeddings",
+    default_value="text-embedding-ada-002",
+    required=True,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    dependencies=[PropertyDependency(EMBEDDING_MODEL, OPENAI)]
+)
+HTTP_HOST = PropertyDescriptor(
+    name="HTTP Host",
+    description="URL where OpenSearch is hosted.",
+    default_value="http://localhost:9200";,
+    required=True,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR]
+)
+USERNAME = PropertyDescriptor(
+    name="Username",
+    description="The username to use for authenticating to OpenSearch server",
+    required=False,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR]
+)
+PASSWORD = PropertyDescriptor(
+    name="Password",
+    description="The password to use for authenticating to OpenSearch server",
+    required=False,
+    sensitive=True,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR]
+)
+VERIFY_CERTIFICATES = PropertyDescriptor(
+    name="Verify Certificates",
+    description="The password to use for authenticating to OpenSearch server",
+    allowable_values=["true", "false"],
+    default_value="false",
+    required=False,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR]
+)
+INDEX_NAME = PropertyDescriptor(
+    name="Index Name",
+    description="The name of the OpenSearch index.",
+    sensitive=False,
+    required=True,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
+)
+VECTOR_FIELD = PropertyDescriptor(
+    name="Vector Field Name",
+    description="The name of Document field where the embeddings are stored. 
This field need to be a 'knn_vector' typed field.",
+    default_value="vector_field",
+    required=True,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
+)
+TEXT_FIELD = PropertyDescriptor(
+    name="Text Field Name",
+    description="The name of Document field where the text of the document is 
stored.",
+    default_value="text",
+    required=True,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
+)
+
+
+def create_authentication_params(context):
+    username = context.getProperty(USERNAME).getValue()
+    password = context.getProperty(PASSWORD).getValue()
+    verify_certificates = context.getProperty(VERIFY_CERTIFICATES).getValue()
+
+    params = {"verify_certs": verify_certificates}
+
+    if username is not None and password is not None:
+        params["http_auth"] = (username, password)
+
+    return params
+
+
+def parse_documents(json_lines, id_field_name, file_name):
+    import json
+
+    texts = []
+    metadatas = []
+    ids = []
+    for i, line in enumerate(json_lines.split("\n"), start=1):
+        try:
+            doc = json.loads(line)
+        except Exception as e:
+            raise ValueError(f"Could not parse line {i} as JSON") from e
+
+        text = doc.get('text')
+        metadata = doc.get('metadata')
+        texts.append(text)
+
+        # Remove any null values, or it will cause the embedding to fail
+        filtered_metadata = {}
+        for key, value in metadata.items():
+            if value is not None:
+                filtered_metadata[key] = value

Review Comment:
   This technically can done in one line using dictionary comprehension
    ```suggestion
           filtered_metadata = {key:value for key, value in metadata.items() if 
value is not None}       
   ```



##########
nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/PutOpenSearchVector.py:
##########
@@ -0,0 +1,251 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from langchain.vectorstores import OpenSearchVectorSearch
+from nifiapi.flowfiletransform import FlowFileTransform, 
FlowFileTransformResult
+from nifiapi.properties import PropertyDescriptor, StandardValidators, 
ExpressionLanguageScope, PropertyDependency
+from OpenSearchVectorUtils import (OPENAI_API_KEY, OPENAI_API_MODEL, 
HUGGING_FACE_API_KEY, HUGGING_FACE_MODEL,
+                                   HTTP_HOST,
+                                   USERNAME, PASSWORD, VERIFY_CERTIFICATES, 
INDEX_NAME, VECTOR_FIELD, TEXT_FIELD,
+                                   create_authentication_params, 
parse_documents)
+from EmbeddingUtils import EMBEDDING_MODEL, create_embedding_service
+from nifiapi.documentation import use_case, multi_processor_use_case, 
ProcessorConfiguration
+
+
+@use_case(description="Create vectors/embeddings that represent text content 
and send the vectors to OpenSearch",
+          notes="This use case assumes that the data has already been 
formatted in JSONL format with the text to store in OpenSearch provided in the 
'text' field.",
+          keywords=["opensearch", "embedding", "vector", "text", 
"vectorstore", "insert"],
+          configuration="""
+                Configure the 'HTTP Host' to an appropriate URL where 
OpenSearch is accessible.
+                Configure 'Embedding Model' to indicate whether OpenAI 
embeddings should be used or a HuggingFace embedding model should be used: 
'Hugging Face Model' or 'OpenAI Model'
+                Configure the 'OpenAI API Key' or 'HuggingFace API Key', 
depending on the chosen Embedding Model.
+                Set 'Index Name' to the name of your OpenSearch Index.
+                Set 'Vector Field Name' to the name of the field in the 
Document which will store the vector data.
+                Set 'Text Field Name' to the name of the field in the Document 
which will store the text data.
+
+                If the documents to send to OpenSearch contain a unique 
identifier, set the 'Document ID Field Name' property to the name of the field 
that contains the document ID.
+                This property can be left blank, in which case a unique ID 
will be generated based on the FlowFile's filename.
+
+                If the provided index does not exists in OpenSearch then the 
processor is capable to create it. The 'New Index Strategy' property defines 
+                that the index needs to be created from the default template 
or it should be configured with custom values.
+                """)
+@use_case(description="Update vectors/embeddings in OpenSearch",
+          notes="This use case assumes that the data has already been 
formatted in JSONL format with the text to store in OpenSearch provided in the 
'text' field.",
+          keywords=["opensearch", "embedding", "vector", "text", 
"vectorstore", "update", "upsert"],
+          configuration="""
+                Configure the 'HTTP Host' to an appropriate URL where 
OpenSearch is accessible.
+                Configure 'Embedding Model' to indicate whether OpenAI 
embeddings should be used or a HuggingFace embedding model should be used: 
'Hugging Face Model' or 'OpenAI Model'
+                Configure the 'OpenAI API Key' or 'HuggingFace API Key', 
depending on the chosen Embedding Model.
+                Set 'Index Name' to the name of your OpenSearch Index.
+                Set 'Vector Field Name' to the name of the field in the 
Document which will store the vector data.
+                Set 'Text Field Name' to the name of the field in the Document 
which will store the text data.
+                Set the 'Document ID Field Name' property to the name of the 
field that contains the identifier of the document in OpenSearch to update.
+                """)
+class PutOpenSearchVector(FlowFileTransform):
+    class Java:
+        implements = ['org.apache.nifi.python.processor.FlowFileTransform']
+
+    class ProcessorDetails:
+        version = '2.0.0-SNAPSHOT'
+        description = """Publishes JSON data to OpenSearch. The Incoming data 
must be in single JSON per Line format, each with two keys: 'text' and 
'metadata'.
+                       The text must be a string, while metadata must be a map 
with strings for values. Any additional fields will be ignored."""
+        tags = ["opensearch", "vector", "vectordb", "vectorstore", 
"embeddings", "ai", "artificial intelligence", "ml",
+                "machine learning", "text", "LLM"]
+
+    # Engine types
+    NMSLIB = ("nmslib (Non-Metric Space Library)", "nmslib")
+    FAISS = ("faiss (Facebook AI Similarity Search)", "faiss")
+    LUCENE = ("lucene", "lucene")
+
+    ENGINE_VALUES = dict([NMSLIB, FAISS, LUCENE])
+
+    # Space types
+    L2 = ("L2 (Euclidean distance)", "l2")
+    L1 = ("L1 (Manhattan distance)", "l1")
+    LINF = ("L-infinity (chessboard) distance", "linf")
+    COSINESIMIL = ("Cosine similarity", "cosinesimil")
+    INNERPRODUCT = ("Inner product", "innerproduct")
+
+    NMSLIB_SPACE_TYPE_VALUES = dict([L2, L1, LINF, COSINESIMIL, INNERPRODUCT])
+    FAISS_SPACE_TYPE_VALUES = dict([L2, INNERPRODUCT])
+    LUCENE_SPACE_TYPE_VALUES = dict([L2, COSINESIMIL])
+
+    # New Index Mapping Strategy
+    DEFAULT_INDEX_MAPPING = "Default index mapping"
+    CUSTOM_INDEX_MAPPING = "Custom index mapping"
+
+    DOC_ID_FIELD_NAME = PropertyDescriptor(
+        name="Document ID Field Name",
+        description="""Specifies the name of the field in the 'metadata' 
element of each document where the document's ID can be found.  
+                    If not specified, an ID will be generated based on the 
FlowFile's filename and a one-up number.""",
+        required=False,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
+    )
+    NEW_INDEX_STRATEGY = PropertyDescriptor(
+        name="New Index Strategy",
+        description="Specifies the Mapping strategy to use for new index 
creation. The default template values are the following: "
+                    "{engine: nmslib, space_type: l2, ef_search: 512, 
ef_construction: 512, m: 16}",
+        allowable_values=[DEFAULT_INDEX_MAPPING, CUSTOM_INDEX_MAPPING],
+        default_value=DEFAULT_INDEX_MAPPING,
+        required=False,
+    )
+    ENGINE = PropertyDescriptor(
+        name="Engine",
+        description="The approximate k-NN library to use for indexing and 
search.",
+        allowable_values=ENGINE_VALUES.keys(),
+        default_value=NMSLIB[0],
+        required=False,
+        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, 
CUSTOM_INDEX_MAPPING)]
+    )
+    NMSLIB_SPACE_TYPE = PropertyDescriptor(
+        name="NMSLIB Space Type",
+        description="The vector space used to calculate the distance between 
vectors.",
+        allowable_values=NMSLIB_SPACE_TYPE_VALUES.keys(),
+        default_value=L2[0],
+        required=False,
+        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, 
CUSTOM_INDEX_MAPPING),
+                      PropertyDependency(ENGINE, NMSLIB[0])]
+    )
+    FAISS_SPACE_TYPE = PropertyDescriptor(
+        name="FAISS Space Type",
+        description="The vector space used to calculate the distance between 
vectors.",
+        allowable_values=FAISS_SPACE_TYPE_VALUES.keys(),
+        default_value=L2[0],
+        required=False,
+        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, 
CUSTOM_INDEX_MAPPING),
+                      PropertyDependency(ENGINE, FAISS[0])]
+    )
+    LUCENE_SPACE_TYPE = PropertyDescriptor(
+        name="Lucene Space Type",
+        description="The vector space used to calculate the distance between 
vectors.",
+        allowable_values=LUCENE_SPACE_TYPE_VALUES.keys(),
+        default_value=L2[0],
+        required=False,
+        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, 
CUSTOM_INDEX_MAPPING),
+                      PropertyDependency(ENGINE, LUCENE[0])]
+    )
+    EF_SEARCH = PropertyDescriptor(
+        name="EF Search",
+        description="The size of the dynamic list used during k-NN searches. 
Higher values lead to more accurate but slower searches.",
+        default_value="512",
+        required=False,
+        validators=[StandardValidators.NUMBER_VALIDATOR],
+        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, 
CUSTOM_INDEX_MAPPING)]
+    )
+    EF_CONSTRUCTION = PropertyDescriptor(
+        name="EF Construction",
+        description="The size of the dynamic list used during k-NN graph 
creation. Higher values lead to a more accurate graph but slower indexing 
speed.",
+        default_value="512",
+        required=False,
+        validators=[StandardValidators.NUMBER_VALIDATOR],
+        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, 
CUSTOM_INDEX_MAPPING)]
+    )
+    M = PropertyDescriptor(
+        name="M",
+        description="The number of bidirectional links that the plugin creates 
for each new element. Increasing and "
+                    "decreasing this value can have a large impact on memory 
consumption. Keep this value between 2 and 100.",
+        default_value="16",
+        required=False,
+        validators=[StandardValidators.NUMBER_VALIDATOR],

Review Comment:
   I am not sure this is possible but on the Java side there is a method in 
`StandardValidators`
   `public static Validator createLongValidator(final long minimum, final long 
maximum, final boolean inclusive)`
   
   which would allow for ensuring the user only enters a number between 2 and 
100.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to