gemini-code-assist[bot] commented on code in PR #38983:
URL: https://github.com/apache/beam/pull/38983#discussion_r3420927319
##########
sdks/python/apache_beam/io/gcp/bigquery_file_loads.py:
##########
@@ -528,96 +529,97 @@ def process(
self, element_list, job_name_prefix=None, unused_schema_mod_jobs=None):
if isinstance(element_list, tuple):
# Allow this for streaming update compatibility while fixing BEAM-24535.
- self.process_one(element_list, job_name_prefix)
- else:
- for element in element_list:
- self.process_one(element, job_name_prefix)
+ element_list = [element_list]
- def process_one(self, element, job_name_prefix):
- destination, job_reference = element
+ if not element_list:
+ return
Review Comment:

The initialization of `self.bq_io_metadata` has been removed from the
processing path. If `bq_io_metadata` is not explicitly passed to
`TriggerCopyJobs` during initialization (which is common in many pipelines and
tests), it will remain `None`. This leads to a loss of lineage tracking (the new
`Lineage.sinks().add(...)` call is guarded by `if self.bq_io_metadata:` and is
therefore silently skipped) and of the BigQuery job labels derived from this
metadata (such as the step name, which is crucial for monitoring and billing).
Please restore the dynamic initialization of `self.bq_io_metadata` when it is
not already set.
```suggestion
    if not element_list:
      return
    if not self.bq_io_metadata:
      self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)
```
##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -506,16 +506,22 @@ def _insert_copy_job(
reference = bigquery.JobReference()
reference.jobId = job_id
reference.projectId = project_id
+
+ copy_config = bigquery.JobConfigurationTableCopy(
+ destinationTable=to_table_reference,
+ createDisposition=create_disposition,
+ writeDisposition=write_disposition,
+ )
+ if isinstance(from_table_reference, list):
+ copy_config.sourceTables = from_table_reference
+ else:
+ copy_config.sourceTable = from_table_reference
Review Comment:

