This is an automated email from the ASF dual-hosted git repository.
jrmccluskey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 889d4db025a Give an option to override the number of shards in BQ
streaming insert addresses (#25569)
889d4db025a is described below
commit 889d4db025a1fe7c0b0f12397a778e547989eb41
Author: dannikay <[email protected]>
AuthorDate: Tue Mar 7 09:08:56 2023 -0800
Give an option to override the number of shards in BQ streaming insert
addresses (#25569)
* Give an option to override the default number of shards in BQ streaming
inserts
* Fix unit test
* Fix lint error
* Address comments
---
sdks/python/apache_beam/io/gcp/bigquery.py | 13 ++++++++++---
sdks/python/apache_beam/io/gcp/bigquery_test.py | 12 ++++++++----
2 files changed, 18 insertions(+), 7 deletions(-)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py
b/sdks/python/apache_beam/io/gcp/bigquery.py
index 7233326ce0c..5fa10d7a688 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -1641,6 +1641,7 @@ class _StreamToBigQuery(PTransform):
ignore_insert_ids,
ignore_unknown_columns,
with_auto_sharding,
+ num_streaming_keys=DEFAULT_SHARDS_PER_DESTINATION,
test_client=None,
max_retries=None):
self.table_reference = table_reference
@@ -1658,6 +1659,7 @@ class _StreamToBigQuery(PTransform):
self.ignore_insert_ids = ignore_insert_ids
self.ignore_unknown_columns = ignore_unknown_columns
self.with_auto_sharding = with_auto_sharding
+ self._num_streaming_keys = num_streaming_keys
self.max_retries = max_retries or MAX_INSERT_RETRIES
class InsertIdPrefixFn(DoFn):
@@ -1690,7 +1692,7 @@ class _StreamToBigQuery(PTransform):
def _add_random_shard(element):
key = element[0]
value = element[1]
- return ((key, random.randint(0, DEFAULT_SHARDS_PER_DESTINATION)), value)
+ return ((key, random.randint(0, self._num_streaming_keys)), value)
def _restore_table_ref(sharded_table_ref_elems_kv):
sharded_table_ref = sharded_table_ref_elems_kv[0]
@@ -1782,7 +1784,8 @@ class WriteToBigQuery(PTransform):
# when the feature is mature.
with_auto_sharding=False,
ignore_unknown_columns=False,
- load_job_project_id=None):
+ load_job_project_id=None,
+ num_streaming_keys=DEFAULT_SHARDS_PER_DESTINATION):
"""Initialize a WriteToBigQuery transform.
Args:
@@ -1927,6 +1930,8 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider`
that has a JSON string,
load_job_project_id: Specifies an alternate GCP project id to use for
billing Batch File Loads. By default, the project id of the table is
used.
+ num_streaming_keys: The number of shards per destination when writing via
+ streaming inserts.
"""
self._table = table
self._dataset = dataset
@@ -1962,6 +1967,7 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider`
that has a JSON string,
self._ignore_insert_ids = ignore_insert_ids
self._ignore_unknown_columns = ignore_unknown_columns
self.load_job_project_id = load_job_project_id
+ self._num_streaming_keys = num_streaming_keys
# Dict/schema methods were moved to bigquery_tools, but keep references
# here for backward compatibility.
@@ -2024,7 +2030,8 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider`
that has a JSON string,
ignore_insert_ids=self._ignore_insert_ids,
ignore_unknown_columns=self._ignore_unknown_columns,
with_auto_sharding=self.with_auto_sharding,
- test_client=self.test_client)
+ test_client=self.test_client,
+ num_streaming_keys=self._num_streaming_keys)
return WriteResult(
method=WriteToBigQuery.Method.STREAMING_INSERTS,
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py
b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 08775d26075..56a32471aaf 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -1351,7 +1351,8 @@ class
PipelineBasedStreamingInsertTest(_TestCaseWithTempDirCleanUp):
ignore_insert_ids=False,
ignore_unknown_columns=False,
with_auto_sharding=False,
- test_client=client))
+ test_client=client,
+ num_streaming_keys=500))
with open(file_name_1) as f1, open(file_name_2) as f2:
self.assertEqual(json.load(f1), json.load(f2))
@@ -1447,7 +1448,8 @@ class
PipelineBasedStreamingInsertTest(_TestCaseWithTempDirCleanUp):
ignore_insert_ids=False,
ignore_unknown_columns=False,
with_auto_sharding=False,
- test_client=client))
+ test_client=client,
+ num_streaming_keys=500))
failed_values = (
bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS]
@@ -1523,7 +1525,8 @@ class
PipelineBasedStreamingInsertTest(_TestCaseWithTempDirCleanUp):
ignore_unknown_columns=False,
with_auto_sharding=False,
test_client=client,
- max_retries=10))
+ max_retries=10,
+ num_streaming_keys=500))
failed_values = (
bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
@@ -1589,7 +1592,8 @@ class
PipelineBasedStreamingInsertTest(_TestCaseWithTempDirCleanUp):
ignore_insert_ids=False,
ignore_unknown_columns=False,
with_auto_sharding=with_auto_sharding,
- test_client=client))
+ test_client=client,
+ num_streaming_keys=500))
with open(file_name_1) as f1, open(file_name_2) as f2:
out1 = json.load(f1)