This is an automated email from the ASF dual-hosted git repository.
goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 075b094 [BEAM-7856] Suppress error when BigQuery table already exists
new 5de3583 Merge pull request #9204 from angoenka/bq_create_race
075b094 is described below
commit 075b094be7993466c3cfef3f19b0af5b5119daeb
Author: Ankur Goenka <[email protected]>
AuthorDate: Tue Jul 30 21:36:11 2019 -0700
[BEAM-7856] Suppress error when BigQuery table already exists
---
sdks/python/apache_beam/io/gcp/bigquery_tools.py | 19 ++++++++++------
.../apache_beam/io/gcp/bigquery_tools_test.py | 25 ++++++++++++++++++++++
2 files changed, 38 insertions(+), 6 deletions(-)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index 5a812c8..8211e28 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -659,12 +659,19 @@ class BigQueryWrapper(object):
if found_table and write_disposition != BigQueryDisposition.WRITE_TRUNCATE:
return found_table
else:
- created_table = self._create_table(
- project_id=project_id,
- dataset_id=dataset_id,
- table_id=table_id,
- schema=schema or found_table.schema,
- additional_parameters=additional_create_parameters)
+ created_table = None
+ try:
+ created_table = self._create_table(
+ project_id=project_id,
+ dataset_id=dataset_id,
+ table_id=table_id,
+ schema=schema or found_table.schema,
+ additional_parameters=additional_create_parameters)
+ except HttpError as exn:
+ if exn.status_code == 409:
+ logging.debug('Skipping Creation. Table %s:%s.%s already exists.'
+ % (project_id, dataset_id, table_id))
+ created_table = self.get_table(project_id, dataset_id, table_id)
logging.info('Created table %s.%s.%s with schema %s. '
'Result: %s.',
project_id, dataset_id, table_id,
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
index f6c88b4..cdcba22 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
@@ -154,6 +154,31 @@ class TestBigQueryWrapper(unittest.TestCase):
new_dataset = wrapper.get_or_create_dataset('project_id', 'dataset_id')
self.assertEqual(new_dataset.datasetReference.datasetId, 'dataset_id')
+ def test_get_or_create_table(self):
+ client = mock.Mock()
+ client.tables.Insert.return_value = 'table_id'
+ client.tables.Get.side_effect = [None, 'table_id']
+ wrapper = beam.io.gcp.bigquery_tools.BigQueryWrapper(client)
+ new_table = wrapper.get_or_create_table(
+ 'project_id', 'dataset_id', 'table_id',
+ bigquery.TableSchema(fields=[
+ bigquery.TableFieldSchema(name='b', type='BOOLEAN',
+ mode='REQUIRED')]), False, False)
+ self.assertEqual(new_table, 'table_id')
+
+ def test_get_or_create_table_race_condition(self):
+ client = mock.Mock()
+ client.tables.Insert.side_effect = HttpError(
+ response={'status': '409'}, url='', content='')
+ client.tables.Get.side_effect = [None, 'table_id']
+ wrapper = beam.io.gcp.bigquery_tools.BigQueryWrapper(client)
+ new_table = wrapper.get_or_create_table(
+ 'project_id', 'dataset_id', 'table_id',
+ bigquery.TableSchema(fields=[
+ bigquery.TableFieldSchema(name='b', type='BOOLEAN',
+ mode='REQUIRED')]), False, False)
+ self.assertEqual(new_table, 'table_id')
+
@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class TestBigQueryReader(unittest.TestCase):