exceptionfactory commented on code in PR #7894:
URL: https://github.com/apache/nifi/pull/7894#discussion_r1377875690


##########
nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/PutChroma.py:
##########
@@ -0,0 +1,125 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+
+from nifiapi.flowfiletransform import FlowFileTransform, 
FlowFileTransformResult
+from nifiapi.properties import PropertyDescriptor, StandardValidators, 
ExpressionLanguageScope
+import ChromaUtils
+import EmbeddingUtils
+
+
+class PutChroma(FlowFileTransform):
+    class Java:
+        implements = ['org.apache.nifi.python.processor.FlowFileTransform']
+
+    class ProcessorDetails:
+        version = '2.0.0-SNAPSHOT'
+        description = """Publishes JSON data to a Chroma VectorDB. The 
Incoming data must be in single JSON per Line format, each with two keys: 
'text' and 'metadata'.
+                       The text must be a string, while metadata must be a map 
with strings for values. Any additional fields will be ignored. If the 
collection name specified
+                       does not exist, the Processor will automatically create 
the collection."""
+        tags = ["chroma", "vector", "vectordb", "embeddings", "ai", 
"artificial intelligence", "ml", "machine learning", "text", "LLM"]
+
+
+    STORE_TEXT = PropertyDescriptor(
+        name="Store Document Text",
+        description="Specifies whether or not the text of the document should 
be stored in Chroma. If so, both the document's text and its embedding will be 
stored. If not, " +
+                    "only the vector/embedding will be stored.",
+        allowable_values=["true", "false"],
+        required=True,
+        default_value="true"
+    )
+    DISTANCE_METHOD = PropertyDescriptor(
+        name="Distance Method",
+        description="If the specified collection does not exist, it will be 
created using this Distance Method. If the collection exists, this property 
will be ignored.",
+        allowable_values=["cosine", "l2", "ip"],
+        default_value="cosine",
+        required=True
+    )
+    DOC_ID_FIELD_NAME = PropertyDescriptor(
+        name="Document ID Field Name",
+        description="Specifies the name of the field in the 'metadata' element 
of each document where the document's ID can be found. " +
+                    "If not specified, an ID will be generated based on the 
FlowFile's filename and a one-up number.",
+        required=False,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
+    )
+
+
+    client = None
+    embedding_function = None
+
+    def __init__(self, **kwargs):
+        self.property_descriptors = [prop for prop in ChromaUtils.PROPERTIES] 
+ [prop for prop in EmbeddingUtils.PROPERTIES]
+        self.property_descriptors.append(self.STORE_TEXT)
+        self.property_descriptors.append(self.DISTANCE_METHOD)
+        self.property_descriptors.append(self.DOC_ID_FIELD_NAME)
+
+
+    def getPropertyDescriptors(self):
+        return self.property_descriptors
+
+    def onScheduled(self, context):
+        self.client = ChromaUtils.create_client(context)
+        self.embedding_function = 
EmbeddingUtils.create_embedding_function(context)
+
+
+    def transform(self, context, flowfile):
+        client = self.client
+        embedding_function = self.embedding_function
+        collection_name = 
context.getProperty(ChromaUtils.COLLECTION_NAME).evaluateAttributeExpressions(flowfile).getValue()
+        distance_method = context.getProperty(self.DISTANCE_METHOD).getValue()
+        id_field_name = 
context.getProperty(self.DOC_ID_FIELD_NAME).evaluateAttributeExpressions(flowfile).getValue()
+
+        collection = client.get_or_create_collection(
+            name=collection_name,
+            embedding_function=embedding_function,
+            metadata={"hnsw:space": distance_method})
+
+        json_lines = flowfile.getContentsAsBytes().decode()

Review Comment:
   Are there any concerns here related to memory consumption with large files? 
It seems like this should be streamed.



