gemini-code-assist[bot] commented on code in PR #39106:
URL: https://github.com/apache/beam/pull/39106#discussion_r3476642410
##########
sdks/python/apache_beam/io/gcp/bigquery_file_loads.py:
##########
@@ -740,7 +744,7 @@ def process(
else:
try:
schema = bigquery_tools.table_schema_to_dict(
- self.bq_wrapper.get_table(
+ bigquery_tools.BigQueryWrapper().get_table(
Review Comment:

Constructing a new `BigQueryWrapper` for every element handled in `process` is
inefficient, because each instance initializes its own HTTP client and
credentials. Reuse the existing `self.bq_wrapper` instance instead.
```suggestion
self.bq_wrapper.get_table(
```
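A minimal, standalone sketch of the reuse pattern follows. It assumes a Beam-style lifecycle in which a setup hook runs once per DoFn instance and `process` runs once per element. `FakeWrapper` and `SchemaLookupFn` are hypothetical stand-ins, not Beam or `bigquery_tools` APIs, and the sketch does not import `apache_beam`, so it runs on its own.

```python
class FakeWrapper:
    """Hypothetical stand-in for an expensive client such as BigQueryWrapper."""

    instances = 0

    def __init__(self):
        # Count constructions to show how often the "expensive" setup runs.
        FakeWrapper.instances += 1

    def get_table(self, table_id):
        return {"tableId": table_id}


class SchemaLookupFn:
    """Hypothetical DoFn-like class; not the actual Beam implementation."""

    def setup(self):
        # Build the client once per instance, mirroring a DoFn.setup() hook.
        self.bq_wrapper = FakeWrapper()

    def process(self, element):
        # Reuse the shared client instead of constructing one per element.
        yield self.bq_wrapper.get_table(element)


fn = SchemaLookupFn()
fn.setup()
results = [row for e in ["t1", "t2", "t3"] for row in fn.process(e)]
print(FakeWrapper.instances)  # 1
```

Three elements are processed, but the wrapper is constructed only once.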
##########
sdks/python/apache_beam/io/gcp/bigquery_file_loads.py:
##########
@@ -1178,13 +1181,12 @@ def _load_data(
# the truncation happens only once. See
# https://github.com/apache/beam/issues/24535.
finished_temp_tables_load_job_ids_list_pc = (
- finished_temp_tables_load_job_ids_pc
- | beam.MapTuple(
+ finished_temp_tables_load_job_ids_pc | beam.MapTuple(
lambda destination, job_reference: (
- bigquery_tools.get_hashable_destination(destination),
+ bigquery_tools.parse_table_reference(destination).tableId,
(destination, job_reference)))
| beam.GroupByKey()
- | beam.MapTuple(lambda dest, batch: list(batch)))
+ | beam.MapTuple(lambda tableId, batch: list(batch)))
Review Comment:

Using only `tableId` as the grouping key can cause collisions when the pipeline
writes to tables with the same name in different datasets or projects (e.g.,
`dataset_A.my_table` and `dataset_B.my_table`): their load jobs would be grouped
under a single key. Using `get_hashable_destination` keys on the full
destination (project, dataset, and table), which prevents incorrect grouping and
potential data corruption.
```suggestion
finished_temp_tables_load_job_ids_pc | beam.MapTuple(
lambda destination, job_reference: (
bigquery_tools.get_hashable_destination(destination),
(destination, job_reference)))
| beam.GroupByKey()
| beam.MapTuple(lambda dest, batch: list(batch)))
```
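To illustrate the collision, here is a minimal sketch that mimics `GroupByKey` with a dict. The `project:dataset.table` destination strings and the two key functions are hypothetical simplifications for illustration, not `bigquery_tools` code.

```python
from collections import defaultdict


def table_id_only(destination):
    # Hypothetical: keeps only the table name, like parse_table_reference(d).tableId.
    return destination.rsplit(".", 1)[1]


def full_destination(destination):
    # Hypothetical: keeps project, dataset, and table together as the key.
    return destination


def group(pairs, key_fn):
    # Mimic GroupByKey: collect (destination, job) pairs under each key.
    groups = defaultdict(list)
    for destination, job in pairs:
        groups[key_fn(destination)].append((destination, job))
    return dict(groups)


jobs = [
    ("proj:dataset_A.my_table", "job-1"),
    ("proj:dataset_B.my_table", "job-2"),
]

print(len(group(jobs, table_id_only)))     # 1: both tables collide under "my_table"
print(len(group(jobs, full_destination)))  # 2: one group per table
```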
##########
sdks/python/apache_beam/io/gcp/bigquery_file_loads.py:
##########
@@ -851,8 +855,7 @@ def process(self, element):
if latest_partition.can_accept(file_size):
latest_partition.add(file_path, file_size)
else:
- if latest_partition.files:
- partitions.append(latest_partition.files)
+ partitions.append(latest_partition.files)
Review Comment:

Without the check that `latest_partition.files` is non-empty, an empty list is
appended to `partitions` whenever the first file cannot be accepted. Restoring
the guard prevents empty partitions from being created.
```python
if latest_partition.files:
    partitions.append(latest_partition.files)
```
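A minimal sketch of the partitioning loop with the guard in place follows. `Partition` and `partition_files` are hypothetical simplifications that assume a single size limit per partition; they are not the actual `bigquery_file_loads` implementation, which may also limit files per partition.

```python
class Partition:
    """Hypothetical simplification of a size-bounded partition."""

    def __init__(self, max_size):
        self.max_size = max_size
        self.size = 0
        self.files = []

    def can_accept(self, file_size):
        # Accept only if the file fits in the remaining capacity.
        return self.size + file_size <= self.max_size

    def add(self, file_path, file_size):
        self.files.append(file_path)
        self.size += file_size


def partition_files(files, max_size):
    partitions = []
    latest_partition = Partition(max_size)
    for file_path, file_size in files:
        if latest_partition.can_accept(file_size):
            latest_partition.add(file_path, file_size)
        else:
            # Guard: never emit an empty partition.
            if latest_partition.files:
                partitions.append(latest_partition.files)
            latest_partition = Partition(max_size)
            latest_partition.add(file_path, file_size)
    if latest_partition.files:
        partitions.append(latest_partition.files)
    return partitions


print(partition_files([("big", 20), ("a", 5)], max_size=10))  # [['big'], ['a']]
```

Without the guard, the same input would produce `[[], ['big'], ['a']]`, because the oversized first file is rejected by the still-empty initial partition.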
--
This is an automated message from the Apache Git Service.