ahmedabu98 commented on code in PR #24979:
URL: https://github.com/apache/beam/pull/24979#discussion_r1253421565


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1515,11 +1528,43 @@ def process(self, element, *schema_side_inputs):
 
     if not self.with_batched_input:
       row_and_insert_id = element[1]
+      row_byte_size = get_deep_size(row_and_insert_id)
+      # maintain buffer byte size for each destination
+      self._destination_buffer_byte_size.setdefault(destination, 0)
+      # send large rows that exceed BigQuery insert limits to DLQ
+      if row_byte_size >= self.max_insert_payload_size:
+        row_mb_size = row_byte_size / 1_000_000
+        max_mb_size = self.max_insert_payload_size / 1_000_000
+        error = (

Review Comment:
   Yes, that can trigger the same error if users batch their rows beforehand in a way that exceeds the limit. However, I think handling that can be a separate effort, since it is a different use case. This PR addresses the more general case of writing a PCollection of individual rows, which is what the users who ran into this issue were doing.
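
   For readers hitting this limit before a release with this change, below is a minimal sketch of filtering oversized rows on the user side before `WriteToBigQuery`. It is not the PR's implementation: it assumes a 10 MB per-row payload limit, uses JSON serialization as a rough size estimate (the PR's internal sizing via `get_deep_size` differs), and the table spec and step names are placeholders.

   ```python
   # Minimal sketch: split oversized rows out before WriteToBigQuery.
   # Assumptions: a 10 MB per-row payload limit, JSON size as a rough
   # estimate of the insert payload, and a placeholder table spec.
   import json
   import logging

   import apache_beam as beam

   MAX_INSERT_PAYLOAD_SIZE = 10_000_000  # bytes; assumed limit


   def approximate_row_size(row):
     # Rough serialized size of the row; the actual insert payload also
     # carries an insert id and JSON framing, so this is only an estimate.
     return len(json.dumps(row).encode('utf-8'))


   def run():
     with beam.Pipeline() as p:
       rows = p | 'Create' >> beam.Create([{'name': 'a', 'value': 1}])

       partitions = (
           rows
           | 'SplitBySize' >> beam.Partition(
               lambda row, n: int(
                   approximate_row_size(row) >= MAX_INSERT_PAYLOAD_SIZE),
               2))
       small, oversized = partitions[0], partitions[1]

       # Oversized rows are only logged here; a real pipeline might write
       # them to a dead-letter table or GCS instead.
       _ = oversized | 'LogOversized' >> beam.Map(
           lambda row: logging.error('Row too large for streaming insert'))

       _ = small | 'WriteSmall' >> beam.io.WriteToBigQuery(
           'my-project:my_dataset.my_table',  # placeholder; assumed to exist
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
           method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS)


   if __name__ == '__main__':
     run()
   ```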


