This is an automated email from the ASF dual-hosted git repository.

Abacn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e23bc1ef3a Fix empty stream name encountered in StorageApiFinalizeWritesDoFn (#38410)
6e23bc1ef3a is described below

commit 6e23bc1ef3a28b58328c3c04af036f35f22a4f12
Author: Yi Hu <[email protected]>
AuthorDate: Fri May 15 09:58:15 2026 -0400

    Fix empty stream name encountered in StorageApiFinalizeWritesDoFn (#38410)
    
    * Potential fix of empty stream name encountered in StorageApiFinalizeWritesDoFn
    
    * Address comments
    
    * Address AI comments
    
    * comments
---
 .../bigquery/StorageApiWritesShardedRecords.java   | 43 ++++++++++------------
 1 file changed, 20 insertions(+), 23 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index cbace6e7ff4..b644d7aa752 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -44,7 +44,6 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -559,8 +558,9 @@ public class StorageApiWritesShardedRecords<DestinationT 
extends @NonNull Object
         String shortTableId,
         AppendClientInfo appendClientInfo,
         Callable<Boolean> tryCreateTable,
-        BiConsumer<Iterable<AppendRowsContext<DestinationT>>, Boolean> 
initializeContexts,
-        Consumer<Iterable<AppendRowsContext<DestinationT>>> clearClients,
+        Consumer<Iterable<AppendRowsContext<DestinationT>>> initializeContexts,
+        Runnable resetClient,
+        ValueState<String> streamName,
         ValueState<Long> streamOffset,
         MultiOutputReceiver o) {
       // The first context is always the one that fails.
@@ -659,15 +659,6 @@ public class StorageApiWritesShardedRecords<DestinationT 
extends @NonNull Object
         throw new RuntimeException(e);
       }
 
-      if (!quotaError) {
-        // For known errors (offset mismatch, not found) we must reestablish
-        // the streams.
-        // However we've seen that doing this fixes random stuckness issues by 
reestablishing
-        // gRPC connections,
-        // so we close the clients for all non-quota errors.
-
-        clearClients.accept(failedContexts);
-      }
       appendFailures.inc();
       int retriedRows = failedContext.protoRows.getSerializedRowsCount();
       BigQuerySinkMetrics.appendRowsRowStatusCounter(
@@ -722,11 +713,20 @@ public class StorageApiWritesShardedRecords<DestinationT 
extends @NonNull Object
         // Finalize the stream and clear streamName so a new stream will be 
created.
         o.get(flushTag)
             .output(KV.of(failedContext.streamName, new 
Operation(failedContext.offset - 1, true)));
+
+        // Clear streamName so a new stream will be created in resetClient 
below.
+        streamName.write("");
+
+        // Re-establish the client with the new stream.
+        resetClient.run();
+
         // Reinitialize all contexts with the new stream and new offsets.
-        initializeContexts.accept(failedContexts, true);
+        initializeContexts.accept(failedContexts);
 
         // Offset failures imply that all subsequent parallel appends will 
also fail.
         // Retry them all.
+      } else if (!quotaError) {
+        resetClient.run();
       }
     }
 
@@ -912,13 +912,9 @@ public class StorageApiWritesShardedRecords<DestinationT 
extends @NonNull Object
 
         // Initialize stream names and offsets for all contexts. This will be 
called initially, but
         // will also be called if we roll over to a new stream on a retry.
-        BiConsumer<Iterable<AppendRowsContext<DestinationT>>, Boolean> 
initializeContexts =
-            (contexts, isFailure) -> {
+        Consumer<Iterable<AppendRowsContext<DestinationT>>> initializeContexts 
=
+            (contexts) -> {
               try {
-                if (isFailure) {
-                  // Clear the stream name, forcing a new one to be created.
-                  streamName.write("");
-                }
                 String streamNameRead = 
Preconditions.checkArgumentNotNull(streamName.read());
                 long currentOffset = 
Preconditions.checkArgumentNotNull(streamOffset.read());
                 for (AppendRowsContext<DestinationT> context : contexts) {
@@ -933,8 +929,8 @@ public class StorageApiWritesShardedRecords<DestinationT 
extends @NonNull Object
               }
             };
 
-        Consumer<Iterable<AppendRowsContext<DestinationT>>> clearClients =
-            (contexts) -> {
+        Runnable resetClient =
+            () -> {
               try {
                 appendClientHolder.invalidateAndReset();
               } catch (Exception e) {
@@ -967,7 +963,8 @@ public class StorageApiWritesShardedRecords<DestinationT 
extends @NonNull Object
                   appendClientHolder.get(),
                   tryCreateTable,
                   initializeContexts,
-                  clearClients,
+                  resetClient,
+                  streamName,
                   streamOffset,
                   o);
               return RetryType.RETRY_ALL_OPERATIONS;
@@ -1068,7 +1065,7 @@ public class StorageApiWritesShardedRecords<DestinationT 
extends @NonNull Object
         Iterable<AppendRowsContext<DestinationT>> contexts = 
retryManager.getRemainingContexts();
 
         if (numAppends > 0) {
-          initializeContexts.accept(contexts, false);
+          initializeContexts.accept(contexts);
           retryManager.run(true);
 
           appendSplitDistribution.update(numAppends);

Reply via email to