This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 63a8be9f023 Handle query returned with empty rows gracefully on bigquery enrichment (#36791)
63a8be9f023 is described below

commit 63a8be9f023c8a20ee4ad4cff794be85e7c83cd4
Author: RuiLong J. <[email protected]>
AuthorDate: Thu Nov 13 13:52:33 2025 -0800

    Handle query returned with empty rows gracefully on bigquery enrichment (#36791)
    
    * Handle query returned with empty rows gracefully on bigquery enrichment handler
    
    * Ran isort linter
    
    * Manually update import order
    
    * Update sdks/python/apache_beam/transforms/enrichment_handlers/bigquery_it_test.py
    
    Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
    
    * Update sdks/python/apache_beam/transforms/enrichment_handlers/bigquery.py
    
    Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
    
    * Update sdks/python/apache_beam/transforms/enrichment_handlers/bigquery_it_test.py
    
    Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
    
    * Update sdks/python/apache_beam/transforms/enrichment_handlers/bigquery.py
    
    Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
    
    * Update sdks/python/apache_beam/transforms/enrichment_handlers/bigquery.py
    
    Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
    
    * fix typo
    
    * Updated string formatting and added a test case
    
    * Clean up logic in checking unmatched requests
    
    ---------
    
    Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
---
 .../transforms/enrichment_handlers/bigquery.py     |  38 +++++-
 .../enrichment_handlers/bigquery_it_test.py        | 141 +++++++++++++++++++++
 2 files changed, 172 insertions(+), 7 deletions(-)

diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/bigquery.py b/sdks/python/apache_beam/transforms/enrichment_handlers/bigquery.py
index 06b40bf38cc..115c5320767 100644
--- a/sdks/python/apache_beam/transforms/enrichment_handlers/bigquery.py
+++ b/sdks/python/apache_beam/transforms/enrichment_handlers/bigquery.py
@@ -14,6 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import logging
 from collections.abc import Callable
 from collections.abc import Mapping
 from typing import Any
@@ -30,6 +31,8 @@ from apache_beam.transforms.enrichment import EnrichmentSourceHandler
 QueryFn = Callable[[beam.Row], str]
 ConditionValueFn = Callable[[beam.Row], list[Any]]
 
+_LOGGER = logging.getLogger(__name__)
+
 
 def _validate_bigquery_metadata(
     table_name, row_restriction_template, fields, condition_value_fn, 
query_fn):
@@ -87,6 +90,7 @@ class BigQueryEnrichmentHandler(EnrichmentSourceHandler[Union[Row, list[Row]],
       query_fn: Optional[QueryFn] = None,
       min_batch_size: int = 1,
       max_batch_size: int = 10000,
+      throw_exception_on_empty_results: bool = True,
       **kwargs,
   ):
     """
@@ -145,6 +149,7 @@ class BigQueryEnrichmentHandler(EnrichmentSourceHandler[Union[Row, list[Row]],
     self.query_template = (
         "SELECT %s FROM %s WHERE %s" %
         (self.select_fields, self.table_name, self.row_restriction_template))
+    self.throw_exception_on_empty_results = throw_exception_on_empty_results
     self.kwargs = kwargs
     self._batching_kwargs = {}
     if not query_fn:
@@ -157,10 +162,13 @@ class BigQueryEnrichmentHandler(EnrichmentSourceHandler[Union[Row, list[Row]],
   def _execute_query(self, query: str):
     try:
       results = self.client.query(query=query).result()
+      row_list = [dict(row.items()) for row in results]
+      if not row_list:
+        return None
       if self._batching_kwargs:
-        return [dict(row.items()) for row in results]
+        return row_list
       else:
-        return [dict(row.items()) for row in results][0]
+        return row_list[0]
     except BadRequest as e:
       raise BadRequest(
           f'Could not execute the query: {query}. Please check if '
@@ -204,11 +212,21 @@ class BigQueryEnrichmentHandler(EnrichmentSourceHandler[Union[Row, list[Row]],
       query = raw_query.format(*values)
 
       responses_dict = self._execute_query(query)
-      for response in responses_dict:
-        response_row = beam.Row(**response)
-        response_key = self.create_row_key(response_row)
-        if response_key in requests_map:
-          responses.append((requests_map[response_key], response_row))
+      unmatched_requests = requests_map.copy()
+      if responses_dict:
+        for response in responses_dict:
+          response_row = beam.Row(**response)
+          response_key = self.create_row_key(response_row)
+          if response_key in unmatched_requests:
+            req = unmatched_requests.pop(response_key)
+            responses.append((req, response_row))
+      if unmatched_requests:
+        if self.throw_exception_on_empty_results:
+          raise ValueError(f"no matching row found for query: {query}")
+        else:
+          _LOGGER.warning('no matching row found for query: %s', query)
+          for req in unmatched_requests.values():
+            responses.append((req, beam.Row()))
       return responses
     else:
       request_dict = request._asdict()
@@ -223,6 +241,12 @@ class BigQueryEnrichmentHandler(EnrichmentSourceHandler[Union[Row, list[Row]],
         # construct the query.
         query = self.query_template.format(*values)
       response_dict = self._execute_query(query)
+      if response_dict is None:
+        if self.throw_exception_on_empty_results:
+          raise ValueError(f"no matching row found for query: {query}")
+        else:
+          _LOGGER.warning('no matching row found for query: %s', query)
+        return request, beam.Row()
       return request, beam.Row(**response_dict)
 
   def __exit__(self, exc_type, exc_val, exc_tb):
diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/bigquery_it_test.py b/sdks/python/apache_beam/transforms/enrichment_handlers/bigquery_it_test.py
b/sdks/python/apache_beam/transforms/enrichment_handlers/bigquery_it_test.py
index ab9375a12e7..067c1c2f9b3 100644
--- a/sdks/python/apache_beam/transforms/enrichment_handlers/bigquery_it_test.py
+++ b/sdks/python/apache_beam/transforms/enrichment_handlers/bigquery_it_test.py
@@ -355,6 +355,147 @@ class TestBigQueryEnrichmentIT(BigQueryEnrichmentIT):
       assert_that(pcoll_cached, equal_to(expected_rows))
     BigQueryEnrichmentHandler.__call__ = actual
 
+  def test_bigquery_enrichment_no_results_throws_exception(self):
+    requests = [
+        beam.Row(id=999, name='X'),  # This ID does not exist
+    ]
+    handler = BigQueryEnrichmentHandler(
+        project=self.project,
+        row_restriction_template="id = {}",
+        table_name=self.table_name,
+        fields=['id'],
+        throw_exception_on_empty_results=True,
+    )
+
+    with self.assertRaisesRegex(ValueError, "no matching row found for query"):
+      with TestPipeline(is_integration_test=True) as test_pipeline:
+        _ = (test_pipeline | beam.Create(requests) | Enrichment(handler))
+
+  def test_bigquery_enrichment_no_results_graceful(self):
+    requests = [
+        beam.Row(id=999, name='X'),  # This ID does not exist
+        beam.Row(id=1000, name='Y'),  # This ID does not exist
+    ]
+    # When no results are found and not throwing, Enrichment yields original.
+    expected_rows = requests
+
+    handler = BigQueryEnrichmentHandler(
+        project=self.project,
+        row_restriction_template="id = {}",
+        table_name=self.table_name,
+        fields=['id'],
+        min_batch_size=1,
+        max_batch_size=100,
+        throw_exception_on_empty_results=False,
+    )
+
+    with TestPipeline(is_integration_test=True) as test_pipeline:
+      pcoll = (test_pipeline | beam.Create(requests) | Enrichment(handler))
+      assert_that(pcoll, equal_to(expected_rows))
+
+  def test_bigquery_enrichment_no_results_partial_graceful_batched(self):
+    requests = [
+        beam.Row(id=1, name='A'),  # This ID exists
+        beam.Row(id=1000, name='Y'),  # This ID does not exist
+    ]
+    # When no results are found and not throwing, Enrichment yields original.
+    expected_rows = [
+        beam.Row(id=1, name='A', quantity=2, distribution_center_id=3),
+        beam.Row(id=1000,
+                 name='Y'),  # This ID does not exist so remains unchanged
+    ]
+
+    handler = BigQueryEnrichmentHandler(
+        project=self.project,
+        row_restriction_template="id = {}",
+        table_name=self.table_name,
+        fields=['id'],
+        min_batch_size=2,
+        max_batch_size=100,
+        throw_exception_on_empty_results=False,
+    )
+
+    with TestPipeline(is_integration_test=True) as test_pipeline:
+      pcoll = (test_pipeline | beam.Create(requests) | Enrichment(handler))
+      assert_that(pcoll, equal_to(expected_rows))
+
+  def test_bigquery_enrichment_no_results_graceful_batched(self):
+    requests = [
+        beam.Row(id=999, name='X'),  # This ID does not exist
+        beam.Row(id=1000, name='Y'),  # This ID does not exist
+    ]
+    # When no results are found and not throwing, Enrichment yields original.
+    expected_rows = requests
+
+    handler = BigQueryEnrichmentHandler(
+        project=self.project,
+        row_restriction_template="id = {}",
+        table_name=self.table_name,
+        fields=['id'],
+        min_batch_size=2,
+        max_batch_size=100,
+        throw_exception_on_empty_results=False,
+    )
+
+    with TestPipeline(is_integration_test=True) as test_pipeline:
+      pcoll = (test_pipeline | beam.Create(requests) | Enrichment(handler))
+      assert_that(pcoll, equal_to(expected_rows))
+
+  def test_bigquery_enrichment_no_results_with_query_fn_throws_exception(self):
+    requests = [
+        beam.Row(id=999, name='X'),  # This ID does not exist
+    ]
+    # This query_fn will return no results
+    fn = functools.partial(query_fn, self.table_name)
+    handler = BigQueryEnrichmentHandler(
+        project=self.project,
+        query_fn=fn,
+        throw_exception_on_empty_results=True,
+    )
+
+    with self.assertRaisesRegex(ValueError, "no matching row found for query"):
+      with TestPipeline(is_integration_test=True) as test_pipeline:
+        _ = (test_pipeline | beam.Create(requests) | Enrichment(handler))
+
+  def test_bigquery_enrichment_no_results_with_query_fn_graceful(self):
+    requests = [
+        beam.Row(id=999, name='X'),  # This ID does not exist
+        beam.Row(id=1000, name='Y'),  # This ID does not exist
+    ]
+    # When no results are found and not throwing, Enrichment yields original.
+    expected_rows = requests
+
+    # This query_fn will return no results
+    fn = functools.partial(query_fn, self.table_name)
+    handler = BigQueryEnrichmentHandler(
+        project=self.project,
+        query_fn=fn,
+        throw_exception_on_empty_results=False,
+    )
+
+    with TestPipeline(is_integration_test=True) as test_pipeline:
+      pcoll = (test_pipeline | beam.Create(requests) | Enrichment(handler))
+      assert_that(pcoll, equal_to(expected_rows))
+
+  def test_bigquery_enrichment_partial_results_throws_exception_batched(self):
+    requests = [
+        beam.Row(id=1, name='A'),  # This ID exists
+        beam.Row(id=1000, name='Y'),  # This ID does not exist
+    ]
+    handler = BigQueryEnrichmentHandler(
+        project=self.project,
+        row_restriction_template="id = {}",
+        table_name=self.table_name,
+        fields=['id'],
+        min_batch_size=2,
+        max_batch_size=100,
+        throw_exception_on_empty_results=True,
+    )
+
+    with self.assertRaisesRegex(ValueError, "no matching row found for query"):
+      with TestPipeline(is_integration_test=True) as test_pipeline:
+        _ = (test_pipeline | beam.Create(requests) | Enrichment(handler))
+
 
 if __name__ == '__main__':
   unittest.main()

Reply via email to