##########
nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/PutChroma.py:
##########
@@ -0,0 +1,125 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+
+from nifiapi.flowfiletransform import FlowFileTransform, 
FlowFileTransformResult
+from nifiapi.properties import PropertyDescriptor, StandardValidators, 
ExpressionLanguageScope
+import ChromaUtils
+import EmbeddingUtils
+
+
+class PutChroma(FlowFileTransform):
+    class Java:
+        implements = ['org.apache.nifi.python.processor.FlowFileTransform']
+
+    class ProcessorDetails:
+        version = '2.0.0-SNAPSHOT'
+        description = """Publishes JSON data to a Chroma VectorDB. The 
Incoming data must be in single JSON per Line format, each with two keys: 
'text' and 'metadata'.

Review Comment:
   The requirement for this specific structure with ` text` and `metadata`, 
with a single JSON object per line seems limiting. What do you think about 
introducing a Record Reader? That would provide at least one layer of 
flexibility. If those are the only object keys required, perhaps they don't 
need to be configurable, but that could be another option to consider.



##########
nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/QueryChroma.py:
##########
@@ -0,0 +1,159 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+
+from nifiapi.flowfiletransform import FlowFileTransform, 
FlowFileTransformResult
+from nifiapi.properties import PropertyDescriptor, StandardValidators, 
ExpressionLanguageScope, PropertyDependency
+import ChromaUtils
+import EmbeddingUtils
+import QueryUtils
+
+
+class QueryChroma(FlowFileTransform):
+    class Java:
+        implements = ['org.apache.nifi.python.processor.FlowFileTransform']
+
+    class ProcessorDetails:
+        version = '2.0.0-SNAPSHOT'
+        description = "Queries a Chroma Vector Database in order to gather a 
specified number of documents that are most closely related to the given query."
+        tags = ["chroma", "vector", "vectordb", "embeddings", "enrich", 
"enrichment", "ai", "artificial intelligence", "ml", "machine learning", 
"text", "LLM"]
+
+
+
+    QUERY = PropertyDescriptor(
+        name="Query",
+        description="The query to issue to the Chroma VectorDB. The query is 
always converted into embeddings using the configured embedding function, and 
the embedding is " +
+                    "then sent to Chroma. The text itself is not sent to 
Chroma.",
+        required=True,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
+    )
+    NUMBER_OF_RESULTS = PropertyDescriptor(
+        name="Number of Results",
+        description="The number of results to return from Chroma",

Review Comment:
   Is this a maximum or limit? That would be worth clarifying in the 
description.



##########
nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/QueryChroma.py:
##########
@@ -0,0 +1,159 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+
+from nifiapi.flowfiletransform import FlowFileTransform, 
FlowFileTransformResult
+from nifiapi.properties import PropertyDescriptor, StandardValidators, 
ExpressionLanguageScope, PropertyDependency
+import ChromaUtils
+import EmbeddingUtils
+import QueryUtils
+
+
+class QueryChroma(FlowFileTransform):
+    class Java:
+        implements = ['org.apache.nifi.python.processor.FlowFileTransform']
+
+    class ProcessorDetails:
+        version = '2.0.0-SNAPSHOT'
+        description = "Queries a Chroma Vector Database in order to gather a 
specified number of documents that are most closely related to the given query."
+        tags = ["chroma", "vector", "vectordb", "embeddings", "enrich", 
"enrichment", "ai", "artificial intelligence", "ml", "machine learning", 
"text", "LLM"]
+
+
+
+    QUERY = PropertyDescriptor(
+        name="Query",
+        description="The query to issue to the Chroma VectorDB. The query is 
always converted into embeddings using the configured embedding function, and 
the embedding is " +
+                    "then sent to Chroma. The text itself is not sent to 
Chroma.",
+        required=True,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
+    )
+    NUMBER_OF_RESULTS = PropertyDescriptor(
+        name="Number of Results",
+        description="The number of results to return from Chroma",
+        required=True,
+        validators=[StandardValidators.POSITIVE_INTEGER_VALIDATOR],
+        default_value="10",
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
+    )
+    WHERE_CLAUSE = PropertyDescriptor(
+        name="Where Clause (Metadata Filter)",
+        description="A JSON representation of a Metadata Filter that can be 
applied against the Chroma documents in order to narrow down the documents that 
can be returned. " +
+                    "For example: { \"metadata_field\": \"some_value\" }",
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        required=False
+    )
+    WHERE_DOCUMENT_CLAUSE = PropertyDescriptor(
+        name="Where Document (Document Filter)",
+        description="A JSON representation of a Document Filter that can be 
applied against the Chroma documents' text in order to narrow down the 
documents that can be returned. " +
+                    "For example: { \"$contains\": \"search_string\" }",
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        required=False
+    )
+
+    client = None
+    embedding_function = None
+    include_ids = None
+    include_metadatas = None
+    include_documents = None
+    include_distances = None
+    include_embeddings = None
+    results_field = None
+
+    property_descriptors = [prop for prop in ChromaUtils.PROPERTIES] + [prop 
for prop in EmbeddingUtils.PROPERTIES] + [
+        QUERY,
+        NUMBER_OF_RESULTS,
+        QueryUtils.OUTPUT_STRATEGY,
+        QueryUtils.RESULTS_FIELD,
+        WHERE_CLAUSE,
+        WHERE_DOCUMENT_CLAUSE,
+        QueryUtils.INCLUDE_IDS,
+        QueryUtils.INCLUDE_METADATAS,
+        QueryUtils.INCLUDE_DOCUMENTS,
+        QueryUtils.INCLUDE_DISTANCES,
+        QueryUtils.INCLUDE_EMBEDDINGS]
+
+    def __init__(self, **kwargs):
+        pass
+
+    def getPropertyDescriptors(self):
+        return self.property_descriptors
+
+
+    def onScheduled(self, context):
+        self.client = ChromaUtils.create_client(context)
+        self.embedding_function = 
EmbeddingUtils.create_embedding_function(context)
+        self.include_ids = 
context.getProperty(QueryUtils.INCLUDE_IDS).asBoolean()
+        self.include_metadatas = 
context.getProperty(QueryUtils.INCLUDE_METADATAS).asBoolean()
+        self.include_documents = 
context.getProperty(QueryUtils.INCLUDE_DOCUMENTS).asBoolean()
+        self.include_distances = 
context.getProperty(QueryUtils.INCLUDE_DISTANCES).asBoolean()
+        self.include_embeddings = 
context.getProperty(QueryUtils.INCLUDE_EMBEDDINGS).asBoolean()
+        self.results_field = 
context.getProperty(QueryUtils.RESULTS_FIELD).getValue()
+        self.query_utils = QueryUtils.QueryUtils(context)
+
+    def transform(self, context, flowfile):
+        client = self.client
+        embedding_function = self.embedding_function
+        collection_name = 
context.getProperty(ChromaUtils.COLLECTION_NAME).evaluateAttributeExpressions(flowfile).getValue()
+
+        collection = client.get_collection(
+            name=collection_name,
+            embedding_function=embedding_function)
+
+        query_text = 
context.getProperty(self.QUERY).evaluateAttributeExpressions(flowfile).getValue()
+        embeddings = embedding_function([query_text])
+
+        included_fields = []
+        if self.include_distances:
+            included_fields.append('distances')
+        if self.include_documents:
+            included_fields.append('documents')
+        if self.include_embeddings:
+            included_fields.append('embeddings')
+        if self.include_metadatas:
+            included_fields.append('metadatas')
+
+        where = None
+        where_clause = 
context.getProperty(self.WHERE_CLAUSE).evaluateAttributeExpressions(flowfile).getValue()
+        if where_clause is not None:
+            where = json.loads(where_clause)
+
+        where_document = None
+        where_document_clause = 
context.getProperty(self.WHERE_DOCUMENT_CLAUSE).evaluateAttributeExpressions(flowfile).getValue()
+        if where_document_clause is not None:
+            where_document = json.loads(where_document_clause)
+
+        query_results = collection.query(
+            query_embeddings=embeddings,
+            
n_results=context.getProperty(self.NUMBER_OF_RESULTS).evaluateAttributeExpressions(flowfile).asInteger(),
+            include=included_fields,
+            where_document=where_document,
+            where=where
+        )
+
+        ids = query_results['ids'][0]
+        distances = None if (not self.include_distances or 
query_results['distances'] is None) else query_results['distances'][0]
+        metadatas = None if (not self.include_metadatas or 
query_results['metadatas'] is None) else query_results['metadatas'][0]
+        documents = None if (not self.include_documents or 
query_results['documents'] is None) else query_results['documents'][0]
+        embeddings = None if (not self.include_embeddings or 
query_results['embeddings'] is None) else query_results['embeddings'][0]
+
+        (output_contents, mime_type) = self.query_utils.create_json(flowfile, 
documents, metadatas, embeddings, distances, ids)
+
+        # Return the results
+        attributes = {"mime.type": mime_type}

Review Comment:
   Will the response `mime.type` always be `application/json`? It would be 
helpful to indicate the WritesAttribute status for documentation purposes.



##########
nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/QueryChroma.py:
##########
@@ -0,0 +1,159 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+
+from nifiapi.flowfiletransform import FlowFileTransform, 
FlowFileTransformResult
+from nifiapi.properties import PropertyDescriptor, StandardValidators, 
ExpressionLanguageScope, PropertyDependency
+import ChromaUtils
+import EmbeddingUtils
+import QueryUtils
+
+
+class QueryChroma(FlowFileTransform):
+    class Java:
+        implements = ['org.apache.nifi.python.processor.FlowFileTransform']
+
+    class ProcessorDetails:
+        version = '2.0.0-SNAPSHOT'
+        description = "Queries a Chroma Vector Database in order to gather a 
specified number of documents that are most closely related to the given query."
+        tags = ["chroma", "vector", "vectordb", "embeddings", "enrich", 
"enrichment", "ai", "artificial intelligence", "ml", "machine learning", 
"text", "LLM"]
+
+
+
+    QUERY = PropertyDescriptor(
+        name="Query",
+        description="The query to issue to the Chroma VectorDB. The query is 
always converted into embeddings using the configured embedding function, and 
the embedding is " +
+                    "then sent to Chroma. The text itself is not sent to 
Chroma.",
+        required=True,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
+    )
+    NUMBER_OF_RESULTS = PropertyDescriptor(
+        name="Number of Results",
+        description="The number of results to return from Chroma",
+        required=True,
+        validators=[StandardValidators.POSITIVE_INTEGER_VALIDATOR],
+        default_value="10",
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
+    )
+    WHERE_CLAUSE = PropertyDescriptor(
+        name="Where Clause (Metadata Filter)",

Review Comment:
   Recommend avoiding the use of parentheses in the property name. What do you 
think about naming this simply `Metadata Filter Clause` or `Metadata Filter`? 
With the value needing to be JSON, `Metadata Filter` seems straightforward.



##########
nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/QueryChroma.py:
##########
@@ -0,0 +1,159 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+
+from nifiapi.flowfiletransform import FlowFileTransform, 
FlowFileTransformResult
+from nifiapi.properties import PropertyDescriptor, StandardValidators, 
ExpressionLanguageScope, PropertyDependency
+import ChromaUtils
+import EmbeddingUtils
+import QueryUtils
+
+
+class QueryChroma(FlowFileTransform):
+    class Java:
+        implements = ['org.apache.nifi.python.processor.FlowFileTransform']
+
+    class ProcessorDetails:
+        version = '2.0.0-SNAPSHOT'
+        description = "Queries a Chroma Vector Database in order to gather a 
specified number of documents that are most closely related to the given query."
+        tags = ["chroma", "vector", "vectordb", "embeddings", "enrich", 
"enrichment", "ai", "artificial intelligence", "ml", "machine learning", 
"text", "LLM"]
+
+
+
+    QUERY = PropertyDescriptor(
+        name="Query",
+        description="The query to issue to the Chroma VectorDB. The query is 
always converted into embeddings using the configured embedding function, and 
the embedding is " +
+                    "then sent to Chroma. The text itself is not sent to 
Chroma.",
+        required=True,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
+    )
+    NUMBER_OF_RESULTS = PropertyDescriptor(
+        name="Number of Results",
+        description="The number of results to return from Chroma",
+        required=True,
+        validators=[StandardValidators.POSITIVE_INTEGER_VALIDATOR],
+        default_value="10",
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
+    )
+    WHERE_CLAUSE = PropertyDescriptor(
+        name="Where Clause (Metadata Filter)",
+        description="A JSON representation of a Metadata Filter that can be 
applied against the Chroma documents in order to narrow down the documents that 
can be returned. " +
+                    "For example: { \"metadata_field\": \"some_value\" }",
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        required=False
+    )
+    WHERE_DOCUMENT_CLAUSE = PropertyDescriptor(
+        name="Where Document (Document Filter)",

Review Comment:
   As above, what about `Document Filter` for this property name?



##########
nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/bootstrap.conf:
##########
@@ -27,7 +27,7 @@ java.arg.3=-Xmx512m
 
 java.arg.14=-Djava.awt.headless=true
 
-#java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8002
+java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8002

Review Comment:
   It looks like the comment needs to be restored.
   ```suggestion
   
#java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8002
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to