This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new c36422d  Merge pull request #14690 from [BEAM-12445] Moving streaming inserts with new client
c36422d is described below

commit c36422d95e2ea74d1eaab53f942a027f0aa77174
Author: Pablo <pabl...@users.noreply.github.com>
AuthorDate: Fri Jun 18 16:17:01 2021 -0700

    Merge pull request #14690 from [BEAM-12445] Moving streaming inserts with new client

    * Exploring streaming inserts with new client
    * Fixing unit tests and lint
    * Remove debug logging
    * Optimizing json conversion path
    * Fix lint
    * Include license for orjson
    * fix lint
---
 sdks/python/apache_beam/io/gcp/bigquery.py         |  8 ++-
 sdks/python/apache_beam/io/gcp/bigquery_test.py    | 52 ++++++---------
 sdks/python/apache_beam/io/gcp/bigquery_tools.py   | 76 ++++++++++------------
 .../apache_beam/io/gcp/bigquery_tools_test.py      | 29 +++------
 .../container/license_scripts/dep_urls_py.yaml     |  2 +
 sdks/python/setup.py                               |  2 +
 6 files changed, 72 insertions(+), 97 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 76db54e..01b95c5 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -230,7 +230,8 @@ Much like the schema case, the parameter with `additional_bq_parameters` can
 also take a callable that receives a table reference.
 
-[1] https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload
+[1] https://cloud.google.com/bigquery/docs/reference/rest/v2/Job\
+  #jobconfigurationload
 [2] https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/insert
 [3] https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#resource
@@ -1311,10 +1312,11 @@ class BigQueryWriteFn(DoFn):
             skip_invalid_rows=True)
         self.batch_latency_metric.update((time.time() - start) * 1000)
 
-        failed_rows = [rows[entry.index] for entry in errors]
+        failed_rows = [rows[entry['index']] for entry in errors]
         should_retry = any(
             RetryStrategy.should_retry(
-                self._retry_strategy, entry.errors[0].reason) for entry in errors)
+                self._retry_strategy, entry['errors'][0]['reason'])
+            for entry in errors)
         if not passed:
           self.failed_rows_metric.update(len(failed_rows))
           message = (
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 41bbfe2..e4715fa 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -794,8 +794,7 @@ class BigQueryStreamingInsertTransformTests(unittest.TestCase):
     client.tables.Get.return_value = bigquery.Table(
         tableReference=bigquery.TableReference(
             projectId='project_id', datasetId='dataset_id', tableId='table_id'))
-    client.tabledata.InsertAll.return_value = \
-        bigquery.TableDataInsertAllResponse(insertErrors=[])
+    client.insert_rows_json.return_value = []
 
     create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
     write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
@@ -809,15 +808,14 @@ class BigQueryStreamingInsertTransformTests(unittest.TestCase):
     fn.process(('project_id:dataset_id.table_id', {'month': 1}))
 
     # InsertRows not called as batch size is not hit yet
-    self.assertFalse(client.tabledata.InsertAll.called)
+    self.assertFalse(client.insert_rows_json.called)
 
   def test_dofn_client_process_flush_called(self):
     client = mock.Mock()
     client.tables.Get.return_value = bigquery.Table(
         tableReference=bigquery.TableReference(
             projectId='project_id', datasetId='dataset_id', tableId='table_id'))
-    client.tabledata.InsertAll.return_value = (
-        bigquery.TableDataInsertAllResponse(insertErrors=[]))
+    client.insert_rows_json.return_value = []
 
     create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
     write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
@@ -832,15 +830,14 @@ class BigQueryStreamingInsertTransformTests(unittest.TestCase):
     fn.process(('project_id:dataset_id.table_id', ({'month': 1}, 'insertid1')))
     fn.process(('project_id:dataset_id.table_id', ({'month': 2}, 'insertid2')))
 
     # InsertRows called as batch size is hit
-    self.assertTrue(client.tabledata.InsertAll.called)
+    self.assertTrue(client.insert_rows_json.called)
 
   def test_dofn_client_finish_bundle_flush_called(self):
     client = mock.Mock()
     client.tables.Get.return_value = bigquery.Table(
         tableReference=bigquery.TableReference(
             projectId='project_id', datasetId='dataset_id', tableId='table_id'))
-    client.tabledata.InsertAll.return_value = \
-        bigquery.TableDataInsertAllResponse(insertErrors=[])
+    client.insert_rows_json.return_value = []
 
     create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED
     write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
@@ -859,11 +856,11 @@ class BigQueryStreamingInsertTransformTests(unittest.TestCase):
     self.assertTrue(client.tables.Get.called)
     # InsertRows not called as batch size is not hit
-    self.assertFalse(client.tabledata.InsertAll.called)
+    self.assertFalse(client.insert_rows_json.called)
 
     fn.finish_bundle()
     # InsertRows called in finish bundle
-    self.assertTrue(client.tabledata.InsertAll.called)
+    self.assertTrue(client.insert_rows_json.called)
 
   def test_dofn_client_no_records(self):
     client = mock.Mock()
@@ -895,8 +892,7 @@ class BigQueryStreamingInsertTransformTests(unittest.TestCase):
     client.tables.Get.return_value = bigquery.Table(
         tableReference=bigquery.TableReference(
             projectId='project_id', datasetId='dataset_id', tableId='table_id'))
-    client.tabledata.InsertAll.return_value = \
-        bigquery.TableDataInsertAllResponse(insertErrors=[])
+    client.insert_rows_json.return_value = []
 
     create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED
     write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
@@ -923,7 +919,7 @@ class BigQueryStreamingInsertTransformTests(unittest.TestCase):
         }, 'insertid1')]))
 
     # InsertRows called since the input is already batched.
-    self.assertTrue(client.tabledata.InsertAll.called)
+    self.assertTrue(client.insert_rows_json.called)
 
 
 @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
@@ -933,12 +929,9 @@ class PipelineBasedStreamingInsertTest(_TestCaseWithTempDirCleanUp):
     file_name_1 = os.path.join(tempdir, 'file1')
     file_name_2 = os.path.join(tempdir, 'file2')
 
-    def store_callback(arg):
-      insert_ids = [r.insertId for r in arg.tableDataInsertAllRequest.rows]
-      colA_values = [
-          r.json.additionalProperties[0].value.string_value
-          for r in arg.tableDataInsertAllRequest.rows
-      ]
+    def store_callback(table, **kwargs):
+      insert_ids = [r for r in kwargs['row_ids']]
+      colA_values = [r['columnA'] for r in kwargs['json_rows']]
       json_output = {'insertIds': insert_ids, 'colA_values': colA_values}
       # The first time we try to insert, we save those insertions in
       # file insert_calls1.
@@ -950,12 +943,10 @@ class PipelineBasedStreamingInsertTest(_TestCaseWithTempDirCleanUp):
         with open(file_name_2, 'w') as f:
           json.dump(json_output, f)
 
-      res = mock.Mock()
-      res.insertErrors = []
-      return res
+      return []
 
     client = mock.Mock()
-    client.tabledata.InsertAll = mock.Mock(side_effect=store_callback)
+    client.insert_rows_json = mock.Mock(side_effect=store_callback)
 
     # Using the bundle based direct runner to avoid pickling problems
     # with mocks.
@@ -996,12 +987,9 @@ class PipelineBasedStreamingInsertTest(_TestCaseWithTempDirCleanUp):
     file_name_1 = os.path.join(tempdir, 'file1')
     file_name_2 = os.path.join(tempdir, 'file2')
 
-    def store_callback(arg):
-      insert_ids = [r.insertId for r in arg.tableDataInsertAllRequest.rows]
-      colA_values = [
-          r.json.additionalProperties[0].value.string_value
-          for r in arg.tableDataInsertAllRequest.rows
-      ]
+    def store_callback(table, **kwargs):
+      insert_ids = [r for r in kwargs['row_ids']]
+      colA_values = [r['columnA'] for r in kwargs['json_rows']]
       json_output = {'insertIds': insert_ids, 'colA_values': colA_values}
       # Expect two batches of rows will be inserted. Store them separately.
       if not os.path.exists(file_name_1):
@@ -1011,12 +999,10 @@ class PipelineBasedStreamingInsertTest(_TestCaseWithTempDirCleanUp):
         with open(file_name_2, 'w') as f:
           json.dump(json_output, f)
 
-      res = mock.Mock()
-      res.insertErrors = []
-      return res
+      return []
 
     client = mock.Mock()
-    client.tabledata.InsertAll = mock.Mock(side_effect=store_callback)
+    client.insert_rows_json = mock.Mock(side_effect=store_callback)
 
     # Using the bundle based direct runner to avoid pickling problems
     # with mocks.
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index 2cf0930..c9540ce 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -46,7 +46,6 @@ import fastavro
 from apache_beam import coders
 from apache_beam.internal.gcp import auth
 from apache_beam.internal.gcp.json_value import from_json_value
-from apache_beam.internal.gcp.json_value import to_json_value
 from apache_beam.internal.http_client import get_new_http
 from apache_beam.internal.metrics.metric import MetricLogger
 from apache_beam.internal.metrics.metric import Metrics
@@ -69,8 +68,18 @@ from apache_beam.utils.histogram import LinearBucket
 try:
   from apitools.base.py.transfer import Upload
   from apitools.base.py.exceptions import HttpError, HttpForbiddenError
+  from google.cloud import bigquery as gcp_bigquery
 except ImportError:
+  gcp_bigquery = None
   pass
+
+try:
+  from orjson import dumps as fast_json_dumps
+  from orjson import loads as fast_json_loads
+except ImportError:
+  fast_json_dumps = json.dumps
+  fast_json_loads = json.loads
+
 # pylint: enable=wrong-import-order, wrong-import-position
 
 # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
@@ -292,6 +301,7 @@ class BigQueryWrapper(object):
           http=get_new_http(),
           credentials=auth.get_service_credentials(),
           response_encoding='utf8')
+    self.gcp_bq_client = client or gcp_bigquery.Client()
     self._unique_row_id = 0
     # For testing scenarios where we pass in a client we do not want a
     # randomized prefix for row IDs.
@@ -596,7 +606,13 @@ class BigQueryWrapper(object):
       num_retries=MAX_RETRIES,
       retry_filter=retry.retry_on_server_errors_timeout_or_quota_issues_filter)
   def _insert_all_rows(
-      self, project_id, dataset_id, table_id, rows, skip_invalid_rows=False):
+      self,
+      project_id,
+      dataset_id,
+      table_id,
+      rows,
+      insert_ids,
+      skip_invalid_rows=False):
     """Calls the insertAll BigQuery API endpoint.
 
     Docs for this BQ call: https://cloud.google.com/bigquery/docs/reference\
@@ -604,15 +620,6 @@ class BigQueryWrapper(object):
     # The rows argument is a list of
     # bigquery.TableDataInsertAllRequest.RowsValueListEntry instances as
     # required by the InsertAll() method.
-    request = bigquery.BigqueryTabledataInsertAllRequest(
-        projectId=project_id,
-        datasetId=dataset_id,
-        tableId=table_id,
-        tableDataInsertAllRequest=bigquery.TableDataInsertAllRequest(
-            skipInvalidRows=skip_invalid_rows,
-            # TODO(silviuc): Should have an option for ignoreUnknownValues?
-            rows=rows))
-
     resource = resource_identifiers.BigQueryTable(
         project_id, dataset_id, table_id)
@@ -633,14 +640,15 @@ class BigQueryWrapper(object):
         base_labels=labels)
 
     started_millis = int(time.time() * 1000)
-    response = None
     try:
-      response = self.client.tabledata.InsertAll(request)
-      if not response.insertErrors:
+      table_ref = gcp_bigquery.DatasetReference(project_id,
+                                                dataset_id).table(table_id)
+      errors = self.gcp_bq_client.insert_rows_json(
+          table_ref, json_rows=rows, row_ids=insert_ids, skip_invalid_rows=True)
+      if not errors:
         service_call_metric.call('ok')
-      for insert_error in response.insertErrors:
-        for error in insert_error.errors:
-          service_call_metric.call(error.reason)
+      for insert_error in errors:
+        service_call_metric.call(insert_error['errors'][0])
     except HttpError as e:
       service_call_metric.call(e)
@@ -649,9 +657,7 @@ class BigQueryWrapper(object):
     finally:
       self._latency_histogram_metric.update(
          int(time.time() * 1000) - started_millis)
-    if response:
-      return not response.insertErrors, response.insertErrors
-    return False, []
+    return not errors, errors
 
   @retry.with_exponential_backoff(
       num_retries=MAX_RETRIES,
@@ -1118,29 +1124,19 @@ class BigQueryWrapper(object):
     # BigQuery will do a best-effort if unique IDs are provided. This situation
     # can happen during retries on failures.
     # TODO(silviuc): Must add support to writing TableRow's instead of dicts.
-    final_rows = []
-    for i, row in enumerate(rows):
-      json_row = self._convert_to_json_row(row)
-      insert_id = str(self.unique_row_id) if not insert_ids else insert_ids[i]
-      final_rows.append(
-          bigquery.TableDataInsertAllRequest.RowsValueListEntry(
-              insertId=insert_id, json=json_row))
+    insert_ids = [
+        str(self.unique_row_id) if not insert_ids else insert_ids[i] for i,
+        _ in enumerate(rows)
+    ]
+    rows = [
+        fast_json_loads(fast_json_dumps(r, default=default_encoder))
+        for r in rows
+    ]
+
     result, errors = self._insert_all_rows(
-        project_id, dataset_id, table_id, final_rows, skip_invalid_rows)
+        project_id, dataset_id, table_id, rows, insert_ids)
     return result, errors
 
-  def _convert_to_json_row(self, row):
-    json_object = bigquery.JsonObject()
-    for k, v in row.items():
-      if isinstance(v, decimal.Decimal):
-        # decimal values are converted into string because JSON does not
-        # support the precision that decimal supports. BQ is able to handle
-        # inserts into NUMERIC columns by receiving JSON with string attrs.
-        v = str(v)
-      json_object.additionalProperties.append(
-          bigquery.JsonObject.AdditionalProperty(key=k, value=to_json_value(v)))
-    return json_object
-
   def _convert_cell_value_to_dict(self, value, field):
     if field.type == 'STRING':
       # Input: "XYZ" --> Output: "XYZ"
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
index e5a4868..566e2b3 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
@@ -51,6 +51,7 @@ from apache_beam.options.value_provider import StaticValueProvider
 # pylint: disable=wrong-import-order, wrong-import-position
 try:
   from apitools.base.py.exceptions import HttpError, HttpForbiddenError
+  from google.cloud import bigquery as gcp_bigquery
 except ImportError:
   HttpError = None
   HttpForbiddenError = None
@@ -850,9 +851,7 @@ class TestBigQueryWriter(unittest.TestCase):
     client.tables.Get.return_value = table
     write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
 
-    insert_response = mock.Mock()
-    insert_response.insertErrors = []
-    client.tabledata.InsertAll.return_value = insert_response
+    client.insert_rows_json.return_value = []
 
     with beam.io.BigQuerySink(
         'project:dataset.table',
@@ -860,24 +859,12 @@ class TestBigQueryWriter(unittest.TestCase):
       writer.Write({'i': 1, 'b': True, 's': 'abc', 'f': 3.14})
 
     sample_row = {'i': 1, 'b': True, 's': 'abc', 'f': 3.14}
-    expected_rows = []
-    json_object = bigquery.JsonObject()
-    for k, v in sample_row.items():
-      json_object.additionalProperties.append(
-          bigquery.JsonObject.AdditionalProperty(key=k, value=to_json_value(v)))
-    expected_rows.append(
-        bigquery.TableDataInsertAllRequest.RowsValueListEntry(
-            insertId='_1',  # First row ID generated with prefix ''
-            json=json_object))
-    client.tabledata.InsertAll.assert_called_with(
-        bigquery.BigqueryTabledataInsertAllRequest(
-            projectId='project',
-            datasetId='dataset',
-            tableId='table',
-            tableDataInsertAllRequest=bigquery.TableDataInsertAllRequest(
-                rows=expected_rows,
-                skipInvalidRows=False,
-            )))
+    client.insert_rows_json.assert_called_with(
+        gcp_bigquery.TableReference(
+            gcp_bigquery.DatasetReference('project', 'dataset'), 'table'),
+        json_rows=[sample_row],
+        row_ids=['_1'],
+        skip_invalid_rows=True)
 
   def test_table_schema_without_project(self):
     # Writer should pick executing project by default.
diff --git a/sdks/python/container/license_scripts/dep_urls_py.yaml b/sdks/python/container/license_scripts/dep_urls_py.yaml
index 2ff0a08..f1e89d0 100644
--- a/sdks/python/container/license_scripts/dep_urls_py.yaml
+++ b/sdks/python/container/license_scripts/dep_urls_py.yaml
@@ -95,6 +95,8 @@ pip_dependencies:
     license: "https://raw.githubusercontent.com/numpy/numpy/master/LICENSE.txt"
   oauth2client:
     license: "https://raw.githubusercontent.com/googleapis/oauth2client/master/LICENSE"
+  orjson:
+    license: "https://github.com/ijl/orjson/raw/master/LICENSE-APACHE"
   pandas:
     license: "https://raw.githubusercontent.com/pandas-dev/pandas/master/LICENSE"
   pathlib2:
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index ad3d3b2..f90f5de 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -132,6 +132,8 @@ REQUIRED_PACKAGES = [
     # is Python standard since Python 3.7 and each Python version is compatible
     # with a specific dataclasses version.
     'dataclasses;python_version<"3.7"',
+    # orjson, only available on Python 3.6 and above
+    'orjson<4.0;python_version>="3.6"',
     # Dill doesn't have forwards-compatibility guarantees within minor version.
     # Pickles created with a new version of dill may not unpickle using older
     # version of dill. It is best to use the same version of dill on client and
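
For context on the change above: the commit replaces the apitools `tabledata.InsertAll` request/response objects with the google-cloud-bigquery client's `insert_rows_json`. The snippet below is a minimal sketch of that call pattern, not code from the commit; it assumes google-cloud-bigquery is installed, default credentials are available, and the destination table exists. The project, dataset, table names and rows are placeholders.

    # Sketch of the streaming-insert path the new _insert_all_rows uses.
    from google.cloud import bigquery

    client = bigquery.Client()
    # Build a TableReference the same way the new code does.
    table_ref = bigquery.DatasetReference('my-project', 'my_dataset').table('my_table')

    rows = [{'month': 1}, {'month': 2}]
    insert_ids = ['insertid1', 'insertid2']  # best-effort dedup IDs, one per row

    # insert_rows_json returns a list of error dicts for rows that failed, e.g.
    # [{'index': 0, 'errors': [{'reason': 'invalid', ...}]}]; an empty list
    # means every row was accepted.
    errors = client.insert_rows_json(
        table_ref, json_rows=rows, row_ids=insert_ids, skip_invalid_rows=True)

    for entry in errors:
        failed_row = rows[entry['index']]
        reason = entry['errors'][0]['reason']
        print('Row %s failed: %s' % (failed_row, reason))

Passing row_ids gives BigQuery best-effort deduplication across retries, which is why insert_rows now forwards insert_ids down to _insert_all_rows instead of packing them into RowsValueListEntry objects.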