This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6e7f893 [py] Supporting ignore_unknown_values for WriteToBigQuery
new 3b9d1d5 Merge pull request #16081 from [BEAM-9706][py] Supporting ignore_unknown_values for WriteToBigQuery
6e7f893 is described below
commit 6e7f8938e727068f8712303465fbe87205f24f28
Author: Pablo Estrada <[email protected]>
AuthorDate: Mon Nov 29 12:13:29 2021 -0800
[py] Supporting ignore_unknown_values for WriteToBigQuery
---
sdks/python/apache_beam/io/gcp/bigquery.py | 24 ++++++++++++++++++----
sdks/python/apache_beam/io/gcp/bigquery_tools.py | 17 +++++++++++----
.../apache_beam/io/gcp/bigquery_tools_test.py | 5 +++--
3 files changed, 36 insertions(+), 10 deletions(-)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index f41629b..0d98d7b 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -1489,7 +1489,8 @@ class BigQueryWriteFn(DoFn):
retry_strategy=None,
additional_bq_parameters=None,
ignore_insert_ids=False,
- with_batched_input=False):
+ with_batched_input=False,
+ ignore_unknown_columns=False):
"""Initialize a WriteToBigQuery transform.
Args:
@@ -1533,6 +1534,11 @@ class BigQueryWriteFn(DoFn):
with_batched_input: Whether the input has already been batched per
destination. If not, perform best-effort batching per destination within
a bundle.
+ ignore_unknown_columns: Accept rows that contain values that do not match
+ the schema. The unknown values are ignored. Default is False,
+ which treats unknown values as errors. See reference:
+ https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll
+
"""
self.schema = schema
self.test_client = test_client
@@ -1568,6 +1574,7 @@ class BigQueryWriteFn(DoFn):
self.bigquery_wrapper = None
self.streaming_api_logging_frequency_sec = (
BigQueryWriteFn.STREAMING_API_LOGGING_FREQUENCY_SEC)
+ self.ignore_unknown_columns = ignore_unknown_columns
def display_data(self):
return {
@@ -1577,7 +1584,8 @@ class BigQueryWriteFn(DoFn):
'create_disposition': str(self.create_disposition),
'write_disposition': str(self.write_disposition),
'additional_bq_parameters': str(self.additional_bq_parameters),
- 'ignore_insert_ids': str(self.ignore_insert_ids)
+ 'ignore_insert_ids': str(self.ignore_insert_ids),
+ 'ignore_unknown_columns': str(self.ignore_unknown_columns)
}
def _reset_rows_buffer(self):
@@ -1725,7 +1733,8 @@ class BigQueryWriteFn(DoFn):
table_id=table_reference.tableId,
rows=rows,
insert_ids=insert_ids,
- skip_invalid_rows=True)
+ skip_invalid_rows=True,
+ ignore_unknown_values=self.ignore_unknown_columns)
self.batch_latency_metric.update((time.time() - start) * 1000)
failed_rows = [rows[entry['index']] for entry in errors]
@@ -1916,7 +1925,8 @@ class WriteToBigQuery(PTransform):
temp_file_format=None,
ignore_insert_ids=False,
# TODO(BEAM-11857): Switch the default when the feature is mature.
- with_auto_sharding=False):
+ with_auto_sharding=False,
+ ignore_unknown_columns=False):
"""Initialize a WriteToBigQuery transform.
Args:
@@ -2043,6 +2053,11 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
determined number of shards to write to BigQuery. This can be used for
both FILE_LOADS and STREAMING_INSERTS. Only applicable to unbounded
input.
+ ignore_unknown_columns: Accept rows that contain values that do not match
+ the schema. The unknown values are ignored. Default is False,
+ which treats unknown values as errors. This option is only valid for
+ method=STREAMING_INSERTS. See reference:
+ https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll
"""
self._table = table
self._dataset = dataset
@@ -2076,6 +2091,7 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
self.table_side_inputs = table_side_inputs or ()
self.schema_side_inputs = schema_side_inputs or ()
self._ignore_insert_ids = ignore_insert_ids
+ self._ignore_unknown_columns = ignore_unknown_columns
# Dict/schema methods were moved to bigquery_tools, but keep references
# here for backward compatibility.
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index c1bc2d9..56425f7 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -663,7 +663,8 @@ class BigQueryWrapper(object):
table_id,
rows,
insert_ids,
- skip_invalid_rows=False):
+ skip_invalid_rows=False,
+ ignore_unknown_values=False):
"""Calls the insertAll BigQuery API endpoint.
Docs for this BQ call: https://cloud.google.com/bigquery/docs/reference\
@@ -697,7 +698,8 @@ class BigQueryWrapper(object):
table_ref_str,
json_rows=rows,
row_ids=insert_ids,
- skip_invalid_rows=True,
+ skip_invalid_rows=skip_invalid_rows,
+ ignore_unknown_values=ignore_unknown_values,
timeout=BQ_STREAMING_INSERT_TIMEOUT_SEC)
if not errors:
service_call_metric.call('ok')
@@ -1217,7 +1219,8 @@ class BigQueryWrapper(object):
table_id,
rows,
insert_ids=None,
- skip_invalid_rows=False):
+ skip_invalid_rows=False,
+ ignore_unknown_values=False):
"""Inserts rows into the specified table.
Args:
@@ -1228,6 +1231,10 @@ class BigQueryWrapper(object):
each key in it is the name of a field.
skip_invalid_rows: If there are rows with insertion errors, whether they
should be skipped, and all others should be inserted successfully.
+ ignore_unknown_values: Set this option to true to ignore unknown column
+ names. If the input rows contain columns that are not
+ part of the existing table's schema, those columns are ignored, and
+ the rows are successfully inserted.
Returns:
A tuple (bool, errors). If first element is False then the second element
@@ -1250,7 +1257,9 @@ class BigQueryWrapper(object):
]
result, errors = self._insert_all_rows(
- project_id, dataset_id, table_id, rows, insert_ids)
+ project_id, dataset_id, table_id, rows, insert_ids,
+ skip_invalid_rows=skip_invalid_rows,
+ ignore_unknown_values=ignore_unknown_values)
return result, errors
def _convert_cell_value_to_dict(self, value, field):
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
index e26cfb5..f9174f5 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
@@ -959,8 +959,9 @@ class TestBigQueryWriter(unittest.TestCase):
'%s.%s.%s' % ('project', 'dataset', 'table'),
json_rows=[sample_row],
row_ids=['_1'],
- skip_invalid_rows=True,
- timeout=120)
+ skip_invalid_rows=False,
+ timeout=120,
+ ignore_unknown_values=False)
def test_table_schema_without_project(self):
# Writer should pick executing project by default.