ahmedabu98 commented on code in PR #38983:
URL: https://github.com/apache/beam/pull/38983#discussion_r3423211685


##########
sdks/python/apache_beam/io/gcp/bigquery_file_loads.py:
##########
@@ -491,6 +491,7 @@ class TriggerCopyJobs(beam.DoFn):
   """
 
   TRIGGER_DELETE_TEMP_TABLES = 'TriggerDeleteTempTables'
+  MAX_SOURCES_PER_COPY_JOB = 1200

Review Comment:
   Could you add a comment referring to the quota doc?
https://docs.cloud.google.com/bigquery/quotas#copy_jobs



##########
sdks/python/apache_beam/io/gcp/bigquery_file_loads.py:
##########
@@ -528,96 +529,97 @@ def process(
       self, element_list, job_name_prefix=None, unused_schema_mod_jobs=None):
     if isinstance(element_list, tuple):
       # Allow this for streaming update compatibility while fixing BEAM-24535.
-      self.process_one(element_list, job_name_prefix)
-    else:
-      for element in element_list:
-        self.process_one(element, job_name_prefix)
+      element_list = [element_list]
 
-  def process_one(self, element, job_name_prefix):
-    destination, job_reference = element
+    if not element_list:
+      return
 
-    copy_to_reference = bigquery_tools.parse_table_reference(destination)
+    first_destination = element_list[0][0]
+    copy_to_reference = bigquery_tools.parse_table_reference(first_destination)
     if copy_to_reference.projectId is None:
       copy_to_reference.projectId = vp.RuntimeValueProvider.get_value(
           'project', str, '') or self.project
 
-    copy_from_reference = bigquery_tools.parse_table_reference(destination)
-    copy_from_reference.tableId = job_reference.jobId
-    if copy_from_reference.projectId is None:
-      copy_from_reference.projectId = vp.RuntimeValueProvider.get_value(
-          'project', str, '') or self.project
-
-    _LOGGER.info(
-        "Triggering copy job from %s to %s",
-        copy_from_reference,
-        copy_to_reference)
+    copy_from_references = []
+    for destination, job_reference in element_list:
+      copy_from_reference = bigquery_tools.parse_table_reference(destination)
+      copy_from_reference.tableId = job_reference.jobId
+      if copy_from_reference.projectId is None:
+        copy_from_reference.projectId = vp.RuntimeValueProvider.get_value(
+            'project', str, '') or self.project
+      copy_from_references.append(copy_from_reference)
 
-    wait_for_job, write_disposition = (
-      self._determine_write_disposition(copy_to_reference))
-
-    if not self.bq_io_metadata:
-      self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)
+    full_table_ref = '%s:%s.%s' % (
+        copy_to_reference.projectId,
+        copy_to_reference.datasetId,
+        copy_to_reference.tableId)
 
-    project_id = (
-        copy_to_reference.projectId
-        if self.load_job_project_id is None else self.load_job_project_id)
-    copy_job_name = '%s_%s' % (
+    is_first_time = full_table_ref not in self._observed_tables
+    if is_first_time:
+      self._observed_tables.add(full_table_ref)
+      if self.bq_io_metadata:
+        Lineage.sinks().add(
+            'bigquery',
+            copy_to_reference.projectId,
+            copy_to_reference.datasetId,
+            copy_to_reference.tableId)

Review Comment:
   Should we only track it in `_observed_tables` at the end of the `process` 
call, after the copy job completes successfully?



##########
sdks/python/apache_beam/io/gcp/bigquery_file_loads.py:
##########
@@ -528,96 +529,97 @@ def process(
       self, element_list, job_name_prefix=None, unused_schema_mod_jobs=None):
     if isinstance(element_list, tuple):
       # Allow this for streaming update compatibility while fixing BEAM-24535.
-      self.process_one(element_list, job_name_prefix)
-    else:
-      for element in element_list:
-        self.process_one(element, job_name_prefix)
+      element_list = [element_list]
 
-  def process_one(self, element, job_name_prefix):
-    destination, job_reference = element
+    if not element_list:
+      return
 
-    copy_to_reference = bigquery_tools.parse_table_reference(destination)
+    first_destination = element_list[0][0]
+    copy_to_reference = bigquery_tools.parse_table_reference(first_destination)
     if copy_to_reference.projectId is None:
       copy_to_reference.projectId = vp.RuntimeValueProvider.get_value(
           'project', str, '') or self.project
 
-    copy_from_reference = bigquery_tools.parse_table_reference(destination)
-    copy_from_reference.tableId = job_reference.jobId
-    if copy_from_reference.projectId is None:
-      copy_from_reference.projectId = vp.RuntimeValueProvider.get_value(
-          'project', str, '') or self.project
-
-    _LOGGER.info(
-        "Triggering copy job from %s to %s",
-        copy_from_reference,
-        copy_to_reference)
+    copy_from_references = []
+    for destination, job_reference in element_list:
+      copy_from_reference = bigquery_tools.parse_table_reference(destination)
+      copy_from_reference.tableId = job_reference.jobId
+      if copy_from_reference.projectId is None:
+        copy_from_reference.projectId = vp.RuntimeValueProvider.get_value(
+            'project', str, '') or self.project
+      copy_from_references.append(copy_from_reference)
 
-    wait_for_job, write_disposition = (
-      self._determine_write_disposition(copy_to_reference))
-
-    if not self.bq_io_metadata:
-      self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)
+    full_table_ref = '%s:%s.%s' % (
+        copy_to_reference.projectId,
+        copy_to_reference.datasetId,
+        copy_to_reference.tableId)

Review Comment:
   nit: maybe use `bigquery_tools.get_hashable_destination(copy_to_reference)`



##########
sdks/python/apache_beam/io/gcp/bigquery_file_loads.py:
##########
@@ -528,96 +529,97 @@ def process(
       self, element_list, job_name_prefix=None, unused_schema_mod_jobs=None):
     if isinstance(element_list, tuple):
       # Allow this for streaming update compatibility while fixing BEAM-24535.
-      self.process_one(element_list, job_name_prefix)
-    else:
-      for element in element_list:
-        self.process_one(element, job_name_prefix)
+      element_list = [element_list]
 
-  def process_one(self, element, job_name_prefix):
-    destination, job_reference = element
+    if not element_list:
+      return
 
-    copy_to_reference = bigquery_tools.parse_table_reference(destination)
+    first_destination = element_list[0][0]
+    copy_to_reference = bigquery_tools.parse_table_reference(first_destination)
     if copy_to_reference.projectId is None:
       copy_to_reference.projectId = vp.RuntimeValueProvider.get_value(
           'project', str, '') or self.project
 
-    copy_from_reference = bigquery_tools.parse_table_reference(destination)
-    copy_from_reference.tableId = job_reference.jobId
-    if copy_from_reference.projectId is None:
-      copy_from_reference.projectId = vp.RuntimeValueProvider.get_value(
-          'project', str, '') or self.project
-
-    _LOGGER.info(
-        "Triggering copy job from %s to %s",
-        copy_from_reference,
-        copy_to_reference)
+    copy_from_references = []
+    for destination, job_reference in element_list:
+      copy_from_reference = bigquery_tools.parse_table_reference(destination)
+      copy_from_reference.tableId = job_reference.jobId
+      if copy_from_reference.projectId is None:
+        copy_from_reference.projectId = vp.RuntimeValueProvider.get_value(
+            'project', str, '') or self.project
+      copy_from_references.append(copy_from_reference)
 
-    wait_for_job, write_disposition = (
-      self._determine_write_disposition(copy_to_reference))
-
-    if not self.bq_io_metadata:
-      self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)
+    full_table_ref = '%s:%s.%s' % (
+        copy_to_reference.projectId,
+        copy_to_reference.datasetId,
+        copy_to_reference.tableId)
 
-    project_id = (
-        copy_to_reference.projectId
-        if self.load_job_project_id is None else self.load_job_project_id)
-    copy_job_name = '%s_%s' % (
+    is_first_time = full_table_ref not in self._observed_tables
+    if is_first_time:
+      self._observed_tables.add(full_table_ref)
+      if self.bq_io_metadata:
+        Lineage.sinks().add(
+            'bigquery',
+            copy_to_reference.projectId,
+            copy_to_reference.datasetId,
+            copy_to_reference.tableId)
+
+    # Split into chunks of MAX_SOURCES_PER_COPY_JOB
+    chunks = [
+        copy_from_references[i:i + self.MAX_SOURCES_PER_COPY_JOB]
+        for i in range(
+            0, len(copy_from_references), self.MAX_SOURCES_PER_COPY_JOB)
+    ]
+
+    copy_job_name_base = '%s_%s' % (
         job_name_prefix,
         _bq_uuid(
             '%s:%s.%s' % (
-                copy_from_reference.projectId,
-                copy_from_reference.datasetId,
-                copy_from_reference.tableId)))
-    job_reference = self.bq_wrapper._insert_copy_job(
-        project_id,
-        copy_job_name,
-        copy_from_reference,
-        copy_to_reference,
-        create_disposition=self.create_disposition,
-        write_disposition=write_disposition,
-        job_labels=self.bq_io_metadata.add_additional_bq_job_labels())
-
-    if wait_for_job:
-      self.bq_wrapper.wait_for_bq_job(job_reference, sleep_duration_sec=10)
-    self.pending_jobs.append(
-        GlobalWindows.windowed_value((destination, job_reference)))
+                copy_to_reference.projectId,
+                copy_to_reference.datasetId,
+                copy_to_reference.tableId)))

Review Comment:
   nit: use `full_table_ref` here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to