claudevdm commented on code in PR #33413:
URL: https://github.com/apache/beam/pull/33413#discussion_r1896095351


##########
sdks/python/apache_beam/ml/rag/enrichment/bigquery_vector_search.py:
##########
@@ -0,0 +1,304 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from dataclasses import dataclass
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+from google.cloud import bigquery
+
+from apache_beam.ml.rag.types import Chunk
+from apache_beam.ml.rag.types import Embedding
+from apache_beam.transforms.enrichment import EnrichmentSourceHandler
+
+
+@dataclass
+class BigQueryVectorSearchParameters:
+  """Parameters for configuring BigQuery vector similarity search.
+
+    This class encapsulates the configuration needed to perform vector
+    similarity search using BigQuery's VECTOR_SEARCH function. It handles
+    formatting the query with proper embedding vectors and metadata
+    restrictions.
+
+    Example with flattened metadata column:
+    
+    Table schema::
+
+        embedding: ARRAY<FLOAT64>  # Vector embedding
+        content: STRING           # Document content
+        language: STRING          # Direct metadata column
+
+    Code::
+
+        >>> params = BigQueryVectorSearchParameters(
+        ...     table_name='project.dataset.embeddings',
+        ...     embedding_column='embedding',
+        ...     columns=['content', 'language'],
+        ...     neighbor_count=5,
+        ...     # For column 'language', value comes from 
+        ...     # chunk.metadata['language']
+        ...     metadata_restriction_template="language = '{language}'"
+        ... )
+        >>> # When processing a chunk with metadata={'language': 'en'},
+        >>> # generates: WHERE language = 'en'
+
+    Example with nested repeated metadata:
+    
+    Table schema::
+
+        embedding: ARRAY<FLOAT64>  # Vector embedding
+        content: STRING           # Document content
+        metadata: ARRAY<STRUCT<   # Nested repeated metadata
+          key: STRING,
+          value: STRING
+        >>
+
+    Code::
+
+        >>> params = BigQueryVectorSearchParameters(
+        ...     table_name='project.dataset.embeddings',
+        ...     embedding_column='embedding',
+        ...     columns=['content', 'metadata'],
+        ...     neighbor_count=5,
+        ...     # check_metadata(field_name, key_to_search, value_from_chunk)
+        ...     metadata_restriction_template=(
+        ...         "check_metadata(metadata, 'language', '{language}')"
+        ...     )
+        ... )
+        >>> # When processing a chunk with metadata={'language': 'en'},
+        >>> # generates: WHERE check_metadata(metadata, 'language', 'en')
+        >>> # Searches for {key: 'language', value: 'en'} in metadata array
+
+    Args:
+        table_name: Fully qualified BigQuery table name containing vectors.
+        embedding_column: Column name containing the embedding vectors.
+        columns: List of columns to retrieve from matched vectors.
+        neighbor_count: Number of similar vectors to return (top-k).
+        metadata_restriction_template: Template string for filtering vectors. 

Review Comment:
   Thanks for flagging this; it made me realize the batching logic is actually
flawed, since VECTOR_SEARCH behaves more like a join operation.
   
   I ended up rewriting the logic so that we batch together elements with the exact
same restrictions and then UNION ALL the results from the mini-batches together, like this:
   
   ```
   SELECT 
         query.id,
         ARRAY_AGG(
             STRUCT(base.content, base.metadata, base.domain)
         ) as chunks
     FROM VECTOR_SEARCH(
         (SELECT content, metadata, domain, embedding 
          FROM `table`
          WHERE domain = 'medical' AND check_metadata(metadata, 'language', 
'en')),
         'embedding',
         (SELECT * FROM (SELECT 'query1' as id, [0.1, 0.2, 0.3] as embedding)),
         top_k => 1
         
         
     )
     GROUP BY query.id
   UNION ALL 
     SELECT 
         query.id,
         ARRAY_AGG(
             STRUCT(base.content, base.metadata, base.domain)
         ) as chunks
     FROM VECTOR_SEARCH(
         (SELECT content, metadata, domain, embedding 
          FROM `table`
          WHERE domain = 'doesntexist' AND check_metadata(metadata, 'language', 
'en')),
         'embedding',
         (SELECT * FROM (SELECT 'query2' as id, [0.1, 0.2, 0.3] as embedding)),
         top_k => 1
         
         
     )
     GROUP BY query.id
   ```
   
   I also added some tests to verify the correct behavior.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to