This is an automated email from the ASF dual-hosted git repository.

jrmccluskey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 889d4db025a Give an option to override the number of shards in BQ streaming insert addresses (#25569)
889d4db025a is described below

commit 889d4db025a1fe7c0b0f12397a778e547989eb41
Author: dannikay <[email protected]>
AuthorDate: Tue Mar 7 09:08:56 2023 -0800

    Give an option to override the number of shards in BQ streaming insert addresses (#25569)
    
    * Give an option to override the default number of shards in BQ streaming inserts
    
    * Fix unit test
    
    * Fix lint error
    
    * Address comments
---
 sdks/python/apache_beam/io/gcp/bigquery.py      | 13 ++++++++++---
 sdks/python/apache_beam/io/gcp/bigquery_test.py | 12 ++++++++----
 2 files changed, 18 insertions(+), 7 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 7233326ce0c..5fa10d7a688 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -1641,6 +1641,7 @@ class _StreamToBigQuery(PTransform):
       ignore_insert_ids,
       ignore_unknown_columns,
       with_auto_sharding,
+      num_streaming_keys=DEFAULT_SHARDS_PER_DESTINATION,
       test_client=None,
       max_retries=None):
     self.table_reference = table_reference
@@ -1658,6 +1659,7 @@ class _StreamToBigQuery(PTransform):
     self.ignore_insert_ids = ignore_insert_ids
     self.ignore_unknown_columns = ignore_unknown_columns
     self.with_auto_sharding = with_auto_sharding
+    self._num_streaming_keys = num_streaming_keys
     self.max_retries = max_retries or MAX_INSERT_RETRIES
 
   class InsertIdPrefixFn(DoFn):
@@ -1690,7 +1692,7 @@ class _StreamToBigQuery(PTransform):
     def _add_random_shard(element):
       key = element[0]
       value = element[1]
-      return ((key, random.randint(0, DEFAULT_SHARDS_PER_DESTINATION)), value)
+      return ((key, random.randint(0, self._num_streaming_keys)), value)
 
     def _restore_table_ref(sharded_table_ref_elems_kv):
       sharded_table_ref = sharded_table_ref_elems_kv[0]
@@ -1782,7 +1784,8 @@ class WriteToBigQuery(PTransform):
       # when the feature is mature.
       with_auto_sharding=False,
       ignore_unknown_columns=False,
-      load_job_project_id=None):
+      load_job_project_id=None,
+      num_streaming_keys=DEFAULT_SHARDS_PER_DESTINATION):
     """Initialize a WriteToBigQuery transform.
 
     Args:
@@ -1927,6 +1930,8 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
       load_job_project_id: Specifies an alternate GCP project id to use for
         billingBatch File Loads. By default, the project id of the table is
         used.
+      num_streaming_keys: The number of shards per destination when writing via
+        streaming inserts.
     """
     self._table = table
     self._dataset = dataset
@@ -1962,6 +1967,7 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
     self._ignore_insert_ids = ignore_insert_ids
     self._ignore_unknown_columns = ignore_unknown_columns
     self.load_job_project_id = load_job_project_id
+    self._num_streaming_keys = num_streaming_keys
 
   # Dict/schema methods were moved to bigquery_tools, but keep references
   # here for backward compatibility.
@@ -2024,7 +2030,8 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
           ignore_insert_ids=self._ignore_insert_ids,
           ignore_unknown_columns=self._ignore_unknown_columns,
           with_auto_sharding=self.with_auto_sharding,
-          test_client=self.test_client)
+          test_client=self.test_client,
+          num_streaming_keys=self._num_streaming_keys)
 
       return WriteResult(
           method=WriteToBigQuery.Method.STREAMING_INSERTS,
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 08775d26075..56a32471aaf 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -1351,7 +1351,8 @@ class PipelineBasedStreamingInsertTest(_TestCaseWithTempDirCleanUp):
               ignore_insert_ids=False,
               ignore_unknown_columns=False,
               with_auto_sharding=False,
-              test_client=client))
+              test_client=client,
+              num_streaming_keys=500))
 
     with open(file_name_1) as f1, open(file_name_2) as f2:
       self.assertEqual(json.load(f1), json.load(f2))
@@ -1447,7 +1448,8 @@ class PipelineBasedStreamingInsertTest(_TestCaseWithTempDirCleanUp):
                 ignore_insert_ids=False,
                 ignore_unknown_columns=False,
                 with_auto_sharding=False,
-                test_client=client))
+                test_client=client,
+                num_streaming_keys=500))
 
         failed_values = (
             bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS]
@@ -1523,7 +1525,8 @@ class PipelineBasedStreamingInsertTest(_TestCaseWithTempDirCleanUp):
                 ignore_unknown_columns=False,
                 with_auto_sharding=False,
                 test_client=client,
-                max_retries=10))
+                max_retries=10,
+                num_streaming_keys=500))
 
         failed_values = (
             bq_write_out[beam_bq.BigQueryWriteFn.FAILED_ROWS]
@@ -1589,7 +1592,8 @@ class PipelineBasedStreamingInsertTest(_TestCaseWithTempDirCleanUp):
               ignore_insert_ids=False,
               ignore_unknown_columns=False,
               with_auto_sharding=with_auto_sharding,
-              test_client=client))
+              test_client=client,
+              num_streaming_keys=500))
 
     with open(file_name_1) as f1, open(file_name_2) as f2:
       out1 = json.load(f1)

Reply via email to