gemini-code-assist[bot] commented on code in PR #36791:
URL: https://github.com/apache/beam/pull/36791#discussion_r2520012612
##########
sdks/python/apache_beam/transforms/enrichment_handlers/bigquery.py:
##########
@@ -157,10 +162,13 @@ def __enter__(self):
def _execute_query(self, query: str):
try:
results = self.client.query(query=query).result()
+ row_list = [dict(row.items()) for row in results]
+ if not row_list:
+ return None
if self._batching_kwargs:
- return [dict(row.items()) for row in results]
+ return row_list
else:
- return [dict(row.items()) for row in results][0]
+ return row_list[0]
Review Comment:

This implementation materializes all query results into `row_list` even for
non-batched requests where only the first row is needed. This can be
inefficient for large result sets.
You can optimize this by handling the batched and non-batched cases
separately to avoid unnecessary memory usage in the non-batched case.
```suggestion
    if self._batching_kwargs:
      row_list = [dict(row.items()) for row in results]
      return row_list if row_list else None
    else:
      try:
        # RowIterator is iterable but not itself an iterator, so wrap it
        # with iter() before calling next().
        return dict(next(iter(results)).items())
      except StopIteration:
        return None
```
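To illustrate the memory difference the comment describes, here is a minimal, client-independent sketch: a plain generator stands in for the lazily evaluated BigQuery result iterator, and the non-batched path pulls only the first item instead of materializing the whole result set.

```python
def make_results(n):
    """Stand-in for a lazily evaluated result iterator (e.g. a RowIterator)."""
    for i in range(n):
        yield {"id": i}

# Non-batched path: consume only the first row; the rest are never produced.
results = make_results(1_000_000)
try:
    first = next(iter(results))
except StopIteration:
    first = None

# Batched path: materialize everything, but only when all rows are needed.
all_rows = list(make_results(3))
```

With a real `RowIterator` the savings also apply to network traffic, since unread result pages are never fetched.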
##########
sdks/python/apache_beam/transforms/enrichment_handlers/bigquery.py:
##########
@@ -223,6 +241,12 @@ def __call__(self, request: Union[beam.Row, list[beam.Row]], *args, **kwargs):
# construct the query.
query = self.query_template.format(*values)
response_dict = self._execute_query(query)
+ if response_dict is None:
+ if self.throw_exception_on_empty_results:
+ raise ValueError(f"no matching row found for query: {query}")
+ else:
+ _LOGGER.warning('no matching row found for query: %s', query)
Review Comment:

Logging the entire query string can be a security risk if it contains
sensitive data. Consider logging the request's key instead, which is more
concise and safer to log.
```suggestion
        _LOGGER.warning(
            'No matching row found in table %s for key: %s',
            self.table_name, self.create_row_key(request))
```
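As a general pattern, independent of this handler, logging a compact identifier instead of the raw SQL keeps sensitive literals out of logs, and passing %-style arguments defers string formatting until the record is actually emitted. A minimal sketch (the table name and key below are made up for illustration):

```python
import logging


class ListHandler(logging.Handler):
    """Collects formatted log messages so the example is self-checking."""
    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


logger = logging.getLogger("enrichment_example")
handler = ListHandler()
logger.addHandler(handler)

query = "SELECT * FROM users WHERE ssn = '123-45-6789'"  # may embed sensitive values
row_key = 42  # hypothetical request key

# Avoid: logger.warning('no matching row found for query: %s', query)
# Prefer a compact, non-sensitive identifier.
logger.warning('No matching row found in table %s for key: %s', 'users', row_key)
```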
##########
sdks/python/apache_beam/transforms/enrichment_handlers/bigquery.py:
##########
@@ -204,11 +212,21 @@ def __call__(self, request: Union[beam.Row, list[beam.Row]], *args, **kwargs):
query = raw_query.format(*values)
responses_dict = self._execute_query(query)
- for response in responses_dict:
- response_row = beam.Row(**response)
- response_key = self.create_row_key(response_row)
- if response_key in requests_map:
- responses.append((requests_map[response_key], response_row))
+ unmatched_requests = requests_map.copy()
+ if responses_dict:
+ for response in responses_dict:
+ response_row = beam.Row(**response)
+ response_key = self.create_row_key(response_row)
+ if response_key in unmatched_requests:
+ req = unmatched_requests.pop(response_key)
+ responses.append((req, response_row))
+ if unmatched_requests:
+ if self.throw_exception_on_empty_results:
+ raise ValueError(f"no matching row found for query: {query}")
+ else:
+ _LOGGER.warning('no matching row found for query: %s', query)
Review Comment:

Logging the entire query string can be a security risk if it contains
sensitive data. It can also be very long for batched requests, making logs hard
to read. Consider logging more concise and safer information, like the number
of unmatched requests and the table name.
```suggestion
        _LOGGER.warning(
            '%d requests in batch found no matching rows in table %s.',
            len(unmatched_requests), self.table_name)
```
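The diff's `unmatched_requests.pop(...)` pattern, matching responses back to requests and leaving the leftovers behind, can be sketched in isolation like this (the keys and row values are hypothetical; the real handler derives keys with `create_row_key`):

```python
# Hypothetical request map: key -> original request.
requests_map = {"k1": "req1", "k2": "req2", "k3": "req3"}
# Hypothetical query responses, already keyed; "k2" has no matching row.
responses_from_query = [("k1", "row1"), ("k3", "row3")]

# Copy so the original map is untouched; pop each matched key as we go.
unmatched = requests_map.copy()
matched = []
for key, row in responses_from_query:
    if key in unmatched:
        matched.append((unmatched.pop(key), row))

# Whatever is left in `unmatched` found no matching row and can be
# reported once, with a count, rather than by echoing the full query.
```

Popping matched keys makes the final `unmatched` dict exactly the set of requests to warn about (or raise on), which is what enables the count-based log message suggested above.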
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]