This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b4be68e2404 [Python BQ] Substitute final destination schema when no 
input schema is specified (#30015)
b4be68e2404 is described below

commit b4be68e2404e8f87c545f81800a7e83cd9c77df7
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Fri Jan 19 16:25:27 2024 -0500

    [Python BQ] Substitute final destination schema when no input schema is 
specified (#30015)
    
    * substitute final destination's schema; tests
    
    * use cache and only fetch when necessary
---
 .../apache_beam/io/gcp/bigquery_file_loads.py      | 24 +++++++++
 .../apache_beam/io/gcp/bigquery_file_loads_test.py | 62 ++++++++++++++++++++++
 2 files changed, 86 insertions(+)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py 
b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 453cd27dfda..48f2ab4b36b 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -656,6 +656,7 @@ class TriggerLoadJobs(beam.DoFn):
     if not self.bq_io_metadata:
       self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)
     self.pending_jobs = []
+    self.schema_cache = {}
 
   def process(
       self,
@@ -703,6 +704,29 @@ class TriggerLoadJobs(beam.DoFn):
 
     create_disposition = self.create_disposition
     if self.temporary_tables:
+      # we need to create temp tables, so we need a schema.
+      # if there is no input schema, fetch the destination table's schema
+      if schema is None:
+        hashed_dest = bigquery_tools.get_hashable_destination(table_reference)
+        if hashed_dest in self.schema_cache:
+          schema = self.schema_cache[hashed_dest]
+        else:
+          try:
+            schema = bigquery_tools.table_schema_to_dict(
+                bigquery_tools.BigQueryWrapper().get_table(
+                    project_id=table_reference.projectId,
+                    dataset_id=table_reference.datasetId,
+                    table_id=table_reference.tableId).schema)
+            self.schema_cache[hashed_dest] = schema
+          except Exception as e:
+            _LOGGER.warning(
+                "Input schema is absent and could not fetch the final "
+                "destination table's schema [%s]. Creating temp table [%s] "
+                "will likely fail: %s",
+                hashed_dest,
+                job_name,
+                e)
+
       # If we are using temporary tables, then we must always create the
       # temporary tables, so we replace the create_disposition.
       create_disposition = 'CREATE_IF_NEEDED'
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py 
b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
index edd92f21e73..345c8e70500 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -903,6 +903,68 @@ class BigQueryFileLoadsIT(unittest.TestCase):
     _LOGGER.info(
         "Created dataset %s in project %s", self.dataset_id, self.project)
 
+  @pytest.mark.it_postcommit
+  def test_batch_copy_jobs_with_no_input_schema(self):
+    schema_1 = "col_1:INTEGER"
+    schema_2 = "col_2:INTEGER"
+
+    # create two tables with different schemas
+    # test to make sure this works with dynamic destinations too
+    self.bigquery_client.get_or_create_table(
+        project_id=self.project,
+        dataset_id=self.dataset_id,
+        table_id="output_table_1",
+        schema=bigquery_tools.get_table_schema_from_string(schema_1),
+        create_disposition='CREATE_IF_NEEDED',
+        write_disposition='WRITE_APPEND')
+    self.bigquery_client.get_or_create_table(
+        project_id=self.project,
+        dataset_id=self.dataset_id,
+        table_id="output_table_2",
+        schema=bigquery_tools.get_table_schema_from_string(schema_2),
+        create_disposition='CREATE_IF_NEEDED',
+        write_disposition='WRITE_APPEND')
+
+    # reduce load job size to induce copy jobs
+    bqfl._DEFAULT_MAX_FILE_SIZE = 10
+    bqfl._MAXIMUM_LOAD_SIZE = 20
+    verifiers = [
+        BigqueryFullResultMatcher(
+            project=self.project,
+            query="SELECT * FROM %s" % (self.output_table + "_1"),
+            data=[(i, ) for i in range(5)]),
+        BigqueryFullResultMatcher(
+            project=self.project,
+            query="SELECT * FROM %s" % (self.output_table + "_2"),
+            data=[(i, ) for i in range(5, 10)])
+    ]
+
+    output = self.output_table
+
+    def callable_table(el: dict):
+      dest = output
+      if "col_1" in el:
+        dest += "_1"
+      elif "col_2" in el:
+        dest += "_2"
+      return dest
+
+    args = self.test_pipeline.get_full_options_as_args(
+        on_success_matcher=all_of(verifiers))
+
+    with beam.Pipeline(argv=args) as p:
+      # 0...4 going to table 1
+      # 5...9 going to table 2
+      items = [{"col_1": i} for i in range(5)]
+      items.extend([{"col_2": i} for i in range(5, 10)])
+      _ = (
+          p | beam.Create(items) | bigquery.WriteToBigQuery(
+              table=callable_table,
+              create_disposition="CREATE_NEVER",
+              write_disposition="WRITE_APPEND"))
+
+    hamcrest_assert(p, all_of(*verifiers))
+
   @pytest.mark.it_postcommit
   def test_multiple_destinations_transform(self):
     output_table_1 = '%s%s' % (self.output_table, 1)

Reply via email to