This is an automated email from the ASF dual-hosted git repository.
claudevdm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 1f3d66546e7 Expose StorageWriteApiMaxRequestCallbackWaitTimeSec in BQ
storage write. (#38470)
1f3d66546e7 is described below
commit 1f3d66546e71cb221df686c26d09638233b045b2
Author: claudevdm <[email protected]>
AuthorDate: Tue May 12 15:19:18 2026 -0400
Expose StorageWriteApiMaxRequestCallbackWaitTimeSec in BQ storage write.
(#38470)
---
.../org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java | 7 +++++++
.../apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 10 ++++++++++
2 files changed, 17 insertions(+)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
index 2f16a64b0d7..face2ef5841 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
@@ -153,6 +153,13 @@ public interface BigQueryOptions
void setStorageWriteMaxInflightBytes(Long value);
+ @Description(
+ "Maximum time in seconds a Storage Write API append request is allowed
to wait in the "
+ + "request callback queue before timing out. Overrides Storage Write
API default (5 min)")
+ Integer getStorageWriteApiMaxRequestCallbackWaitTimeSec();
+
+ void setStorageWriteApiMaxRequestCallbackWaitTimeSec(Integer value);
+
@Description(
"Enables multiplexing mode, where multiple tables can share the same
connection. Only available when writing with STORAGE_API_AT_LEAST_ONCE"
+ " mode. This is recommended if your write operation is creating
20+ connections. When using multiplexing, consider tuning "
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index aa9a5fd310b..14765a65ff0 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -1496,6 +1496,7 @@ public class BigQueryServicesImpl implements
BigQueryServices {
private final BigQueryWriteClient newWriteClient;
private final long storageWriteMaxInflightRequests;
private final long storageWriteMaxInflightBytes;
+ private final @Nullable Integer
storageWriteApiMaxRequestCallbackWaitTimeSec;
private final BigQueryIOMetadata bqIOMetadata;
private final PipelineOptions options;
@@ -1506,6 +1507,8 @@ public class BigQueryServicesImpl implements
BigQueryServices {
this.options = options;
this.storageWriteMaxInflightRequests =
bqOptions.getStorageWriteMaxInflightRequests();
this.storageWriteMaxInflightBytes =
bqOptions.getStorageWriteMaxInflightBytes();
+ this.storageWriteApiMaxRequestCallbackWaitTimeSec =
+ bqOptions.getStorageWriteApiMaxRequestCallbackWaitTimeSec();
this.bqIOMetadata = BigQueryIOMetadata.create();
}
@@ -1514,6 +1517,8 @@ public class BigQueryServicesImpl implements
BigQueryServices {
this.options = bqOptions;
this.storageWriteMaxInflightRequests =
bqOptions.getStorageWriteMaxInflightRequests();
this.storageWriteMaxInflightBytes =
bqOptions.getStorageWriteMaxInflightBytes();
+ this.storageWriteApiMaxRequestCallbackWaitTimeSec =
+ bqOptions.getStorageWriteApiMaxRequestCallbackWaitTimeSec();
this.bqIOMetadata = BigQueryIOMetadata.create();
}
@@ -1578,6 +1583,11 @@ public class BigQueryServicesImpl implements
BigQueryServices {
options.as(BigQueryOptions.class).getMaxConnectionPoolConnections())
.build());
+ if (storageWriteApiMaxRequestCallbackWaitTimeSec != null) {
+ StreamWriter.setMaxRequestCallbackWaitTime(
+
java.time.Duration.ofSeconds(storageWriteApiMaxRequestCallbackWaitTimeSec));
+ }
+
StreamWriter streamWriter =
StreamWriter.newBuilder(streamName, newWriteClient)
.setExecutorProvider(