This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8250a7b Reducing how much we call BigQuery Get Table API. Previously,
the BQ API was queried once per bundle. With this change, it will be queried
once per worker thread. This will help with throughput for BQ streaming inserts.
new e34fef8 Merge pull request #12125 from pabloem/BQ_API_quota
8250a7b is described below
commit 8250a7bf88fc71acc748cd6976b72e8bdd44ec75
Author: Pablo Estrada <[email protected]>
AuthorDate: Mon Jun 29 15:30:02 2020 -0700
Reducing how much we call BigQuery Get Table API. Previously, the BQ API was
queried once per bundle. With this change, it will be queried once per worker
thread. This will help with throughput for BQ streaming inserts.
---
sdks/python/apache_beam/io/gcp/bigquery.py | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index ccaca4d..34dc946 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -963,6 +963,9 @@ bigquery_v2_messages.TableSchema` object.
buffer_size=buffer_size)
+_KNOWN_TABLES = set()
+
+
class BigQueryWriteFn(DoFn):
"""A ``DoFn`` that streams writes to BigQuery once the table is created."""
@@ -1023,7 +1026,6 @@ class BigQueryWriteFn(DoFn):
self.write_disposition = write_disposition
self._rows_buffer = []
self._reset_rows_buffer()
- self._observed_tables = set()
self._total_buffered_rows = 0
self.kms_key = kms_key
@@ -1075,8 +1077,6 @@ class BigQueryWriteFn(DoFn):
self.bigquery_wrapper = bigquery_tools.BigQueryWrapper(
client=self.test_client)
- self._observed_tables = set()
-
self._backoff_calculator = iter(
retry.FuzzedExponentialIntervals(
initial_delay_secs=0.2, num_retries=10000, max_delay_secs=1500))
@@ -1086,7 +1086,7 @@ class BigQueryWriteFn(DoFn):
table_reference.projectId,
table_reference.datasetId,
table_reference.tableId)
- if str_table_reference in self._observed_tables:
+ if str_table_reference in _KNOWN_TABLES:
return
if self.create_disposition == BigQueryDisposition.CREATE_NEVER:
@@ -1110,7 +1110,7 @@ class BigQueryWriteFn(DoFn):
self.create_disposition,
self.write_disposition,
additional_create_parameters=self.additional_bq_parameters)
- self._observed_tables.add(str_table_reference)
+ _KNOWN_TABLES.add(str_table_reference)
def process(self, element, *schema_side_inputs):
destination = element[0]