This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b4be68e2404 [Python BQ] Substitute final destination schema when no
input schema is specified (#30015)
b4be68e2404 is described below
commit b4be68e2404e8f87c545f81800a7e83cd9c77df7
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Fri Jan 19 16:25:27 2024 -0500
[Python BQ] Substitute final destination schema when no input schema is
specified (#30015)
* Substitute the final destination's schema when no input schema is given; add tests
* Cache fetched schemas and only fetch when necessary
---
.../apache_beam/io/gcp/bigquery_file_loads.py | 24 +++++++++
.../apache_beam/io/gcp/bigquery_file_loads_test.py | 62 ++++++++++++++++++++++
2 files changed, 86 insertions(+)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 453cd27dfda..48f2ab4b36b 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -656,6 +656,7 @@ class TriggerLoadJobs(beam.DoFn):
if not self.bq_io_metadata:
self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)
self.pending_jobs = []
+ self.schema_cache = {}
def process(
self,
@@ -703,6 +704,29 @@ class TriggerLoadJobs(beam.DoFn):
create_disposition = self.create_disposition
if self.temporary_tables:
+ # we need to create temp tables, so we need a schema.
+ # if there is no input schema, fetch the destination table's schema
+ if schema is None:
+ hashed_dest = bigquery_tools.get_hashable_destination(table_reference)
+ if hashed_dest in self.schema_cache:
+ schema = self.schema_cache[hashed_dest]
+ else:
+ try:
+ schema = bigquery_tools.table_schema_to_dict(
+ bigquery_tools.BigQueryWrapper().get_table(
+ project_id=table_reference.projectId,
+ dataset_id=table_reference.datasetId,
+ table_id=table_reference.tableId).schema)
+ self.schema_cache[hashed_dest] = schema
+ except Exception as e:
+ _LOGGER.warning(
+ "Input schema is absent and could not fetch the final "
+ "destination table's schema [%s]. Creating temp table [%s] "
+ "will likely fail: %s",
+ hashed_dest,
+ job_name,
+ e)
+
# If we are using temporary tables, then we must always create the
# temporary tables, so we replace the create_disposition.
create_disposition = 'CREATE_IF_NEEDED'
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
index edd92f21e73..345c8e70500 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -903,6 +903,68 @@ class BigQueryFileLoadsIT(unittest.TestCase):
_LOGGER.info(
"Created dataset %s in project %s", self.dataset_id, self.project)
+ @pytest.mark.it_postcommit
+ def test_batch_copy_jobs_with_no_input_schema(self):
+ schema_1 = "col_1:INTEGER"
+ schema_2 = "col_2:INTEGER"
+
+ # create two tables with different schemas
+ # test to make sure this works with dynamic destinations too
+ self.bigquery_client.get_or_create_table(
+ project_id=self.project,
+ dataset_id=self.dataset_id,
+ table_id="output_table_1",
+ schema=bigquery_tools.get_table_schema_from_string(schema_1),
+ create_disposition='CREATE_IF_NEEDED',
+ write_disposition='WRITE_APPEND')
+ self.bigquery_client.get_or_create_table(
+ project_id=self.project,
+ dataset_id=self.dataset_id,
+ table_id="output_table_2",
+ schema=bigquery_tools.get_table_schema_from_string(schema_2),
+ create_disposition='CREATE_IF_NEEDED',
+ write_disposition='WRITE_APPEND')
+
+ # reduce load job size to induce copy jobs; module globals are not reset
+ bqfl._DEFAULT_MAX_FILE_SIZE = 10
+ bqfl._MAXIMUM_LOAD_SIZE = 20
+ verifiers = [
+ BigqueryFullResultMatcher(
+ project=self.project,
+ query="SELECT * FROM %s" % (self.output_table + "_1"),
+ data=[(i, ) for i in range(5)]),
+ BigqueryFullResultMatcher(
+ project=self.project,
+ query="SELECT * FROM %s" % (self.output_table + "_2"),
+ data=[(i, ) for i in range(5, 10)])
+ ]
+
+ output = self.output_table
+
+ def callable_table(el: dict):
+ dest = output
+ if "col_1" in el:
+ dest += "_1"
+ elif "col_2" in el:
+ dest += "_2"
+ return dest
+
+ args = self.test_pipeline.get_full_options_as_args(
+ on_success_matcher=all_of(*verifiers))
+
+ with beam.Pipeline(argv=args) as p:
+ # 0...4 going to table 1
+ # 5...9 going to table 2
+ items = [{"col_1": i} for i in range(5)]
+ items.extend([{"col_2": i} for i in range(5, 10)])
+ _ = (
+ p | beam.Create(items) | bigquery.WriteToBigQuery(
+ table=callable_table,
+ create_disposition="CREATE_NEVER",
+ write_disposition="WRITE_APPEND"))
+
+ hamcrest_assert(p, all_of(*verifiers))
+
@pytest.mark.it_postcommit
def test_multiple_destinations_transform(self):
output_table_1 = '%s%s' % (self.output_table, 1)