This is an automated email from the ASF dual-hosted git repository.

goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 075b094  [BEAM-7856] Suppress error on table bigquery table already exists
     new 5de3583  Merge pull request #9204 from angoenka/bq_create_race
075b094 is described below

commit 075b094be7993466c3cfef3f19b0af5b5119daeb
Author: Ankur Goenka <[email protected]>
AuthorDate: Tue Jul 30 21:36:11 2019 -0700

    [BEAM-7856] Suppress error on table bigquery table already exists
---
 sdks/python/apache_beam/io/gcp/bigquery_tools.py   | 19 ++++++++++------
 .../apache_beam/io/gcp/bigquery_tools_test.py      | 25 ++++++++++++++++++++++
 2 files changed, 38 insertions(+), 6 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index 5a812c8..8211e28 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -659,12 +659,19 @@ class BigQueryWrapper(object):
     if found_table and write_disposition != BigQueryDisposition.WRITE_TRUNCATE:
       return found_table
     else:
-      created_table = self._create_table(
-          project_id=project_id,
-          dataset_id=dataset_id,
-          table_id=table_id,
-          schema=schema or found_table.schema,
-          additional_parameters=additional_create_parameters)
+      created_table = None
+      try:
+        created_table = self._create_table(
+            project_id=project_id,
+            dataset_id=dataset_id,
+            table_id=table_id,
+            schema=schema or found_table.schema,
+            additional_parameters=additional_create_parameters)
+      except HttpError as exn:
+        if exn.status_code == 409:
+          logging.debug('Skipping Creation. Table %s:%s.%s already exists.'
+                        % (project_id, dataset_id, table_id))
+          created_table = self.get_table(project_id, dataset_id, table_id)
       logging.info('Created table %s.%s.%s with schema %s. '
                    'Result: %s.',
                    project_id, dataset_id, table_id,
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
index f6c88b4..cdcba22 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
@@ -154,6 +154,31 @@ class TestBigQueryWrapper(unittest.TestCase):
     new_dataset = wrapper.get_or_create_dataset('project_id', 'dataset_id')
     self.assertEqual(new_dataset.datasetReference.datasetId, 'dataset_id')
 
+  def test_get_or_create_table(self):
+    client = mock.Mock()
+    client.tables.Insert.return_value = 'table_id'
+    client.tables.Get.side_effect = [None, 'table_id']
+    wrapper = beam.io.gcp.bigquery_tools.BigQueryWrapper(client)
+    new_table = wrapper.get_or_create_table(
+        'project_id', 'dataset_id', 'table_id',
+        bigquery.TableSchema(fields=[
+            bigquery.TableFieldSchema(name='b', type='BOOLEAN',
+                                      mode='REQUIRED')]), False, False)
+    self.assertEqual(new_table, 'table_id')
+
+  def test_get_or_create_table_race_condition(self):
+    client = mock.Mock()
+    client.tables.Insert.side_effect = HttpError(
+        response={'status': '409'}, url='', content='')
+    client.tables.Get.side_effect = [None, 'table_id']
+    wrapper = beam.io.gcp.bigquery_tools.BigQueryWrapper(client)
+    new_table = wrapper.get_or_create_table(
+        'project_id', 'dataset_id', 'table_id',
+        bigquery.TableSchema(fields=[
+            bigquery.TableFieldSchema(name='b', type='BOOLEAN',
+                                      mode='REQUIRED')]), False, False)
+    self.assertEqual(new_table, 'table_id')
+
 
 @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class TestBigQueryReader(unittest.TestCase):

Reply via email to