stankiewicz commented on code in PR #38983:
URL: https://github.com/apache/beam/pull/38983#discussion_r3421928736


##########
sdks/python/apache_beam/io/gcp/bigquery_file_loads.py:
##########
@@ -528,96 +529,97 @@ def process(
       self, element_list, job_name_prefix=None, unused_schema_mod_jobs=None):
     if isinstance(element_list, tuple):
       # Allow this for streaming update compatibility while fixing BEAM-24535.
-      self.process_one(element_list, job_name_prefix)
-    else:
-      for element in element_list:
-        self.process_one(element, job_name_prefix)
+      element_list = [element_list]
 
-  def process_one(self, element, job_name_prefix):
-    destination, job_reference = element
+    if not element_list:
+      return
 
-    copy_to_reference = bigquery_tools.parse_table_reference(destination)
+    first_destination = element_list[0][0]
+    copy_to_reference = bigquery_tools.parse_table_reference(first_destination)
     if copy_to_reference.projectId is None:
       copy_to_reference.projectId = vp.RuntimeValueProvider.get_value(
           'project', str, '') or self.project
 
-    copy_from_reference = bigquery_tools.parse_table_reference(destination)
-    copy_from_reference.tableId = job_reference.jobId
-    if copy_from_reference.projectId is None:
-      copy_from_reference.projectId = vp.RuntimeValueProvider.get_value(
-          'project', str, '') or self.project
-
-    _LOGGER.info(
-        "Triggering copy job from %s to %s",
-        copy_from_reference,
-        copy_to_reference)
+    copy_from_references = []
+    for destination, job_reference in element_list:
+      copy_from_reference = bigquery_tools.parse_table_reference(destination)
+      copy_from_reference.tableId = job_reference.jobId
+      if copy_from_reference.projectId is None:
+        copy_from_reference.projectId = vp.RuntimeValueProvider.get_value(
+            'project', str, '') or self.project
+      copy_from_references.append(copy_from_reference)
 
-    wait_for_job, write_disposition = (
-      self._determine_write_disposition(copy_to_reference))
-
-    if not self.bq_io_metadata:
-      self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)
+    full_table_ref = '%s:%s.%s' % (
+        copy_to_reference.projectId,
+        copy_to_reference.datasetId,
+        copy_to_reference.tableId)
 
-    project_id = (
-        copy_to_reference.projectId
-        if self.load_job_project_id is None else self.load_job_project_id)
-    copy_job_name = '%s_%s' % (
+    is_first_time = full_table_ref not in self._observed_tables
+    if is_first_time:
+      self._observed_tables.add(full_table_ref)
+      if self.bq_io_metadata:
+        Lineage.sinks().add(
+            'bigquery',
+            copy_to_reference.projectId,
+            copy_to_reference.datasetId,
+            copy_to_reference.tableId)
+
+    # Split into chunks of MAX_SOURCES_PER_COPY_JOB
+    chunks = [
+        copy_from_references[i:i + self.MAX_SOURCES_PER_COPY_JOB]
+        for i in range(
+            0, len(copy_from_references), self.MAX_SOURCES_PER_COPY_JOB)
+    ]
+
+    copy_job_name_base = '%s_%s' % (
         job_name_prefix,
         _bq_uuid(
             '%s:%s.%s' % (
-                copy_from_reference.projectId,
-                copy_from_reference.datasetId,
-                copy_from_reference.tableId)))
-    job_reference = self.bq_wrapper._insert_copy_job(
-        project_id,
-        copy_job_name,
-        copy_from_reference,
-        copy_to_reference,
-        create_disposition=self.create_disposition,
-        write_disposition=write_disposition,
-        job_labels=self.bq_io_metadata.add_additional_bq_job_labels())
-
-    if wait_for_job:
-      self.bq_wrapper.wait_for_bq_job(job_reference, sleep_duration_sec=10)
-    self.pending_jobs.append(
-        GlobalWindows.windowed_value((destination, job_reference)))
+                copy_to_reference.projectId,
+                copy_to_reference.datasetId,
+                copy_to_reference.tableId)))
 
-  def _determine_write_disposition(self, copy_to_reference) -> tuple[bool, 
str]:
-    """
-    Determines the write disposition for a BigQuery copy job,
-     based on destination.
-
-    When the write_disposition for a job is WRITE_TRUNCATE, multiple copy jobs
-    to the same destination can interfere with each other, truncate data, and
-    write to the BigQuery table repeatedly. To prevent this, the first copy job
-    runs with the user's specified write_disposition, but subsequent jobs must
-    always use WRITE_APPEND. This ensures that subsequent copy jobs do not
-    clear out data appended by previous jobs.
-
-    Args:
-        copy_to_reference: The reference to the destination table.
+    project_id = (
+        copy_to_reference.projectId
+        if self.load_job_project_id is None else self.load_job_project_id)
 
-    Returns:
-        A tuple containing a boolean indicating whether to wait for the job to
-        complete and the write disposition to use for the job.
-    """
-    full_table_ref = '%s:%s.%s' % (
-        copy_to_reference.projectId,
-        copy_to_reference.datasetId,
-        copy_to_reference.tableId)
-    if full_table_ref not in self._observed_tables:
-      write_disposition = self.write_disposition
-      wait_for_job = True
-      self._observed_tables.add(full_table_ref)
-      Lineage.sinks().add(
-          'bigquery',
-          copy_to_reference.projectId,
-          copy_to_reference.datasetId,
-          copy_to_reference.tableId)
-    else:
-      wait_for_job = False
-      write_disposition = 'WRITE_APPEND'
-    return wait_for_job, write_disposition
+    for i, chunk in enumerate(chunks):
+      if i == 0 and is_first_time:
+        write_disposition = self.write_disposition
+        # Wait inline only if we have multiple chunks and write disposition is 
WRITE_TRUNCATE or WRITE_EMPTY.
+        # This ensures the first chunk initializes the table, and subsequent 
chunks (WRITE_APPEND) append to it.
+        wait_for_job = (
+            self.write_disposition in ('WRITE_TRUNCATE', 'WRITE_EMPTY') and
+            len(chunks) > 1)

Review Comment:
   there are guarantees upstream.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to