This is an automated email from the ASF dual-hosted git repository.
Abacn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6e23bc1ef3a Fix empty stream name encountered in
StorageApiFinalizeWritesDoFn (#38410)
6e23bc1ef3a is described below
commit 6e23bc1ef3a28b58328c3c04af036f35f22a4f12
Author: Yi Hu <[email protected]>
AuthorDate: Fri May 15 09:58:15 2026 -0400
Fix empty stream name encountered in StorageApiFinalizeWritesDoFn (#38410)
* Potential fix for the empty stream name encountered in
StorageApiFinalizeWritesDoFn
* Address comments
* Address AI comments
* comments
---
.../bigquery/StorageApiWritesShardedRecords.java | 43 ++++++++++------------
1 file changed, 20 insertions(+), 23 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index cbace6e7ff4..b644d7aa752 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -44,7 +44,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
@@ -559,8 +558,9 @@ public class StorageApiWritesShardedRecords<DestinationT
extends @NonNull Object
String shortTableId,
AppendClientInfo appendClientInfo,
Callable<Boolean> tryCreateTable,
- BiConsumer<Iterable<AppendRowsContext<DestinationT>>, Boolean>
initializeContexts,
- Consumer<Iterable<AppendRowsContext<DestinationT>>> clearClients,
+ Consumer<Iterable<AppendRowsContext<DestinationT>>> initializeContexts,
+ Runnable resetClient,
+ ValueState<String> streamName,
ValueState<Long> streamOffset,
MultiOutputReceiver o) {
// The first context is always the one that fails.
@@ -659,15 +659,6 @@ public class StorageApiWritesShardedRecords<DestinationT
extends @NonNull Object
throw new RuntimeException(e);
}
- if (!quotaError) {
- // For known errors (offset mismatch, not found) we must reestablish
- // the streams.
- // However we've seen that doing this fixes random stuckness issues by
reestablishing
- // gRPC connections,
- // so we close the clients for all non-quota errors.
-
- clearClients.accept(failedContexts);
- }
appendFailures.inc();
int retriedRows = failedContext.protoRows.getSerializedRowsCount();
BigQuerySinkMetrics.appendRowsRowStatusCounter(
@@ -722,11 +713,20 @@ public class StorageApiWritesShardedRecords<DestinationT
extends @NonNull Object
// Finalize the stream and clear streamName so a new stream will be
created.
o.get(flushTag)
.output(KV.of(failedContext.streamName, new
Operation(failedContext.offset - 1, true)));
+
+ // Clear streamName so a new stream will be created in resetClient
below.
+ streamName.write("");
+
+ // Re-establish the client with the new stream.
+ resetClient.run();
+
// Reinitialize all contexts with the new stream and new offsets.
- initializeContexts.accept(failedContexts, true);
+ initializeContexts.accept(failedContexts);
// Offset failures imply that all subsequent parallel appends will
also fail.
// Retry them all.
+ } else if (!quotaError) {
+ resetClient.run();
}
}
@@ -912,13 +912,9 @@ public class StorageApiWritesShardedRecords<DestinationT
extends @NonNull Object
// Initialize stream names and offsets for all contexts. This will be
called initially, but
// will also be called if we roll over to a new stream on a retry.
- BiConsumer<Iterable<AppendRowsContext<DestinationT>>, Boolean>
initializeContexts =
- (contexts, isFailure) -> {
+ Consumer<Iterable<AppendRowsContext<DestinationT>>> initializeContexts
=
+ (contexts) -> {
try {
- if (isFailure) {
- // Clear the stream name, forcing a new one to be created.
- streamName.write("");
- }
String streamNameRead =
Preconditions.checkArgumentNotNull(streamName.read());
long currentOffset =
Preconditions.checkArgumentNotNull(streamOffset.read());
for (AppendRowsContext<DestinationT> context : contexts) {
@@ -933,8 +929,8 @@ public class StorageApiWritesShardedRecords<DestinationT
extends @NonNull Object
}
};
- Consumer<Iterable<AppendRowsContext<DestinationT>>> clearClients =
- (contexts) -> {
+ Runnable resetClient =
+ () -> {
try {
appendClientHolder.invalidateAndReset();
} catch (Exception e) {
@@ -967,7 +963,8 @@ public class StorageApiWritesShardedRecords<DestinationT
extends @NonNull Object
appendClientHolder.get(),
tryCreateTable,
initializeContexts,
- clearClients,
+ resetClient,
+ streamName,
streamOffset,
o);
return RetryType.RETRY_ALL_OPERATIONS;
@@ -1068,7 +1065,7 @@ public class StorageApiWritesShardedRecords<DestinationT
extends @NonNull Object
Iterable<AppendRowsContext<DestinationT>> contexts =
retryManager.getRemainingContexts();
if (numAppends > 0) {
- initializeContexts.accept(contexts, false);
+ initializeContexts.accept(contexts);
retryManager.run(true);
appendSplitDistribution.update(numAppends);