This is an automated email from the ASF dual-hosted git repository. Amar3tto pushed a commit to branch cp-38410 in repository https://gitbox.apache.org/repos/asf/beam.git
commit 9608ee0c949fb4c46fa7781eb85bf5ebb7aab286 Author: Vitaly Terentyev <[email protected]> AuthorDate: Fri May 15 18:55:48 2026 +0400 [Cherrypick] Fix empty stream name encountered in StorageApiFinalizeWritesDoFn (#38410) --- .../bigquery/StorageApiWritesShardedRecords.java | 43 ++++++++++------------ 1 file changed, 20 insertions(+), 23 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java index cbace6e7ff4..b644d7aa752 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java @@ -44,7 +44,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; @@ -559,8 +558,9 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object String shortTableId, AppendClientInfo appendClientInfo, Callable<Boolean> tryCreateTable, - BiConsumer<Iterable<AppendRowsContext<DestinationT>>, Boolean> initializeContexts, - Consumer<Iterable<AppendRowsContext<DestinationT>>> clearClients, + Consumer<Iterable<AppendRowsContext<DestinationT>>> initializeContexts, + Runnable resetClient, + ValueState<String> streamName, ValueState<Long> streamOffset, MultiOutputReceiver o) { // The first context is always the one that fails. 
@@ -659,15 +659,6 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object throw new RuntimeException(e); } - if (!quotaError) { - // For known errors (offset mismatch, not found) we must reestablish - // the streams. - // However we've seen that doing this fixes random stuckness issues by reestablishing - // gRPC connections, - // so we close the clients for all non-quota errors. - - clearClients.accept(failedContexts); - } appendFailures.inc(); int retriedRows = failedContext.protoRows.getSerializedRowsCount(); BigQuerySinkMetrics.appendRowsRowStatusCounter( @@ -722,11 +713,20 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object // Finalize the stream and clear streamName so a new stream will be created. o.get(flushTag) .output(KV.of(failedContext.streamName, new Operation(failedContext.offset - 1, true))); + + // Clear streamName so a new stream will be created in resetClient below. + streamName.write(""); + + // Re-establish the client with the new stream. + resetClient.run(); + // Reinitialize all contexts with the new stream and new offsets. - initializeContexts.accept(failedContexts, true); + initializeContexts.accept(failedContexts); // Offset failures imply that all subsequent parallel appends will also fail. // Retry them all. + } else if (!quotaError) { + resetClient.run(); } } @@ -912,13 +912,9 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object // Initialize stream names and offsets for all contexts. This will be called initially, but // will also be called if we roll over to a new stream on a retry. - BiConsumer<Iterable<AppendRowsContext<DestinationT>>, Boolean> initializeContexts = - (contexts, isFailure) -> { + Consumer<Iterable<AppendRowsContext<DestinationT>>> initializeContexts = + (contexts) -> { try { - if (isFailure) { - // Clear the stream name, forcing a new one to be created. 
- streamName.write(""); - } String streamNameRead = Preconditions.checkArgumentNotNull(streamName.read()); long currentOffset = Preconditions.checkArgumentNotNull(streamOffset.read()); for (AppendRowsContext<DestinationT> context : contexts) { @@ -933,8 +929,8 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object } }; - Consumer<Iterable<AppendRowsContext<DestinationT>>> clearClients = - (contexts) -> { + Runnable resetClient = + () -> { try { appendClientHolder.invalidateAndReset(); } catch (Exception e) { @@ -967,7 +963,8 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object appendClientHolder.get(), tryCreateTable, initializeContexts, - clearClients, + resetClient, + streamName, streamOffset, o); return RetryType.RETRY_ALL_OPERATIONS; @@ -1068,7 +1065,7 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object Iterable<AppendRowsContext<DestinationT>> contexts = retryManager.getRemainingContexts(); if (numAppends > 0) { - initializeContexts.accept(contexts, false); + initializeContexts.accept(contexts); retryManager.run(true); appendSplitDistribution.update(numAppends);
