gemini-code-assist[bot] commented on code in PR #38833:
URL: https://github.com/apache/beam/pull/38833#discussion_r3369864980
##########
sdks/python/apache_beam/io/gcp/bigquery_file_loads.py:
##########
@@ -735,19 +762,37 @@ def process(
     create_disposition = self.create_disposition
     if self.temporary_tables:
+      destination_table = None
+      hashed_dest = bigquery_tools.get_hashable_destination(table_reference)
+      should_lookup_destination_table = (
+          schema is None or
+          not _has_partitioning_load_parameters(additional_parameters))
+      if should_lookup_destination_table:
+        try:
+          destination_table = self.bq_wrapper.get_table(
+              project_id=table_reference.projectId,
+              dataset_id=table_reference.datasetId,
+              table_id=table_reference.tableId)
+        except Exception as e:
+          if schema is None:
+            _LOGGER.warning(
+                "Input schema is absent and could not fetch the final "
+                "destination table's schema [%s]. Creating temp table [%s] "
+                "will likely fail: %s",
+                hashed_dest,
+                job_name,
+                e)
+          destination_table = None
Review Comment:

This implementation introduces a performance regression by making redundant
BigQuery API calls:

1. **Bypassing the Schema Cache**: `should_lookup_destination_table` is
evaluated before `self.schema_cache` is checked, so it is always `True` when
`schema` is initially `None`. This forces a synchronous `get_table` API call on
every bundle/partition, even if the schema is already cached in
`self.schema_cache`.
2. **Redundant Calls per Bundle**: If `schema` is provided but partitioning
parameters are not (the default case), `get_table` is called on every
bundle/partition without any caching.

We can resolve both issues by caching the fetched `destination_table` on the
instance (e.g., in `self._destination_table_cache`) and only calling
`get_table` if we actually need the schema (and it's not in
`self.schema_cache`) or if we need the partitioning parameters.

```python
hashed_dest = bigquery_tools.get_hashable_destination(table_reference)
if not hasattr(self, '_destination_table_cache'):
  self._destination_table_cache = {}
destination_table = self._destination_table_cache.get(hashed_dest)
if destination_table is None:
  need_schema = schema is None and hashed_dest not in self.schema_cache
  need_partitioning = not _has_partitioning_load_parameters(
      additional_parameters)
  if need_schema or need_partitioning:
    try:
      destination_table = self.bq_wrapper.get_table(
          project_id=table_reference.projectId,
          dataset_id=table_reference.datasetId,
          table_id=table_reference.tableId)
      self._destination_table_cache[hashed_dest] = destination_table
    except Exception as e:
      if schema is None and hashed_dest not in self.schema_cache:
        _LOGGER.warning(
            "Input schema is absent and could not fetch the final "
            "destination table's schema [%s]. Creating temp table [%s] "
            "will likely fail: %s",
            hashed_dest,
            job_name,
            e)
      destination_table = None
```
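
The effect of the per-destination cache can be checked in isolation with a small, self-contained sketch. This is not Beam code: `FakeBigQueryWrapper`, `DestinationTableLookup`, and the `has_partitioning_params` flag are hypothetical stand-ins for `self.bq_wrapper`, the DoFn state, and `_has_partitioning_load_parameters(additional_parameters)`, used here only to count how often `get_table` would be invoked across repeated bundles.

```python
class FakeBigQueryWrapper:
  """Hypothetical stand-in for the BigQuery wrapper; counts get_table calls."""
  def __init__(self):
    self.calls = 0

  def get_table(self, project_id, dataset_id, table_id):
    self.calls += 1
    return {"id": f"{project_id}:{dataset_id}.{table_id}"}


class DestinationTableLookup:
  """Hypothetical helper that fetches each destination table at most once."""
  def __init__(self, bq_wrapper):
    self.bq_wrapper = bq_wrapper
    self.schema_cache = {}
    self._destination_table_cache = {}

  def lookup(self, dest, schema, has_partitioning_params):
    # dest is a (project, dataset, table) tuple used as the cache key.
    cached = self._destination_table_cache.get(dest)
    if cached is not None:
      return cached
    need_schema = schema is None and dest not in self.schema_cache
    need_partitioning = not has_partitioning_params
    if not (need_schema or need_partitioning):
      # Nothing from the destination table is needed: skip the API call.
      return None
    try:
      table = self.bq_wrapper.get_table(*dest)
    except Exception:
      # Failed lookups are not cached, so a later bundle may retry.
      return None
    self._destination_table_cache[dest] = table
    return table


wrapper = FakeBigQueryWrapper()
lookup = DestinationTableLookup(wrapper)
dest = ("my-project", "my_dataset", "my_table")
# Simulate five bundles writing to the same destination.
for _ in range(5):
  lookup.lookup(dest, schema=None, has_partitioning_params=False)
print(wrapper.calls)  # 1
```

In this sketch a failed lookup is deliberately left uncached, which mirrors the suggestion above; whether repeated failures should also be cached is a separate design choice.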