Copilot commented on code in PR #38833:
URL: https://github.com/apache/beam/pull/38833#discussion_r3369844097
##########
sdks/python/apache_beam/io/gcp/bigquery_file_loads.py:
##########
@@ -735,19 +762,37 @@ def process(
create_disposition = self.create_disposition
if self.temporary_tables:
+ destination_table = None
+ hashed_dest = bigquery_tools.get_hashable_destination(table_reference)
+ should_lookup_destination_table = (
+ schema is None or
+ not _has_partitioning_load_parameters(additional_parameters))
+ if should_lookup_destination_table:
+ try:
+ destination_table = self.bq_wrapper.get_table(
+ project_id=table_reference.projectId,
+ dataset_id=table_reference.datasetId,
+ table_id=table_reference.tableId)
+ except Exception as e:
Review Comment:
When `temporary_tables=True` and a schema is already provided, `process()`
still calls `get_table()` for every element unless partitioning parameters are
explicitly set. For pipelines producing many load jobs per destination, this
adds repeated BigQuery API calls and can become a bottleneck. Consider caching
destination metadata (e.g., a `destination_table_cache` that is initialized in
`start_bundle` and keyed by `hashed_dest`, or storing partitioning info
alongside `schema_cache`) so each destination is looked up at most once per
bundle.
##########
sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py:
##########
@@ -703,6 +703,73 @@ def test_one_load_job_failed_after_waiting(self,
sleep_mock):
sleep_mock.assert_called_once()
+ def test_temporary_table_load_inherits_destination_time_partitioning(self):
+ destination = 'project1:dataset1.table1'
+ partition = (destination, (0, ['gs://bucket/file1']))
+ job_reference = bigquery_api.JobReference(
+ projectId='project1', jobId='job_name1')
+ destination_table = bigquery_api.Table(
+ timePartitioning=bigquery_api.TimePartitioning(type='DAY'))
+
+ dofn = bqfl.TriggerLoadJobs(
+ schema=_ELEMENTS_SCHEMA, test_client=mock.Mock(),
temporary_tables=True)
+ dofn.start_bundle()
+ dofn.bq_wrapper.get_table = mock.Mock(return_value=destination_table)
+ dofn.bq_wrapper.perform_load_job = mock.Mock(return_value=job_reference)
+
+ list(dofn.process(partition, 'test_job', pane_info=mock.Mock(index=0)))
+
+ load_call = dofn.bq_wrapper.perform_load_job.call_args.kwargs
+ self.assertEqual(
+ load_call['additional_load_parameters']['timePartitioning'],
+ destination_table.timePartitioning)
+ dofn.bq_wrapper.get_table.assert_called_once()
Review Comment:
To make the test more robust, consider asserting the `get_table()` call
arguments (e.g., `assert_called_once_with(project_id=..., dataset_id=...,
table_id=...)`) rather than only the call count. This helps catch regressions
where the lookup happens against the wrong table reference.
##########
sdks/python/apache_beam/io/gcp/bigquery_file_loads.py:
##########
@@ -82,6 +82,32 @@
_SLEEP_DURATION_BETWEEN_POLLS = 10
+def _has_partitioning_load_parameters(additional_parameters):
+ return (
+ 'timePartitioning' in additional_parameters or
+ 'rangePartitioning' in additional_parameters)
+
+
+def _add_destination_partitioning_load_parameters(
+ additional_parameters, destination_table):
+ if not isinstance(destination_table, bigquery_tools.bigquery.Table):
+ return additional_parameters
+
+ additional_parameters = dict(additional_parameters)
+
+ if ('timePartitioning' not in additional_parameters and
+ getattr(destination_table, 'timePartitioning', None) is not None):
+ additional_parameters['timePartitioning'] = (
+ destination_table.timePartitioning)
+
+ if ('rangePartitioning' not in additional_parameters and
+ getattr(destination_table, 'rangePartitioning', None) is not None):
+ additional_parameters['rangePartitioning'] = (
+ destination_table.rangePartitioning)
+
+ return additional_parameters
Review Comment:
The strict `isinstance(destination_table, bigquery_tools.bigquery.Table)`
guard can prevent partitioning propagation if `get_table()` returns a
compatible table object from a different BigQuery client/module. The attribute
reads that follow already use `getattr(..., None)`, so they are duck-typed and
the `isinstance` check is the only strict part. Consider removing the
`isinstance` check (duck-typing on the needed attributes), or broadening it to
accept the actual table type(s) returned by `BigQueryWrapper.get_table()`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]