This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c36422d  Merge pull request #14690 from [BEAM-12445] Moving streaming inserts with new client
c36422d is described below

commit c36422d95e2ea74d1eaab53f942a027f0aa77174
Author: Pablo <pabl...@users.noreply.github.com>
AuthorDate: Fri Jun 18 16:17:01 2021 -0700

    Merge pull request #14690 from [BEAM-12445] Moving streaming inserts with new client
    
    * Exploring streaming inserts with new client
    
    * Fixing unit tests and lint
    
    * Remove debug logging
    
    * Optimizing json conversion path
    
    * Fix lint
    
    * Include license for orjson
    
    * fix lint
---
 sdks/python/apache_beam/io/gcp/bigquery.py         |  8 ++-
 sdks/python/apache_beam/io/gcp/bigquery_test.py    | 52 ++++++---------
 sdks/python/apache_beam/io/gcp/bigquery_tools.py   | 76 ++++++++++------------
 .../apache_beam/io/gcp/bigquery_tools_test.py      | 29 +++------
 .../container/license_scripts/dep_urls_py.yaml     |  2 +
 sdks/python/setup.py                               |  2 +
 6 files changed, 72 insertions(+), 97 deletions(-)
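
For context on the hunks below: the commit swaps the apitools-based tabledata.InsertAll path for the google-cloud-bigquery client's insert_rows_json. A minimal sketch of the call shape the new code relies on, using placeholder project, dataset, table and row values rather than code from this patch:

    from google.cloud import bigquery

    # Assumes application default credentials and a default project are configured.
    client = bigquery.Client()
    table_ref = bigquery.DatasetReference('my-project', 'my_dataset').table('my_table')

    # insert_rows_json takes plain dicts plus optional insert IDs and returns a
    # list of per-row error entries; an empty list means every row was accepted.
    errors = client.insert_rows_json(
        table_ref,
        json_rows=[{'month': 1}, {'month': 2}],
        row_ids=['insertid1', 'insertid2'],
        skip_invalid_rows=True)
    if errors:
      print('failed rows:', errors)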

diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 76db54e..01b95c5 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -230,7 +230,8 @@ Much like the schema case, the parameter with `additional_bq_parameters` can
 also take a callable that receives a table reference.
 
 
-[1] https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload
+[1] https://cloud.google.com/bigquery/docs/reference/rest/v2/Job\
+        #jobconfigurationload
 [2] https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/insert
 [3] https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#resource
 
@@ -1311,10 +1312,11 @@ class BigQueryWriteFn(DoFn):
           skip_invalid_rows=True)
       self.batch_latency_metric.update((time.time() - start) * 1000)
 
-      failed_rows = [rows[entry.index] for entry in errors]
+      failed_rows = [rows[entry['index']] for entry in errors]
       should_retry = any(
           RetryStrategy.should_retry(
-              self._retry_strategy, entry.errors[0].reason) for entry in errors)
+              self._retry_strategy, entry['errors'][0]['reason'])
+          for entry in errors)
       if not passed:
         self.failed_rows_metric.update(len(failed_rows))
         message = (
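
As the bigquery.py hunk above shows, insert_rows_json reports failures as plain dicts rather than apitools message objects, so the failed-row index and error reason are now read with key lookups. A small illustrative sketch of that handling, assuming the error-entry shape the hunk relies on:

    # Hypothetical error list: one entry per failed row, carrying the row index
    # and a list of error dicts with a 'reason' field.
    errors = [{'index': 0, 'errors': [{'reason': 'invalid'}]}]
    rows = [{'month': 13}]

    failed_rows = [rows[entry['index']] for entry in errors]
    reasons = [entry['errors'][0]['reason'] for entry in errors]
    print(failed_rows, reasons)  # [{'month': 13}] ['invalid']
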
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 41bbfe2..e4715fa 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -794,8 +794,7 @@ class BigQueryStreamingInsertTransformTests(unittest.TestCase):
     client.tables.Get.return_value = bigquery.Table(
         tableReference=bigquery.TableReference(
            projectId='project_id', datasetId='dataset_id', tableId='table_id'))
-    client.tabledata.InsertAll.return_value = \
-      bigquery.TableDataInsertAllResponse(insertErrors=[])
+    client.insert_rows_json.return_value = []
     create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
     write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
 
@@ -809,15 +808,14 @@ class BigQueryStreamingInsertTransformTests(unittest.TestCase):
     fn.process(('project_id:dataset_id.table_id', {'month': 1}))
 
     # InsertRows not called as batch size is not hit yet
-    self.assertFalse(client.tabledata.InsertAll.called)
+    self.assertFalse(client.insert_rows_json.called)
 
   def test_dofn_client_process_flush_called(self):
     client = mock.Mock()
     client.tables.Get.return_value = bigquery.Table(
         tableReference=bigquery.TableReference(
            projectId='project_id', datasetId='dataset_id', tableId='table_id'))
-    client.tabledata.InsertAll.return_value = (
-        bigquery.TableDataInsertAllResponse(insertErrors=[]))
+    client.insert_rows_json.return_value = []
     create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
     write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
 
@@ -832,15 +830,14 @@ class BigQueryStreamingInsertTransformTests(unittest.TestCase):
     fn.process(('project_id:dataset_id.table_id', ({'month': 1}, 'insertid1')))
     fn.process(('project_id:dataset_id.table_id', ({'month': 2}, 'insertid2')))
     # InsertRows called as batch size is hit
-    self.assertTrue(client.tabledata.InsertAll.called)
+    self.assertTrue(client.insert_rows_json.called)
 
   def test_dofn_client_finish_bundle_flush_called(self):
     client = mock.Mock()
     client.tables.Get.return_value = bigquery.Table(
         tableReference=bigquery.TableReference(
            projectId='project_id', datasetId='dataset_id', tableId='table_id'))
-    client.tabledata.InsertAll.return_value = \
-      bigquery.TableDataInsertAllResponse(insertErrors=[])
+    client.insert_rows_json.return_value = []
     create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED
     write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
 
@@ -859,11 +856,11 @@ class BigQueryStreamingInsertTransformTests(unittest.TestCase):
 
     self.assertTrue(client.tables.Get.called)
     # InsertRows not called as batch size is not hit
-    self.assertFalse(client.tabledata.InsertAll.called)
+    self.assertFalse(client.insert_rows_json.called)
 
     fn.finish_bundle()
     # InsertRows called in finish bundle
-    self.assertTrue(client.tabledata.InsertAll.called)
+    self.assertTrue(client.insert_rows_json.called)
 
   def test_dofn_client_no_records(self):
     client = mock.Mock()
@@ -895,8 +892,7 @@ class BigQueryStreamingInsertTransformTests(unittest.TestCase):
     client.tables.Get.return_value = bigquery.Table(
         tableReference=bigquery.TableReference(
            projectId='project_id', datasetId='dataset_id', tableId='table_id'))
-    client.tabledata.InsertAll.return_value = \
-      bigquery.TableDataInsertAllResponse(insertErrors=[])
+    client.insert_rows_json.return_value = []
     create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED
     write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
 
@@ -923,7 +919,7 @@ class BigQueryStreamingInsertTransformTests(unittest.TestCase):
         }, 'insertid1')]))
 
     # InsertRows called since the input is already batched.
-    self.assertTrue(client.tabledata.InsertAll.called)
+    self.assertTrue(client.insert_rows_json.called)
 
 
 @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
@@ -933,12 +929,9 @@ class PipelineBasedStreamingInsertTest(_TestCaseWithTempDirCleanUp):
     file_name_1 = os.path.join(tempdir, 'file1')
     file_name_2 = os.path.join(tempdir, 'file2')
 
-    def store_callback(arg):
-      insert_ids = [r.insertId for r in arg.tableDataInsertAllRequest.rows]
-      colA_values = [
-          r.json.additionalProperties[0].value.string_value
-          for r in arg.tableDataInsertAllRequest.rows
-      ]
+    def store_callback(table, **kwargs):
+      insert_ids = [r for r in kwargs['row_ids']]
+      colA_values = [r['columnA'] for r in kwargs['json_rows']]
       json_output = {'insertIds': insert_ids, 'colA_values': colA_values}
       # The first time we try to insert, we save those insertions in
       # file insert_calls1.
@@ -950,12 +943,10 @@ class PipelineBasedStreamingInsertTest(_TestCaseWithTempDirCleanUp):
         with open(file_name_2, 'w') as f:
           json.dump(json_output, f)
 
-      res = mock.Mock()
-      res.insertErrors = []
-      return res
+      return []
 
     client = mock.Mock()
-    client.tabledata.InsertAll = mock.Mock(side_effect=store_callback)
+    client.insert_rows_json = mock.Mock(side_effect=store_callback)
 
     # Using the bundle based direct runner to avoid pickling problems
     # with mocks.
@@ -996,12 +987,9 @@ class PipelineBasedStreamingInsertTest(_TestCaseWithTempDirCleanUp):
     file_name_1 = os.path.join(tempdir, 'file1')
     file_name_2 = os.path.join(tempdir, 'file2')
 
-    def store_callback(arg):
-      insert_ids = [r.insertId for r in arg.tableDataInsertAllRequest.rows]
-      colA_values = [
-          r.json.additionalProperties[0].value.string_value
-          for r in arg.tableDataInsertAllRequest.rows
-      ]
+    def store_callback(table, **kwargs):
+      insert_ids = [r for r in kwargs['row_ids']]
+      colA_values = [r['columnA'] for r in kwargs['json_rows']]
       json_output = {'insertIds': insert_ids, 'colA_values': colA_values}
       # Expect two batches of rows will be inserted. Store them separately.
       if not os.path.exists(file_name_1):
@@ -1011,12 +999,10 @@ class PipelineBasedStreamingInsertTest(_TestCaseWithTempDirCleanUp):
         with open(file_name_2, 'w') as f:
           json.dump(json_output, f)
 
-      res = mock.Mock()
-      res.insertErrors = []
-      return res
+      return []
 
     client = mock.Mock()
-    client.tabledata.InsertAll = mock.Mock(side_effect=store_callback)
+    client.insert_rows_json = mock.Mock(side_effect=store_callback)
 
     # Using the bundle based direct runner to avoid pickling problems
     # with mocks.
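
The test changes above replace the old InsertAll request mocks with a side_effect that matches the new keyword-based insert_rows_json call. A stripped-down sketch of that mocking pattern, outside the Beam test harness:

    from unittest import mock

    def store_callback(table, **kwargs):
      # Capture what the code under test sent; returning an empty list
      # signals that no rows failed.
      print(kwargs['row_ids'], kwargs['json_rows'])
      return []

    client = mock.Mock()
    client.insert_rows_json = mock.Mock(side_effect=store_callback)

    errors = client.insert_rows_json(
        'project.dataset.table',
        json_rows=[{'columnA': 'value1'}],
        row_ids=['insertid1'])
    assert errors == []
    assert client.insert_rows_json.called
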
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index 2cf0930..c9540ce 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -46,7 +46,6 @@ import fastavro
 from apache_beam import coders
 from apache_beam.internal.gcp import auth
 from apache_beam.internal.gcp.json_value import from_json_value
-from apache_beam.internal.gcp.json_value import to_json_value
 from apache_beam.internal.http_client import get_new_http
 from apache_beam.internal.metrics.metric import MetricLogger
 from apache_beam.internal.metrics.metric import Metrics
@@ -69,8 +68,18 @@ from apache_beam.utils.histogram import LinearBucket
 try:
   from apitools.base.py.transfer import Upload
   from apitools.base.py.exceptions import HttpError, HttpForbiddenError
+  from google.cloud import bigquery as gcp_bigquery
 except ImportError:
+  gcp_bigquery = None
   pass
+
+try:
+  from orjson import dumps as fast_json_dumps
+  from orjson import loads as fast_json_loads
+except ImportError:
+  fast_json_dumps = json.dumps
+  fast_json_loads = json.loads
+
 # pylint: enable=wrong-import-order, wrong-import-position
 
 # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
@@ -292,6 +301,7 @@ class BigQueryWrapper(object):
         http=get_new_http(),
         credentials=auth.get_service_credentials(),
         response_encoding='utf8')
+    self.gcp_bq_client = client or gcp_bigquery.Client()
     self._unique_row_id = 0
     # For testing scenarios where we pass in a client we do not want a
     # randomized prefix for row IDs.
@@ -596,7 +606,13 @@ class BigQueryWrapper(object):
       num_retries=MAX_RETRIES,
       retry_filter=retry.retry_on_server_errors_timeout_or_quota_issues_filter)
   def _insert_all_rows(
-      self, project_id, dataset_id, table_id, rows, skip_invalid_rows=False):
+      self,
+      project_id,
+      dataset_id,
+      table_id,
+      rows,
+      insert_ids,
+      skip_invalid_rows=False):
     """Calls the insertAll BigQuery API endpoint.
 
     Docs for this BQ call: https://cloud.google.com/bigquery/docs/reference\
@@ -604,15 +620,6 @@ class BigQueryWrapper(object):
     # The rows argument is a list of
     # bigquery.TableDataInsertAllRequest.RowsValueListEntry instances as
     # required by the InsertAll() method.
-    request = bigquery.BigqueryTabledataInsertAllRequest(
-        projectId=project_id,
-        datasetId=dataset_id,
-        tableId=table_id,
-        tableDataInsertAllRequest=bigquery.TableDataInsertAllRequest(
-            skipInvalidRows=skip_invalid_rows,
-            # TODO(silviuc): Should have an option for ignoreUnknownValues?
-            rows=rows))
-
     resource = resource_identifiers.BigQueryTable(
         project_id, dataset_id, table_id)
 
@@ -633,14 +640,15 @@ class BigQueryWrapper(object):
         base_labels=labels)
 
     started_millis = int(time.time() * 1000)
-    response = None
     try:
-      response = self.client.tabledata.InsertAll(request)
-      if not response.insertErrors:
+      table_ref = gcp_bigquery.DatasetReference(project_id,
+                                                dataset_id).table(table_id)
+      errors = self.gcp_bq_client.insert_rows_json(
+          table_ref, json_rows=rows, row_ids=insert_ids, skip_invalid_rows=True)
+      if not errors:
         service_call_metric.call('ok')
-      for insert_error in response.insertErrors:
-        for error in insert_error.errors:
-          service_call_metric.call(error.reason)
+      for insert_error in errors:
+        service_call_metric.call(insert_error['errors'][0])
     except HttpError as e:
       service_call_metric.call(e)
 
@@ -649,9 +657,7 @@ class BigQueryWrapper(object):
     finally:
       self._latency_histogram_metric.update(
           int(time.time() * 1000) - started_millis)
-    if response:
-      return not response.insertErrors, response.insertErrors
-    return False, []
+    return not errors, errors
 
   @retry.with_exponential_backoff(
       num_retries=MAX_RETRIES,
@@ -1118,29 +1124,19 @@ class BigQueryWrapper(object):
     # BigQuery will do a best-effort if unique IDs are provided. This situation
     # can happen during retries on failures.
     # TODO(silviuc): Must add support to writing TableRow's instead of dicts.
-    final_rows = []
-    for i, row in enumerate(rows):
-      json_row = self._convert_to_json_row(row)
-      insert_id = str(self.unique_row_id) if not insert_ids else insert_ids[i]
-      final_rows.append(
-          bigquery.TableDataInsertAllRequest.RowsValueListEntry(
-              insertId=insert_id, json=json_row))
+    insert_ids = [
+        str(self.unique_row_id) if not insert_ids else insert_ids[i] for i,
+        _ in enumerate(rows)
+    ]
+    rows = [
+        fast_json_loads(fast_json_dumps(r, default=default_encoder))
+        for r in rows
+    ]
+
     result, errors = self._insert_all_rows(
-        project_id, dataset_id, table_id, final_rows, skip_invalid_rows)
+        project_id, dataset_id, table_id, rows, insert_ids)
     return result, errors
 
-  def _convert_to_json_row(self, row):
-    json_object = bigquery.JsonObject()
-    for k, v in row.items():
-      if isinstance(v, decimal.Decimal):
-        # decimal values are converted into string because JSON does not
-        # support the precision that decimal supports. BQ is able to handle
-        # inserts into NUMERIC columns by receiving JSON with string attrs.
-        v = str(v)
-      json_object.additionalProperties.append(
-          bigquery.JsonObject.AdditionalProperty(key=k, value=to_json_value(v)))
-    return json_object
-
   def _convert_cell_value_to_dict(self, value, field):
     if field.type == 'STRING':
       # Input: "XYZ" --> Output: "XYZ"
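
Two ideas from the bigquery_tools.py changes above are worth spelling out: the orjson import falls back to the standard json module when orjson is not installed, and the dumps/loads round-trip normalizes each row into JSON-safe types before it reaches the client. A sketch of that path, with a hypothetical encoder standing in for the module's default_encoder (which is not shown in this diff):

    import decimal
    import json

    try:
      from orjson import dumps as fast_json_dumps
      from orjson import loads as fast_json_loads
    except ImportError:
      fast_json_dumps = json.dumps
      fast_json_loads = json.loads

    def _default_encoder(obj):
      # Hypothetical stand-in: BigQuery NUMERIC columns accept string values,
      # so Decimal is serialized as a string.
      if isinstance(obj, decimal.Decimal):
        return str(obj)
      raise TypeError(obj)

    row = {'amount': decimal.Decimal('1.50'), 'month': 1}
    json_row = fast_json_loads(fast_json_dumps(row, default=_default_encoder))
    print(json_row)  # {'amount': '1.50', 'month': 1}
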
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
index e5a4868..566e2b3 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
@@ -51,6 +51,7 @@ from apache_beam.options.value_provider import StaticValueProvider
 # pylint: disable=wrong-import-order, wrong-import-position
 try:
   from apitools.base.py.exceptions import HttpError, HttpForbiddenError
+  from google.cloud import bigquery as gcp_bigquery
 except ImportError:
   HttpError = None
   HttpForbiddenError = None
@@ -850,9 +851,7 @@ class TestBigQueryWriter(unittest.TestCase):
     client.tables.Get.return_value = table
     write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
 
-    insert_response = mock.Mock()
-    insert_response.insertErrors = []
-    client.tabledata.InsertAll.return_value = insert_response
+    client.insert_rows_json.return_value = []
 
     with beam.io.BigQuerySink(
         'project:dataset.table',
@@ -860,24 +859,12 @@ class TestBigQueryWriter(unittest.TestCase):
       writer.Write({'i': 1, 'b': True, 's': 'abc', 'f': 3.14})
 
     sample_row = {'i': 1, 'b': True, 's': 'abc', 'f': 3.14}
-    expected_rows = []
-    json_object = bigquery.JsonObject()
-    for k, v in sample_row.items():
-      json_object.additionalProperties.append(
-          bigquery.JsonObject.AdditionalProperty(key=k, value=to_json_value(v)))
-    expected_rows.append(
-        bigquery.TableDataInsertAllRequest.RowsValueListEntry(
-            insertId='_1',  # First row ID generated with prefix ''
-            json=json_object))
-    client.tabledata.InsertAll.assert_called_with(
-        bigquery.BigqueryTabledataInsertAllRequest(
-            projectId='project',
-            datasetId='dataset',
-            tableId='table',
-            tableDataInsertAllRequest=bigquery.TableDataInsertAllRequest(
-                rows=expected_rows,
-                skipInvalidRows=False,
-            )))
+    client.insert_rows_json.assert_called_with(
+        gcp_bigquery.TableReference(
+            gcp_bigquery.DatasetReference('project', 'dataset'), 'table'),
+        json_rows=[sample_row],
+        row_ids=['_1'],
+        skip_invalid_rows=True)
 
   def test_table_schema_without_project(self):
     # Writer should pick executing project by default.
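
The rewritten assertion above constructs the expected google-cloud-bigquery TableReference by hand; these reference objects compare by value, which is what makes assert_called_with usable here. A brief sketch of two equivalent ways to build one, with placeholder names and assuming google-cloud-bigquery is installed:

    from google.cloud import bigquery as gcp_bigquery

    dataset_ref = gcp_bigquery.DatasetReference('project', 'dataset')
    table_ref_a = gcp_bigquery.TableReference(dataset_ref, 'table')
    table_ref_b = dataset_ref.table('table')
    assert table_ref_a == table_ref_b
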
diff --git a/sdks/python/container/license_scripts/dep_urls_py.yaml b/sdks/python/container/license_scripts/dep_urls_py.yaml
index 2ff0a08..f1e89d0 100644
--- a/sdks/python/container/license_scripts/dep_urls_py.yaml
+++ b/sdks/python/container/license_scripts/dep_urls_py.yaml
@@ -95,6 +95,8 @@ pip_dependencies:
    license: "https://raw.githubusercontent.com/numpy/numpy/master/LICENSE.txt"
   oauth2client:
    license: "https://raw.githubusercontent.com/googleapis/oauth2client/master/LICENSE"
+  orjson:
+    license: "https://github.com/ijl/orjson/raw/master/LICENSE-APACHE"
   pandas:
    license: "https://raw.githubusercontent.com/pandas-dev/pandas/master/LICENSE"
   pathlib2:
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index ad3d3b2..f90f5de 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -132,6 +132,8 @@ REQUIRED_PACKAGES = [
     # is Python standard since Python 3.7 and each Python version is compatible
     # with a specific dataclasses version.
     'dataclasses;python_version<"3.7"',
+    # orjson, only available on Python 3.6 and above
+    'orjson<4.0;python_version>="3.6"',
     # Dill doesn't have forwards-compatibility guarantees within minor version.
     # Pickles created with a new version of dill may not unpickle using older
     # version of dill. It is best to use the same version of dill on client and
