Copilot commented on code in PR #38833:
URL: https://github.com/apache/beam/pull/38833#discussion_r3369844097


##########
sdks/python/apache_beam/io/gcp/bigquery_file_loads.py:
##########
@@ -735,19 +762,37 @@ def process(
 
     create_disposition = self.create_disposition
     if self.temporary_tables:
+      destination_table = None
+      hashed_dest = bigquery_tools.get_hashable_destination(table_reference)
+      should_lookup_destination_table = (
+          schema is None or
+          not _has_partitioning_load_parameters(additional_parameters))
+      if should_lookup_destination_table:
+        try:
+          destination_table = self.bq_wrapper.get_table(
+              project_id=table_reference.projectId,
+              dataset_id=table_reference.datasetId,
+              table_id=table_reference.tableId)
+        except Exception as e:

Review Comment:
   When `temporary_tables=True` and a schema is already provided, this will 
still call `get_table()` for every element unless partitioning parameters are 
explicitly set. For pipelines producing many load jobs per destination, this 
adds repeated BigQuery API calls and can become a bottleneck. Consider caching 
destination metadata (e.g., a `destination_table_cache` keyed by `hashed_dest` 
and initialized in `start_bundle`, or storing partitioning info alongside 
`schema_cache`) so each destination is looked up at most once per bundle.



##########
sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py:
##########
@@ -703,6 +703,73 @@ def test_one_load_job_failed_after_waiting(self, sleep_mock):
 
     sleep_mock.assert_called_once()
 
+  def test_temporary_table_load_inherits_destination_time_partitioning(self):
+    destination = 'project1:dataset1.table1'
+    partition = (destination, (0, ['gs://bucket/file1']))
+    job_reference = bigquery_api.JobReference(
+        projectId='project1', jobId='job_name1')
+    destination_table = bigquery_api.Table(
+        timePartitioning=bigquery_api.TimePartitioning(type='DAY'))
+
+    dofn = bqfl.TriggerLoadJobs(
+        schema=_ELEMENTS_SCHEMA, test_client=mock.Mock(), 
temporary_tables=True)
+    dofn.start_bundle()
+    dofn.bq_wrapper.get_table = mock.Mock(return_value=destination_table)
+    dofn.bq_wrapper.perform_load_job = mock.Mock(return_value=job_reference)
+
+    list(dofn.process(partition, 'test_job', pane_info=mock.Mock(index=0)))
+
+    load_call = dofn.bq_wrapper.perform_load_job.call_args.kwargs
+    self.assertEqual(
+        load_call['additional_load_parameters']['timePartitioning'],
+        destination_table.timePartitioning)
+    dofn.bq_wrapper.get_table.assert_called_once()

Review Comment:
   To make the test more robust, consider asserting the `get_table()` call 
arguments (e.g., `assert_called_once_with(project_id=..., dataset_id=..., 
table_id=...)`) rather than only the call count. This helps catch regressions 
where the lookup happens against the wrong table reference.



##########
sdks/python/apache_beam/io/gcp/bigquery_file_loads.py:
##########
@@ -82,6 +82,32 @@
 _SLEEP_DURATION_BETWEEN_POLLS = 10
 
 
+def _has_partitioning_load_parameters(additional_parameters):
+  return (
+      'timePartitioning' in additional_parameters or
+      'rangePartitioning' in additional_parameters)
+
+
+def _add_destination_partitioning_load_parameters(
+    additional_parameters, destination_table):
+  if not isinstance(destination_table, bigquery_tools.bigquery.Table):
+    return additional_parameters
+
+  additional_parameters = dict(additional_parameters)
+
+  if ('timePartitioning' not in additional_parameters and
+      getattr(destination_table, 'timePartitioning', None) is not None):
+    additional_parameters['timePartitioning'] = (
+        destination_table.timePartitioning)
+
+  if ('rangePartitioning' not in additional_parameters and
+      getattr(destination_table, 'rangePartitioning', None) is not None):
+    additional_parameters['rangePartitioning'] = (
+        destination_table.rangePartitioning)
+
+  return additional_parameters

Review Comment:
   The strict `isinstance(destination_table, bigquery_tools.bigquery.Table)` 
guard silently skips partitioning propagation if `get_table()` returns a 
compatible table object from a different BigQuery client/module. The guard is 
also unnecessary: the attribute reads below already use `getattr(..., None)`, 
so they work on any object that exposes (or lacks) `timePartitioning` / 
`rangePartitioning`. Consider removing the `isinstance` check (duck-typing on 
the needed attributes), or broadening it to accept the actual table type(s) 
returned by `BigQueryWrapper.get_table()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to