This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3b994d6 Fix issue with update schema source format
new 9ce826e Merge pull request #15237 from [BEAM-12669] Fix issue with update schema source format
3b994d6 is described below
commit 3b994d63be7d7d408c6c9d782db88df119d5ae13
Author: Sayat Satybaldiyev <[email protected]>
AuthorDate: Tue Jul 27 18:08:22 2021 -0700
Fix issue with update schema source format
---
.../apache_beam/io/gcp/bigquery_file_loads.py | 32 ++++++++++++++++------
.../apache_beam/io/gcp/bigquery_write_it_test.py | 27 +++++++++++++++---
2 files changed, 46 insertions(+), 13 deletions(-)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 57204d4..e211ab4 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -344,16 +344,25 @@ class UpdateDestinationSchema(beam.DoFn):
write_disposition=None,
test_client=None,
additional_bq_parameters=None,
- step_name=None):
+ step_name=None,
+ source_format=None):
self._test_client = test_client
self._write_disposition = write_disposition
self._additional_bq_parameters = additional_bq_parameters or {}
self._step_name = step_name
+ self._source_format = source_format
def setup(self):
self._bq_wrapper = bigquery_tools.BigQueryWrapper(client=self._test_client)
self._bq_io_metadata = create_bigquery_io_metadata(self._step_name)
+ def display_data(self):
+ return {
+ 'write_disposition': str(self._write_disposition),
+ 'additional_bq_params': str(self._additional_bq_parameters),
+ 'source_format': str(self._source_format),
+ }
+
def process(self, element, schema_mod_job_name_prefix):
destination = element[0]
temp_table_load_job_reference = element[1]
@@ -415,7 +424,7 @@ class UpdateDestinationSchema(beam.DoFn):
uid = _bq_uuid()
job_name = '%s_%s_%s' % (schema_mod_job_name_prefix, destination_hash, uid)
- _LOGGER.debug(
+ _LOGGER.info(
'Triggering schema modification job %s on %s',
job_name,
table_reference)
@@ -429,7 +438,8 @@ class UpdateDestinationSchema(beam.DoFn):
write_disposition='WRITE_APPEND',
create_disposition='CREATE_NEVER',
additional_load_parameters=additional_parameters,
- job_labels=self._bq_io_metadata.add_additional_bq_job_labels())
+ job_labels=self._bq_io_metadata.add_additional_bq_job_labels(),
+ source_format=self._source_format)
yield (destination, schema_update_job_reference)
@@ -573,7 +583,8 @@ class TriggerLoadJobs(beam.DoFn):
'additional_bq_params': str(self.additional_bq_parameters),
'schema': str(self.schema),
'launchesBigQueryJobs': DisplayDataItem(
- True, label="This Dataflow job launches bigquery jobs.")
+ True, label="This Dataflow job launches bigquery jobs."),
+ 'source_format': str(self.source_format),
}
return result
@@ -619,8 +630,7 @@ class TriggerLoadJobs(beam.DoFn):
table_reference.tableId))
uid = _bq_uuid()
job_name = '%s_%s_%s' % (load_job_name_prefix, destination_hash, uid)
- _LOGGER.debug(
- 'Load job has %s files. Job name is %s.', len(files), job_name)
+ _LOGGER.info('Load job has %s files. Job name is %s.', len(files),
job_name)
create_disposition = self.create_disposition
if self.temporary_tables:
@@ -635,11 +645,13 @@ class TriggerLoadJobs(beam.DoFn):
_LOGGER.info(
'Triggering job %s to load data to BigQuery table %s.'
- 'Schema: %s. Additional parameters: %s',
+ 'Schema: %s. Additional parameters: %s. Source format: %s',
job_name,
table_reference,
schema,
- additional_parameters)
+ additional_parameters,
+ self.source_format,
+ )
if not self.bq_io_metadata:
self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)
job_reference = self.bq_wrapper.perform_load_job(
@@ -1015,7 +1027,9 @@ class BigQueryBatchFileLoads(beam.PTransform):
write_disposition=self.write_disposition,
test_client=self.test_client,
additional_bq_parameters=self.additional_bq_parameters,
- step_name=step_name),
+ step_name=step_name,
+ source_format=self._temp_file_format,
+ ),
schema_mod_job_name_pcv))
finished_schema_mod_jobs_pc = (
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
index c7f1d44..3e0c641 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -367,21 +367,40 @@ class BigQueryWriteIntegrationTests(unittest.TestCase):
self.create_table(table_name)
table_id = '{}.{}'.format(self.dataset_id, table_name)
- input_data = [{"int64": 1, "bool": True}, {"int64": 2, "bool": False}]
+ input_data = [{
+ "int64": num, "bool": True, "nested_field": {
+ "fruit": "Apple"
+ }
+ } for num in range(1, 3)]
table_schema = {
"fields": [{
"name": "int64", "type": "INT64"
}, {
"name": "bool", "type": "BOOL"
- }]
+ },
+ {
+ "name": "nested_field",
+ "type": "RECORD",
+ "mode": "REPEATED",
+ "fields": [
+ {
+ "name": "fruit",
+ "type": "STRING",
+ "mode": "NULLABLE"
+ },
+ ]
+ }]
}
args = self.test_pipeline.get_full_options_as_args(
on_success_matcher=BigqueryFullResultMatcher(
project=self.project,
- query="SELECT bytes, date, time, int64, bool FROM %s" % table_id,
- data=[(None, None, None, 1, True), (None, None, None, 2, False)]))
+ query=
+ "SELECT bytes, date, time, int64, bool, nested_field.fruit FROM %s"
+ % table_id,
+ data=[(None, None, None, num, True, "Apple")
+ for num in range(1, 3)]))
with beam.Pipeline(argv=args) as p:
# pylint: disable=expression-not-assigned