This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 1842dd2f07a Respect BigQuery insert byte size limits (#24979)
1842dd2f07a is described below
commit 1842dd2f07aee9485c37333882c2f9d1e80f880c
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Wed Jul 5 17:03:59 2023 +0000
Respect BigQuery insert byte size limits (#24979)
* limit insert request by bytes; unit test
* make limit checks before adding row; log a warning when seeing a large row
* send large rows to DLQ
* fix tests
* use number instead of variable
* use global variable
* use number
* use global variable
* use number
* expose max payload size as a parameter
* syntax fix
* remove warning log; revert irrelevant tests
* maintain buffer byte size for each destination
* only delete buffer if it exists
* attribute name change to indicate private
* check user specified maximum is within allowed limits
---
sdks/python/apache_beam/io/gcp/bigquery.py | 74 ++++++++++++++++++++++---
sdks/python/apache_beam/io/gcp/bigquery_test.py | 32 +++++++++++
2 files changed, 98 insertions(+), 8 deletions(-)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index d12dd276a1a..bcc04acb9a9 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -371,6 +371,7 @@ from typing import Tuple
from typing import Union
import fastavro
+from objsize import get_deep_size
import apache_beam as beam
from apache_beam import coders
@@ -474,6 +475,13 @@ tried for a very long time. You may reduce this property to reduce the number
of retries.
"""
MAX_INSERT_RETRIES = 10000
+"""
+The maximum byte size for a BigQuery legacy streaming insert payload.
+
+Note: The actual limit is 10MB, but we set it to 9MB to make room for request
+overhead: https://cloud.google.com/bigquery/quotas#streaming_inserts
+"""
+MAX_INSERT_PAYLOAD_SIZE = 9 << 20
@deprecated(since='2.11.0', current="bigquery_tools.parse_table_reference")
@@ -1325,7 +1333,8 @@ class BigQueryWriteFn(DoFn):
ignore_insert_ids=False,
with_batched_input=False,
ignore_unknown_columns=False,
- max_retries=MAX_INSERT_RETRIES):
+ max_retries=MAX_INSERT_RETRIES,
+ max_insert_payload_size=MAX_INSERT_PAYLOAD_SIZE):
"""Initialize a WriteToBigQuery transform.
Args:
@@ -1376,7 +1385,8 @@ class BigQueryWriteFn(DoFn):
max_retries: The number of times that we will retry inserting a group of
rows into BigQuery. By default, we retry 10000 times with exponential
backoffs (effectively retry forever).
-
+ max_insert_payload_size: The maximum byte size for a BigQuery legacy
+ streaming insert payload.
"""
self.schema = schema
self.test_client = test_client
@@ -1414,6 +1424,7 @@ class BigQueryWriteFn(DoFn):
BigQueryWriteFn.STREAMING_API_LOGGING_FREQUENCY_SEC)
self.ignore_unknown_columns = ignore_unknown_columns
self._max_retries = max_retries
+ self._max_insert_payload_size = max_insert_payload_size
def display_data(self):
return {
@@ -1429,6 +1440,7 @@ class BigQueryWriteFn(DoFn):
def _reset_rows_buffer(self):
self._rows_buffer = collections.defaultdict(lambda: [])
+ self._destination_buffer_byte_size = collections.defaultdict(lambda: 0)
@staticmethod
def get_table_schema(schema):
@@ -1515,11 +1527,42 @@ class BigQueryWriteFn(DoFn):
if not self.with_batched_input:
row_and_insert_id = element[1]
+ row_byte_size = get_deep_size(row_and_insert_id)
+
+ # send large rows that exceed BigQuery insert limits to DLQ
+ if row_byte_size >= self._max_insert_payload_size:
+ row_mb_size = row_byte_size / 1_000_000
+ max_mb_size = self._max_insert_payload_size / 1_000_000
+ error = (
+ f"Received row with size {row_mb_size}MB that exceeds "
+ f"the maximum insert payload size set ({max_mb_size}MB).")
+ return [
+ pvalue.TaggedOutput(
+ BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS,
+ GlobalWindows.windowed_value(
+ (destination, row_and_insert_id[0], error))),
+ pvalue.TaggedOutput(
+ BigQueryWriteFn.FAILED_ROWS,
+ GlobalWindows.windowed_value(
+ (destination, row_and_insert_id[0])))
+ ]
+
+ # Flush current batch first if adding this row will exceed our limits
+ # limits: byte size; number of rows
+ if ((self._destination_buffer_byte_size[destination] + row_byte_size >
+ self._max_insert_payload_size) or
+ len(self._rows_buffer[destination]) >= self._max_batch_size):
+ flushed_batch = self._flush_batch(destination)
+ # After flushing our existing batch, we now buffer the current row
+ # for the next flush
+ self._rows_buffer[destination].append(row_and_insert_id)
+ self._destination_buffer_byte_size[destination] = row_byte_size
+ return flushed_batch
+
self._rows_buffer[destination].append(row_and_insert_id)
+ self._destination_buffer_byte_size[destination] += row_byte_size
self._total_buffered_rows += 1
- if len(self._rows_buffer[destination]) >= self._max_batch_size:
- return self._flush_batch(destination)
- elif self._total_buffered_rows >= self._max_buffered_rows:
+ if self._total_buffered_rows >= self._max_buffered_rows:
return self._flush_all_batches()
else:
# The input is already batched per destination, flush the rows now.
@@ -1549,7 +1592,6 @@ class BigQueryWriteFn(DoFn):
# Flush the current batch of rows to BigQuery.
rows_and_insert_ids = self._rows_buffer[destination]
table_reference = bigquery_tools.parse_table_reference(destination)
-
if table_reference.projectId is None:
table_reference.projectId = vp.RuntimeValueProvider.get_value(
'project', str, '')
@@ -1615,6 +1657,8 @@ class BigQueryWriteFn(DoFn):
self._total_buffered_rows -= len(self._rows_buffer[destination])
del self._rows_buffer[destination]
+ if destination in self._destination_buffer_byte_size:
+ del self._destination_buffer_byte_size[destination]
return itertools.chain([
pvalue.TaggedOutput(
@@ -1657,7 +1701,8 @@ class _StreamToBigQuery(PTransform):
with_auto_sharding,
num_streaming_keys=DEFAULT_SHARDS_PER_DESTINATION,
test_client=None,
- max_retries=None):
+ max_retries=None,
+ max_insert_payload_size=MAX_INSERT_PAYLOAD_SIZE):
self.table_reference = table_reference
self.table_side_inputs = table_side_inputs
self.schema_side_inputs = schema_side_inputs
@@ -1675,6 +1720,7 @@ class _StreamToBigQuery(PTransform):
self.with_auto_sharding = with_auto_sharding
self._num_streaming_keys = num_streaming_keys
self.max_retries = max_retries or MAX_INSERT_RETRIES
+ self._max_insert_payload_size = max_insert_payload_size
class InsertIdPrefixFn(DoFn):
def start_bundle(self):
@@ -1701,7 +1747,8 @@ class _StreamToBigQuery(PTransform):
ignore_insert_ids=self.ignore_insert_ids,
ignore_unknown_columns=self.ignore_unknown_columns,
with_batched_input=self.with_auto_sharding,
- max_retries=self.max_retries)
+ max_retries=self.max_retries,
+ max_insert_payload_size=self._max_insert_payload_size)
def _add_random_shard(element):
key = element[0]
@@ -1801,6 +1848,7 @@ class WriteToBigQuery(PTransform):
with_auto_sharding=False,
ignore_unknown_columns=False,
load_job_project_id=None,
+ max_insert_payload_size=MAX_INSERT_PAYLOAD_SIZE,
num_streaming_keys=DEFAULT_SHARDS_PER_DESTINATION,
expansion_service=None):
"""Initialize a WriteToBigQuery transform.
@@ -1960,6 +2008,8 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
expansion_service: The address (host:port) of the expansion service.
If no expansion service is provided, will attempt to run the default
GCP expansion service. Used for STORAGE_WRITE_API method.
+ max_insert_payload_size: The maximum byte size for a BigQuery legacy
+ streaming insert payload.
"""
self._table = table
self._dataset = dataset
@@ -1997,6 +2047,7 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
self._ignore_insert_ids = ignore_insert_ids
self._ignore_unknown_columns = ignore_unknown_columns
self.load_job_project_id = load_job_project_id
+ self._max_insert_payload_size = max_insert_payload_size
self._num_streaming_keys = num_streaming_keys
# Dict/schema methods were moved to bigquery_tools, but keep references
@@ -2045,6 +2096,12 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
'triggering_frequency with STREAMING_INSERTS can only be used with '
'with_auto_sharding=True.')
+ if self._max_insert_payload_size > MAX_INSERT_PAYLOAD_SIZE:
+ raise ValueError(
+ 'max_insert_payload_size can only go up to '
+ f'{MAX_INSERT_PAYLOAD_SIZE} bytes, as per BigQuery quota limits: '
+ 'https://cloud.google.com/bigquery/quotas#streaming_inserts.')
+
outputs = pcoll | _StreamToBigQuery(
table_reference=self.table_reference,
table_side_inputs=self.table_side_inputs,
@@ -2061,6 +2118,7 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
ignore_unknown_columns=self._ignore_unknown_columns,
with_auto_sharding=self.with_auto_sharding,
test_client=self.test_client,
+ max_insert_payload_size=self._max_insert_payload_size,
num_streaming_keys=self._num_streaming_keys)
return WriteResult(
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 0b37ca46e1a..fe1a568f414 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -788,6 +788,37 @@ class TestWriteToBigQuery(unittest.TestCase):
with_auto_sharding=True,
test_client=client))
+ @mock.patch('google.cloud.bigquery.Client.insert_rows_json')
+ def test_streaming_inserts_flush_on_byte_size_limit(self, mock_insert):
+ mock_insert.return_value = []
+ table = 'project:dataset.table'
+ rows = [
+ {
+ 'columnA': 'value1'
+ },
+ {
+ 'columnA': 'value2'
+ },
+ # this very large row exceeds max size, so should be sent to DLQ
+ {
+ 'columnA': "large_string" * 100
+ }
+ ]
+ with beam.Pipeline() as p:
+ failed_rows = (
+ p
+ | beam.Create(rows)
+ | WriteToBigQuery(
+ table=table,
+ method='STREAMING_INSERTS',
+ create_disposition='CREATE_NEVER',
+ schema='columnA:STRING',
+ max_insert_payload_size=500))
+
+ expected_failed_rows = [(table, rows[2])]
+ assert_that(failed_rows.failed_rows, equal_to(expected_failed_rows))
+ self.assertEqual(2, mock_insert.call_count)
+
@parameterized.expand([
param(
exception_type=exceptions.Forbidden if exceptions else None,
@@ -1206,6 +1237,7 @@ class BigQueryStreamingInsertTransformTests(unittest.TestCase):
fn.start_bundle()
fn.process(('project-id:dataset_id.table_id', ({'month': 1}, 'insertid1')))
fn.process(('project-id:dataset_id.table_id', ({'month': 2}, 'insertid2')))
+ fn.finish_bundle()
# InsertRows called as batch size is hit
self.assertTrue(client.insert_rows_json.called)