This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f7519774e3c BigQueryIO read throttling detection python (#31404)
f7519774e3c is described below

commit f7519774e3c97e192376627b24373ed277d9a54a
Author: Yi Hu <[email protected]>
AuthorDate: Tue May 28 19:52:02 2024 -0400

    BigQueryIO read throttling detection python (#31404)
---
 sdks/python/apache_beam/io/gcp/bigquery.py | 32 ++++++++++++++++++++++++++----
 1 file changed, 28 insertions(+), 4 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 43bd1702218..a4d710b1288 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -1186,11 +1186,19 @@ class _CustomBigQueryStorageSource(BoundedSource):
           parent=parent,
           read_session=requested_session,
           max_stream_count=stream_count)
+      if self.use_native_datetime:
+        display_schema = "Arrow Schema:" + str(read_session.arrow_schema)
+      else:
+        display_schema = "Avro Schema:" + str(read_session.avro_schema)
       _LOGGER.info(
           'Sent BigQuery Storage API CreateReadSession request: \n %s \n'
-          'Received response \n %s.',
+          'Received %d streams\ndata_format: %s\n'
+          'estimated_total_bytes_scanned: %s\n%s.',
           requested_session,
-          read_session)
+          len(read_session.streams),
+          read_session.data_format,
+          read_session.estimated_total_bytes_scanned,
+          display_schema)
 
       self.split_result = [
           _CustomBigQueryStorageStreamSource(
@@ -1221,6 +1229,10 @@ class _CustomBigQueryStorageSource(BoundedSource):
 
 class _CustomBigQueryStorageStreamSource(BoundedSource):
   """A source representing a single stream in a read session."""
+
+  # Runner will act on this counter on scaling event, if supported
+  THROTTLE_COUNTER = Metrics.counter(__name__, 'cumulativeThrottlingSeconds')
+
   def __init__(
       self, read_stream_name: str, use_native_datetime: Optional[bool] = True):
     self.read_stream_name = read_stream_name
@@ -1276,9 +1288,18 @@ class _CustomBigQueryStorageStreamSource(BoundedSource):
     else:
       return self.read_avro()
 
+  @staticmethod
+  def retry_delay_callback(delay):
+    _LOGGER.info("retry delay: %f", delay)
+    _CustomBigQueryStorageStreamSource.THROTTLE_COUNTER.inc(delay)
+
   def read_arrow(self):
+
     storage_client = bq_storage.BigQueryReadClient()
-    row_iter = iter(storage_client.read_rows(self.read_stream_name).rows())
+    row_iter = iter(
+        storage_client.read_rows(
+            self.read_stream_name,
+            retry_delay_callback=self.retry_delay_callback).rows())
     row = next(row_iter, None)
     # Handling the case where the user might provide very selective filters
     # which can result in read_rows_response being empty.
@@ -1292,7 +1313,10 @@ class _CustomBigQueryStorageStreamSource(BoundedSource):
 
   def read_avro(self):
     storage_client = bq_storage.BigQueryReadClient()
-    read_rows_iterator = iter(storage_client.read_rows(self.read_stream_name))
+    read_rows_iterator = iter(
+        storage_client.read_rows(
+            self.read_stream_name,
+            retry_delay_callback=self.retry_delay_callback))
     # Handling the case where the user might provide very selective filters
     # which can result in read_rows_response being empty.
     first_read_rows_response = next(read_rows_iterator, None)

Reply via email to