This is an automated email from the ASF dual-hosted git repository.

xqhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d19b534ba0b Fix split thresholds for BQ sink (#36422)
d19b534ba0b is described below

commit d19b534ba0b52377b1514016366d64e2cb452a41
Author: Reuven Lax <[email protected]>
AuthorDate: Tue Oct 7 14:55:14 2025 -0700

    Fix split thresholds for BQ sink (#36422)
    
    * ensure that we don't exceed split threshold
    
    * fix OBO error
---
 .../beam/sdk/io/gcp/bigquery/SplittingIterable.java     | 15 +++++++++++----
 .../gcp/bigquery/StorageApiWriteUnshardedRecords.java   | 17 +++++++++++------
 2 files changed, 22 insertions(+), 10 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java
index e40824eab08..fbcd4250a90 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java
@@ -28,7 +28,9 @@ import java.util.NoSuchElementException;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterators;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.PeekingIterator;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Instant;
 
@@ -85,7 +87,8 @@ class SplittingIterable implements Iterable<SplittingIterable.Value> {
   @Override
   public Iterator<Value> iterator() {
     return new Iterator<Value>() {
-      final Iterator<StorageApiWritePayload> underlyingIterator = underlying.iterator();
+      final PeekingIterator<StorageApiWritePayload> underlyingIterator =
+          Iterators.peekingIterator(underlying.iterator());
 
       @Override
       public boolean hasNext() {
@@ -103,6 +106,13 @@ class SplittingIterable implements Iterable<SplittingIterable.Value> {
         ProtoRows.Builder inserts = ProtoRows.newBuilder();
         long bytesSize = 0;
         while (underlyingIterator.hasNext()) {
+          // Make sure that we don't exceed the split-size length over multiple elements. A single
+          // element can exceed
+          // the split threshold, but in that case it should be the only element returned.
+          if ((bytesSize + underlyingIterator.peek().getPayload().length > splitSize)
+              && inserts.getSerializedRowsCount() > 0) {
+            break;
+          }
           StorageApiWritePayload payload = underlyingIterator.next();
           ByteString byteString = ByteString.copyFrom(payload.getPayload());
           @Nullable TableRow failsafeTableRow = null;
@@ -157,9 +167,6 @@ class SplittingIterable implements Iterable<SplittingIterable.Value> {
           timestamps.add(timestamp);
           failsafeRows.add(failsafeTableRow);
           bytesSize += byteString.size();
-          if (bytesSize > splitSize) {
-            break;
-          }
         }
         return new AutoValue_SplittingIterable_Value(inserts.build(), timestamps, failsafeRows);
       }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 0d483367f7b..ab8de041be8 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -1008,15 +1008,18 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
       this.bigLakeConfiguration = bigLakeConfiguration;
     }
 
-    boolean shouldFlush() {
-      return numPendingRecords > flushThresholdCount || numPendingRecordBytes > flushThresholdBytes;
+    boolean shouldFlush(int recordBytes) {
+      return numPendingRecords > flushThresholdCount
+          || (((numPendingRecordBytes + recordBytes) > flushThresholdBytes)
+              && numPendingRecords > 0);
     }
 
     void flushIfNecessary(
         OutputReceiver<BigQueryStorageApiInsertError> failedRowsReceiver,
-        @Nullable OutputReceiver<TableRow> successfulRowsReceiver)
+        @Nullable OutputReceiver<TableRow> successfulRowsReceiver,
+        int recordBytes)
         throws Exception {
-      if (shouldFlush()) {
+      if (shouldFlush(recordBytes)) {
         forcedFlushes.inc();
         // Too much memory being used. Flush the state and wait for it to drain out.
         // TODO(reuvenlax): Consider waiting for memory usage to drop instead of waiting for all the
@@ -1172,10 +1175,12 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
       @Nullable
       OutputReceiver<TableRow> successfulRowsReceiver =
           (successfulRowsTag != null) ? o.get(successfulRowsTag) : null;
-      flushIfNecessary(failedRowsReceiver, successfulRowsReceiver);
+
+      int recordBytes = element.getValue().getPayload().length;
+      flushIfNecessary(failedRowsReceiver, successfulRowsReceiver, recordBytes);
       state.addMessage(element.getValue(), elementTs, failedRowsReceiver);
       ++numPendingRecords;
-      numPendingRecordBytes += element.getValue().getPayload().length;
+      numPendingRecordBytes += recordBytes;
     }
 
     private OutputReceiver<TableRow> makeSuccessfulRowsreceiver(

Reply via email to