This is an automated email from the ASF dual-hosted git repository.

claudevdm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f3d66546e7 Expose StorageWriteApiMaxRequestCallbackWaitTimeSec in BQ storage write. (#38470)
1f3d66546e7 is described below

commit 1f3d66546e71cb221df686c26d09638233b045b2
Author: claudevdm <[email protected]>
AuthorDate: Tue May 12 15:19:18 2026 -0400

    Expose StorageWriteApiMaxRequestCallbackWaitTimeSec in BQ storage write. (#38470)
---
 .../org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java   |  7 +++++++
 .../apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java  | 10 ++++++++++
 2 files changed, 17 insertions(+)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
index 2f16a64b0d7..face2ef5841 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
@@ -153,6 +153,13 @@ public interface BigQueryOptions
 
   void setStorageWriteMaxInflightBytes(Long value);
 
+  @Description(
+      "Maximum time in seconds a Storage Write API append request is allowed 
to wait in the "
+          + "request callback queue before timing out. Overrides Storage Write 
API default (5 min)")
+  Integer getStorageWriteApiMaxRequestCallbackWaitTimeSec();
+
+  void setStorageWriteApiMaxRequestCallbackWaitTimeSec(Integer value);
+
   @Description(
       "Enables multiplexing mode, where multiple tables can share the same 
connection. Only available when writing with STORAGE_API_AT_LEAST_ONCE"
           + " mode. This is recommended if your write operation is creating 
20+ connections. When using multiplexing, consider tuning "
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index aa9a5fd310b..14765a65ff0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -1496,6 +1496,7 @@ public class BigQueryServicesImpl implements 
BigQueryServices {
     private final BigQueryWriteClient newWriteClient;
     private final long storageWriteMaxInflightRequests;
     private final long storageWriteMaxInflightBytes;
+    private final @Nullable Integer 
storageWriteApiMaxRequestCallbackWaitTimeSec;
     private final BigQueryIOMetadata bqIOMetadata;
     private final PipelineOptions options;
 
@@ -1506,6 +1507,8 @@ public class BigQueryServicesImpl implements 
BigQueryServices {
       this.options = options;
       this.storageWriteMaxInflightRequests = 
bqOptions.getStorageWriteMaxInflightRequests();
       this.storageWriteMaxInflightBytes = 
bqOptions.getStorageWriteMaxInflightBytes();
+      this.storageWriteApiMaxRequestCallbackWaitTimeSec =
+          bqOptions.getStorageWriteApiMaxRequestCallbackWaitTimeSec();
       this.bqIOMetadata = BigQueryIOMetadata.create();
     }
 
@@ -1514,6 +1517,8 @@ public class BigQueryServicesImpl implements 
BigQueryServices {
       this.options = bqOptions;
       this.storageWriteMaxInflightRequests = 
bqOptions.getStorageWriteMaxInflightRequests();
       this.storageWriteMaxInflightBytes = 
bqOptions.getStorageWriteMaxInflightBytes();
+      this.storageWriteApiMaxRequestCallbackWaitTimeSec =
+          bqOptions.getStorageWriteApiMaxRequestCallbackWaitTimeSec();
       this.bqIOMetadata = BigQueryIOMetadata.create();
     }
 
@@ -1578,6 +1583,11 @@ public class BigQueryServicesImpl implements 
BigQueryServices {
                   
options.as(BigQueryOptions.class).getMaxConnectionPoolConnections())
               .build());
 
+      if (storageWriteApiMaxRequestCallbackWaitTimeSec != null) {
+        StreamWriter.setMaxRequestCallbackWaitTime(
+            
java.time.Duration.ofSeconds(storageWriteApiMaxRequestCallbackWaitTimeSec));
+      }
+
       StreamWriter streamWriter =
           StreamWriter.newBuilder(streamName, newWriteClient)
               .setExecutorProvider(

Reply via email to