This is an automated email from the ASF dual-hosted git repository.
xqhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d19b534ba0b Fix split thresholds for BQ sink (#36422)
d19b534ba0b is described below
commit d19b534ba0b52377b1514016366d64e2cb452a41
Author: Reuven Lax <[email protected]>
AuthorDate: Tue Oct 7 14:55:14 2025 -0700
Fix split thresholds for BQ sink (#36422)
* ensure that we don't exceed split threshold
* fix off-by-one (OBO) error
---
.../beam/sdk/io/gcp/bigquery/SplittingIterable.java | 15 +++++++++++----
.../gcp/bigquery/StorageApiWriteUnshardedRecords.java | 17 +++++++++++------
2 files changed, 22 insertions(+), 10 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java
index e40824eab08..fbcd4250a90 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java
@@ -28,7 +28,9 @@ import java.util.NoSuchElementException;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterators;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.PeekingIterator;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
@@ -85,7 +87,8 @@ class SplittingIterable implements
Iterable<SplittingIterable.Value> {
@Override
public Iterator<Value> iterator() {
return new Iterator<Value>() {
-        final Iterator<StorageApiWritePayload> underlyingIterator = underlying.iterator();
+        final PeekingIterator<StorageApiWritePayload> underlyingIterator =
+            Iterators.peekingIterator(underlying.iterator());
@Override
public boolean hasNext() {
@@ -103,6 +106,13 @@ class SplittingIterable implements
Iterable<SplittingIterable.Value> {
ProtoRows.Builder inserts = ProtoRows.newBuilder();
long bytesSize = 0;
while (underlyingIterator.hasNext()) {
+          // Make sure that we don't exceed the split-size length over multiple elements. A single
+          // element can exceed the split threshold, but in that case it should be the only element
+          // returned.
+          if ((bytesSize + underlyingIterator.peek().getPayload().length > splitSize)
+              && inserts.getSerializedRowsCount() > 0) {
+            break;
+          }
StorageApiWritePayload payload = underlyingIterator.next();
ByteString byteString = ByteString.copyFrom(payload.getPayload());
@Nullable TableRow failsafeTableRow = null;
@@ -157,9 +167,6 @@ class SplittingIterable implements
Iterable<SplittingIterable.Value> {
timestamps.add(timestamp);
failsafeRows.add(failsafeTableRow);
bytesSize += byteString.size();
- if (bytesSize > splitSize) {
- break;
- }
}
         return new AutoValue_SplittingIterable_Value(inserts.build(), timestamps, failsafeRows);
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 0d483367f7b..ab8de041be8 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -1008,15 +1008,18 @@ public class
StorageApiWriteUnshardedRecords<DestinationT, ElementT>
this.bigLakeConfiguration = bigLakeConfiguration;
}
- boolean shouldFlush() {
-      return numPendingRecords > flushThresholdCount || numPendingRecordBytes > flushThresholdBytes;
+ boolean shouldFlush(int recordBytes) {
+ return numPendingRecords > flushThresholdCount
+ || (((numPendingRecordBytes + recordBytes) > flushThresholdBytes)
+ && numPendingRecords > 0);
}
void flushIfNecessary(
OutputReceiver<BigQueryStorageApiInsertError> failedRowsReceiver,
- @Nullable OutputReceiver<TableRow> successfulRowsReceiver)
+ @Nullable OutputReceiver<TableRow> successfulRowsReceiver,
+ int recordBytes)
throws Exception {
- if (shouldFlush()) {
+ if (shouldFlush(recordBytes)) {
forcedFlushes.inc();
       // Too much memory being used. Flush the state and wait for it to drain out.
       // TODO(reuvenlax): Consider waiting for memory usage to drop instead of waiting for all the
@@ -1172,10 +1175,12 @@ public class
StorageApiWriteUnshardedRecords<DestinationT, ElementT>
@Nullable
OutputReceiver<TableRow> successfulRowsReceiver =
(successfulRowsTag != null) ? o.get(successfulRowsTag) : null;
- flushIfNecessary(failedRowsReceiver, successfulRowsReceiver);
+
+      int recordBytes = element.getValue().getPayload().length;
+      flushIfNecessary(failedRowsReceiver, successfulRowsReceiver, recordBytes);
state.addMessage(element.getValue(), elementTs, failedRowsReceiver);
++numPendingRecords;
- numPendingRecordBytes += element.getValue().getPayload().length;
+ numPendingRecordBytes += recordBytes;
}
private OutputReceiver<TableRow> makeSuccessfulRowsreceiver(