With the introduction of multi-source copy jobs, which populate `sourceTables`
(plural) instead of `sourceTable` (singular), the `TriggerDeleteTempTables`
DoFn in `bigquery_file_loads.py` must also be updated.
Currently, `TriggerDeleteTempTables` only looks at
`copy_job.configuration.copy.sourceTable` to find and delete temporary tables.
For a multi-source copy job, `sourceTable` will be `None` (only `sourceTables`
is populated), so none of that job's temporary source tables are deleted and
they are leaked in BigQuery.
Please update `TriggerDeleteTempTables` to handle both `sourceTable` and
`sourceTables` to prevent these resource leaks.
##########
sdks/python/apache_beam/io/gcp/bigquery_file_loads.py:
##########
@@ -528,96 +529,97 @@ def process(
self, element_list, job_name_prefix=None, unused_schema_mod_jobs=None):
if isinstance(element_list, tuple):
# Allow this for streaming update compatibility while fixing BEAM-24535.
- self.process_one(element_list, job_name_prefix)
- else:
- for element in element_list:
- self.process_one(element, job_name_prefix)
+ element_list = [element_list]
- def process_one(self, element, job_name_prefix):
- destination, job_reference = element
+ if not element_list:
+ return
- copy_to_reference = bigquery_tools.parse_table_reference(destination)
+ first_destination = element_list[0][0]
+ copy_to_reference = bigquery_tools.parse_table_reference(first_destination)
if copy_to_reference.projectId is None:
copy_to_reference.projectId = vp.RuntimeValueProvider.get_value(
'project', str, '') or self.project
- copy_from_reference = bigquery_tools.parse_table_reference(destination)
- copy_from_reference.tableId = job_reference.jobId
- if copy_from_reference.projectId is None:
- copy_from_reference.projectId = vp.RuntimeValueProvider.get_value(
- 'project', str, '') or self.project
-
- _LOGGER.info(
- "Triggering copy job from %s to %s",
- copy_from_reference,
- copy_to_reference)
+ copy_from_references = []
+ for destination, job_reference in element_list:
+ copy_from_reference = bigquery_tools.parse_table_reference(destination)
+ copy_from_reference.tableId = job_reference.jobId
+ if copy_from_reference.projectId is None:
+ copy_from_reference.projectId = vp.RuntimeValueProvider.get_value(
+ 'project', str, '') or self.project
+ copy_from_references.append(copy_from_reference)
- wait_for_job, write_disposition = (
- self._determine_write_disposition(copy_to_reference))
-
- if not self.bq_io_metadata:
- self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)
+ full_table_ref = '%s:%s.%s' % (
+ copy_to_reference.projectId,
+ copy_to_reference.datasetId,
+ copy_to_reference.tableId)
- project_id = (
- copy_to_reference.projectId
- if self.load_job_project_id is None else self.load_job_project_id)
- copy_job_name = '%s_%s' % (
+ is_first_time = full_table_ref not in self._observed_tables
+ if is_first_time:
+ self._observed_tables.add(full_table_ref)
+ if self.bq_io_metadata:
+ Lineage.sinks().add(
+ 'bigquery',
+ copy_to_reference.projectId,
+ copy_to_reference.datasetId,
+ copy_to_reference.tableId)
+
+ # Split into chunks of MAX_SOURCES_PER_COPY_JOB
+ chunks = [
+ copy_from_references[i:i + self.MAX_SOURCES_PER_COPY_JOB]
+ for i in range(
+ 0, len(copy_from_references), self.MAX_SOURCES_PER_COPY_JOB)
+ ]
+
+ copy_job_name_base = '%s_%s' % (
job_name_prefix,
_bq_uuid(
'%s:%s.%s' % (
- copy_from_reference.projectId,
- copy_from_reference.datasetId,
- copy_from_reference.tableId)))
- job_reference = self.bq_wrapper._insert_copy_job(
- project_id,
- copy_job_name,
- copy_from_reference,
- copy_to_reference,
- create_disposition=self.create_disposition,
- write_disposition=write_disposition,
- job_labels=self.bq_io_metadata.add_additional_bq_job_labels())
-
- if wait_for_job:
- self.bq_wrapper.wait_for_bq_job(job_reference, sleep_duration_sec=10)
- self.pending_jobs.append(
- GlobalWindows.windowed_value((destination, job_reference)))
+ copy_to_reference.projectId,
+ copy_to_reference.datasetId,
+ copy_to_reference.tableId)))
- def _determine_write_disposition(self, copy_to_reference) -> tuple[bool,
str]:
- """
- Determines the write disposition for a BigQuery copy job,
- based on destination.
-
- When the write_disposition for a job is WRITE_TRUNCATE, multiple copy jobs
- to the same destination can interfere with each other, truncate data, and
- write to the BigQuery table repeatedly. To prevent this, the first copy job
- runs with the user's specified write_disposition, but subsequent jobs must
- always use WRITE_APPEND. This ensures that subsequent copy jobs do not
- clear out data appended by previous jobs.
-
- Args:
- copy_to_reference: The reference to the destination table.
+ project_id = (
+ copy_to_reference.projectId
+ if self.load_job_project_id is None else self.load_job_project_id)
- Returns:
- A tuple containing a boolean indicating whether to wait for the job to
- complete and the write disposition to use for the job.
- """
- full_table_ref = '%s:%s.%s' % (
- copy_to_reference.projectId,
- copy_to_reference.datasetId,
- copy_to_reference.tableId)
- if full_table_ref not in self._observed_tables:
- write_disposition = self.write_disposition
- wait_for_job = True
- self._observed_tables.add(full_table_ref)
- Lineage.sinks().add(
- 'bigquery',
- copy_to_reference.projectId,
- copy_to_reference.datasetId,
- copy_to_reference.tableId)
- else:
- wait_for_job = False
- write_disposition = 'WRITE_APPEND'
- return wait_for_job, write_disposition
+ for i, chunk in enumerate(chunks):
+ if i == 0 and is_first_time:
+ write_disposition = self.write_disposition
+ # Wait inline only if we have multiple chunks and write disposition is WRITE_TRUNCATE or WRITE_EMPTY.
+ # This ensures the first chunk initializes the table, and subsequent chunks (WRITE_APPEND) append to it.
+ wait_for_job = (
+ self.write_disposition in ('WRITE_TRUNCATE', 'WRITE_EMPTY') and
+ len(chunks) > 1)
Review Comment:

The current logic waits inline for the first copy job only when there are
multiple chunks (`len(chunks) > 1`). When `len(chunks) == 1` and the write
disposition is `WRITE_TRUNCATE` or `WRITE_EMPTY`, the copy job is triggered
asynchronously, without waiting inline. This is a regression: the previous
`_determine_write_disposition` always set `wait_for_job = True` for the first
copy job to a given destination.
If a subsequent bundle or another element in the pipeline then triggers a
`WRITE_APPEND` copy job to the same destination, that job can run concurrently
with, or even finish before, the first job. With `WRITE_TRUNCATE`, this is a
critical race condition: the truncate job can wipe out the appended data,
leading to silent data loss. With `WRITE_EMPTY`, the first job can instead fail
because the table is no longer empty.
To ensure correctness across bundles and chunks, we must always wait inline
for the first copy job to complete if the write disposition is `WRITE_TRUNCATE`
or `WRITE_EMPTY`, regardless of the number of chunks.
```python
if i == 0 and is_first_time:
  write_disposition = self.write_disposition
  # Wait inline if write disposition is WRITE_TRUNCATE or WRITE_EMPTY.
  # This ensures the first chunk initializes the table before
  # subsequent chunks or bundles append to it.
  wait_for_job = self.write_disposition in ('WRITE_TRUNCATE', 'WRITE_EMPTY')
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]