This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f7519774e3c BigQueryIO read throttling detection python (#31404)
f7519774e3c is described below
commit f7519774e3c97e192376627b24373ed277d9a54a
Author: Yi Hu <[email protected]>
AuthorDate: Tue May 28 19:52:02 2024 -0400
BigQueryIO read throttling detection python (#31404)
---
sdks/python/apache_beam/io/gcp/bigquery.py | 32 ++++++++++++++++++++++++++----
1 file changed, 28 insertions(+), 4 deletions(-)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 43bd1702218..a4d710b1288 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -1186,11 +1186,19 @@ class _CustomBigQueryStorageSource(BoundedSource):
parent=parent,
read_session=requested_session,
max_stream_count=stream_count)
+ if self.use_native_datetime:
+ display_schema = "Arrow Schema:" + str(read_session.arrow_schema)
+ else:
+ display_schema = "Avro Schema:" + str(read_session.avro_schema)
_LOGGER.info(
'Sent BigQuery Storage API CreateReadSession request: \n %s \n'
- 'Received response \n %s.',
+ 'Received %d streams\ndata_format: %s\n'
+ 'estimated_total_bytes_scanned: %s\n%s.',
requested_session,
- read_session)
+ len(read_session.streams),
+ read_session.data_format,
+ read_session.estimated_total_bytes_scanned,
+ display_schema)
self.split_result = [
_CustomBigQueryStorageStreamSource(
@@ -1221,6 +1229,10 @@ class _CustomBigQueryStorageSource(BoundedSource):
class _CustomBigQueryStorageStreamSource(BoundedSource):
"""A source representing a single stream in a read session."""
+
+ # Runner will act on this counter on scaling event, if supported
+ THROTTLE_COUNTER = Metrics.counter(__name__, 'cumulativeThrottlingSeconds')
+
def __init__(
self, read_stream_name: str, use_native_datetime: Optional[bool] = True):
self.read_stream_name = read_stream_name
@@ -1276,9 +1288,18 @@ class _CustomBigQueryStorageStreamSource(BoundedSource):
else:
return self.read_avro()
+ @staticmethod
+ def retry_delay_callback(delay):
+ _LOGGER.info("retry delay: %f", delay)
+ _CustomBigQueryStorageStreamSource.THROTTLE_COUNTER.inc(delay)
+
def read_arrow(self):
+
storage_client = bq_storage.BigQueryReadClient()
- row_iter = iter(storage_client.read_rows(self.read_stream_name).rows())
+ row_iter = iter(
+ storage_client.read_rows(
+ self.read_stream_name,
+ retry_delay_callback=self.retry_delay_callback).rows())
row = next(row_iter, None)
# Handling the case where the user might provide very selective filters
# which can result in read_rows_response being empty.
@@ -1292,7 +1313,10 @@ class _CustomBigQueryStorageStreamSource(BoundedSource):
def read_avro(self):
storage_client = bq_storage.BigQueryReadClient()
- read_rows_iterator = iter(storage_client.read_rows(self.read_stream_name))
+ read_rows_iterator = iter(
+ storage_client.read_rows(
+ self.read_stream_name,
+ retry_delay_callback=self.retry_delay_callback))
# Handling the case where the user might provide very selective filters
# which can result in read_rows_response being empty.
first_read_rows_response = next(read_rows_iterator, None)