gemini-code-assist[bot] commented on code in PR #38983:
URL: https://github.com/apache/beam/pull/38983#discussion_r3420927319


##########
sdks/python/apache_beam/io/gcp/bigquery_file_loads.py:
##########
@@ -528,96 +529,97 @@ def process(
       self, element_list, job_name_prefix=None, unused_schema_mod_jobs=None):
     if isinstance(element_list, tuple):
       # Allow this for streaming update compatibility while fixing BEAM-24535.
-      self.process_one(element_list, job_name_prefix)
-    else:
-      for element in element_list:
-        self.process_one(element, job_name_prefix)
+      element_list = [element_list]
 
-  def process_one(self, element, job_name_prefix):
-    destination, job_reference = element
+    if not element_list:
+      return
 

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The lazy initialization of `self.bq_io_metadata` has been removed from
   `process`. If `bq_io_metadata` is not passed explicitly to `TriggerCopyJobs`
   at construction time (common in pipelines and tests), it stays `None`.
   Because the new code records lineage only when `self.bq_io_metadata` is
   set, this silently drops lineage tracking and the BigQuery job labels (such
   as the step name) that monitoring and billing rely on.
   
   Please restore the dynamic initialization of `self.bq_io_metadata` if it is 
not already set.
   
   ```suggestion
       if not element_list:
         return
   
       if not self.bq_io_metadata:
         self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)
   ```



##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -506,16 +506,22 @@ def _insert_copy_job(
     reference = bigquery.JobReference()
     reference.jobId = job_id
     reference.projectId = project_id
+
+    copy_config = bigquery.JobConfigurationTableCopy(
+        destinationTable=to_table_reference,
+        createDisposition=create_disposition,
+        writeDisposition=write_disposition,
+    )
+    if isinstance(from_table_reference, list):
+      copy_config.sourceTables = from_table_reference
+    else:
+      copy_config.sourceTable = from_table_reference

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   With the introduction of multi-source copy jobs using `sourceTables` 
(plural) instead of `sourceTable` (singular), the `TriggerDeleteTempTables` 
DoFn in `bigquery_file_loads.py` must also be updated.
   
   Currently, `TriggerDeleteTempTables` only looks at 
`copy_job.configuration.copy.sourceTable` to find and delete temporary tables. 
If a multi-source copy job is used, `sourceTable` will be `None` (as 
`sourceTables` is populated instead), causing all temporary tables to be leaked 
in BigQuery.
   
   Please update `TriggerDeleteTempTables` to handle both `sourceTable` and 
`sourceTables` to prevent resource leaks.
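
A minimal sketch of the lookup requested above, assuming the copy configuration exposes a singular `sourceTable` (or `None`) and a repeated `sourceTables`. The helper name `source_tables_of` and the `SimpleNamespace` stand-ins are hypothetical and are not part of Beam; the real `TriggerDeleteTempTables` would apply the same idea to `copy_job.configuration.copy`.

```python
from types import SimpleNamespace


def source_tables_of(copy_config):
  """Return every source table named by a copy configuration.

  Hypothetical helper: reads the plural `sourceTables` field first, then the
  singular `sourceTable` field, so neither kind of copy job is missed.
  """
  tables = list(getattr(copy_config, 'sourceTables', None) or [])
  single = getattr(copy_config, 'sourceTable', None)
  if single is not None:
    tables.append(single)
  return tables


# Stand-ins for the BigQuery message objects, for illustration only.
single_source = SimpleNamespace(sourceTable='temp_a', sourceTables=[])
multi_source = SimpleNamespace(
    sourceTable=None, sourceTables=['temp_a', 'temp_b'])

for config in (single_source, multi_source):
  for table in source_tables_of(config):
    # The real DoFn would delete each temporary table here.
    print('would delete', table)
```

Iterating over the combined list keeps the deletion path identical for single-source and multi-source copy jobs.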



##########
sdks/python/apache_beam/io/gcp/bigquery_file_loads.py:
##########
@@ -528,96 +529,97 @@ def process(
       self, element_list, job_name_prefix=None, unused_schema_mod_jobs=None):
     if isinstance(element_list, tuple):
       # Allow this for streaming update compatibility while fixing BEAM-24535.
-      self.process_one(element_list, job_name_prefix)
-    else:
-      for element in element_list:
-        self.process_one(element, job_name_prefix)
+      element_list = [element_list]
 
-  def process_one(self, element, job_name_prefix):
-    destination, job_reference = element
+    if not element_list:
+      return
 
-    copy_to_reference = bigquery_tools.parse_table_reference(destination)
+    first_destination = element_list[0][0]
+    copy_to_reference = bigquery_tools.parse_table_reference(first_destination)
     if copy_to_reference.projectId is None:
       copy_to_reference.projectId = vp.RuntimeValueProvider.get_value(
           'project', str, '') or self.project
 
-    copy_from_reference = bigquery_tools.parse_table_reference(destination)
-    copy_from_reference.tableId = job_reference.jobId
-    if copy_from_reference.projectId is None:
-      copy_from_reference.projectId = vp.RuntimeValueProvider.get_value(
-          'project', str, '') or self.project
-
-    _LOGGER.info(
-        "Triggering copy job from %s to %s",
-        copy_from_reference,
-        copy_to_reference)
+    copy_from_references = []
+    for destination, job_reference in element_list:
+      copy_from_reference = bigquery_tools.parse_table_reference(destination)
+      copy_from_reference.tableId = job_reference.jobId
+      if copy_from_reference.projectId is None:
+        copy_from_reference.projectId = vp.RuntimeValueProvider.get_value(
+            'project', str, '') or self.project
+      copy_from_references.append(copy_from_reference)
 
-    wait_for_job, write_disposition = (
-      self._determine_write_disposition(copy_to_reference))
-
-    if not self.bq_io_metadata:
-      self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)
+    full_table_ref = '%s:%s.%s' % (
+        copy_to_reference.projectId,
+        copy_to_reference.datasetId,
+        copy_to_reference.tableId)
 
-    project_id = (
-        copy_to_reference.projectId
-        if self.load_job_project_id is None else self.load_job_project_id)
-    copy_job_name = '%s_%s' % (
+    is_first_time = full_table_ref not in self._observed_tables
+    if is_first_time:
+      self._observed_tables.add(full_table_ref)
+      if self.bq_io_metadata:
+        Lineage.sinks().add(
+            'bigquery',
+            copy_to_reference.projectId,
+            copy_to_reference.datasetId,
+            copy_to_reference.tableId)
+
+    # Split into chunks of MAX_SOURCES_PER_COPY_JOB
+    chunks = [
+        copy_from_references[i:i + self.MAX_SOURCES_PER_COPY_JOB]
+        for i in range(
+            0, len(copy_from_references), self.MAX_SOURCES_PER_COPY_JOB)
+    ]
+
+    copy_job_name_base = '%s_%s' % (
         job_name_prefix,
         _bq_uuid(
             '%s:%s.%s' % (
-                copy_from_reference.projectId,
-                copy_from_reference.datasetId,
-                copy_from_reference.tableId)))
-    job_reference = self.bq_wrapper._insert_copy_job(
-        project_id,
-        copy_job_name,
-        copy_from_reference,
-        copy_to_reference,
-        create_disposition=self.create_disposition,
-        write_disposition=write_disposition,
-        job_labels=self.bq_io_metadata.add_additional_bq_job_labels())
-
-    if wait_for_job:
-      self.bq_wrapper.wait_for_bq_job(job_reference, sleep_duration_sec=10)
-    self.pending_jobs.append(
-        GlobalWindows.windowed_value((destination, job_reference)))
+                copy_to_reference.projectId,
+                copy_to_reference.datasetId,
+                copy_to_reference.tableId)))
 
-  def _determine_write_disposition(self, copy_to_reference) -> tuple[bool, str]:
-    """
-    Determines the write disposition for a BigQuery copy job,
-     based on destination.
-
-    When the write_disposition for a job is WRITE_TRUNCATE, multiple copy jobs
-    to the same destination can interfere with each other, truncate data, and
-    write to the BigQuery table repeatedly. To prevent this, the first copy job
-    runs with the user's specified write_disposition, but subsequent jobs must
-    always use WRITE_APPEND. This ensures that subsequent copy jobs do not
-    clear out data appended by previous jobs.
-
-    Args:
-        copy_to_reference: The reference to the destination table.
+    project_id = (
+        copy_to_reference.projectId
+        if self.load_job_project_id is None else self.load_job_project_id)
 
-    Returns:
-        A tuple containing a boolean indicating whether to wait for the job to
-        complete and the write disposition to use for the job.
-    """
-    full_table_ref = '%s:%s.%s' % (
-        copy_to_reference.projectId,
-        copy_to_reference.datasetId,
-        copy_to_reference.tableId)
-    if full_table_ref not in self._observed_tables:
-      write_disposition = self.write_disposition
-      wait_for_job = True
-      self._observed_tables.add(full_table_ref)
-      Lineage.sinks().add(
-          'bigquery',
-          copy_to_reference.projectId,
-          copy_to_reference.datasetId,
-          copy_to_reference.tableId)
-    else:
-      wait_for_job = False
-      write_disposition = 'WRITE_APPEND'
-    return wait_for_job, write_disposition
+    for i, chunk in enumerate(chunks):
+      if i == 0 and is_first_time:
+        write_disposition = self.write_disposition
+        # Wait inline only if we have multiple chunks and write disposition is WRITE_TRUNCATE or WRITE_EMPTY.
+        # This ensures the first chunk initializes the table, and subsequent chunks (WRITE_APPEND) append to it.
+        wait_for_job = (
+            self.write_disposition in ('WRITE_TRUNCATE', 'WRITE_EMPTY') and
+            len(chunks) > 1)

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The current logic only waits inline for the first copy job if there are 
multiple chunks (`len(chunks) > 1`). However, if `len(chunks) == 1` and the 
write disposition is `WRITE_TRUNCATE` or `WRITE_EMPTY`, the copy job is 
triggered asynchronously without waiting inline.
   
   If a subsequent bundle or another element in the pipeline triggers a 
`WRITE_APPEND` copy job to the same destination, it can run concurrently with 
or even finish before the `WRITE_TRUNCATE` job. This creates a critical race 
condition where the truncate job can wipe out the appended data, leading to 
silent data loss.
   
   To ensure correctness across bundles and chunks, we must always wait inline 
for the first copy job to complete if the write disposition is `WRITE_TRUNCATE` 
or `WRITE_EMPTY`, regardless of the number of chunks.
   
   ```python
         if i == 0 and is_first_time:
           write_disposition = self.write_disposition
           # Wait inline if write disposition is WRITE_TRUNCATE or WRITE_EMPTY.
           # This ensures the first chunk initializes the table before
           # subsequent chunks or bundles append to it.
           wait_for_job = self.write_disposition in (
               'WRITE_TRUNCATE', 'WRITE_EMPTY')
   ```
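
The rule proposed above can be sketched as a pure planning function, which makes the single-chunk case easy to check. This is an illustration only: `plan_copy_jobs` and its parameters are hypothetical names, and the `else` branch (`WRITE_APPEND`, no inline wait) is an assumption modeled on the removed `_determine_write_disposition`, since that part of the new code is not shown in the hunk.

```python
def plan_copy_jobs(
    sources, max_sources_per_job, user_disposition, is_first_time):
  """Return a list of (chunk, write_disposition, wait_for_job) tuples.

  Hypothetical helper: splits the sources into chunks and decides, per chunk,
  which write disposition to use and whether to wait inline for the job.
  """
  chunks = [
      sources[i:i + max_sources_per_job]
      for i in range(0, len(sources), max_sources_per_job)
  ]
  plan = []
  for i, chunk in enumerate(chunks):
    if i == 0 and is_first_time:
      disposition = user_disposition
      # Always wait for a truncating or empty-checking first job, even when
      # it is the only chunk, so later appends cannot race with it.
      wait = user_disposition in ('WRITE_TRUNCATE', 'WRITE_EMPTY')
    else:
      # Assumption: later jobs append and are not awaited inline.
      disposition = 'WRITE_APPEND'
      wait = False
    plan.append((chunk, disposition, wait))
  return plan


single_chunk_plan = plan_copy_jobs(['t1'], 2, 'WRITE_TRUNCATE', True)
multi_chunk_plan = plan_copy_jobs(['t1', 't2', 't3'], 2, 'WRITE_TRUNCATE', True)
```

With this rule the single-chunk plan still waits inline on its `WRITE_TRUNCATE` job, which is the case the `len(chunks) > 1` condition misses.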



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
