This is an automated email from the ASF dual-hosted git repository.

stankiewicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f5b5142c08 Fix BigQuery Storage Write API stream count for bounded writes (#38776)
0f5b5142c08 is described below

commit 0f5b5142c081e3119fb70ac7aafa954de9cacdeb
Author: Akshat <[email protected]>
AuthorDate: Fri Jun 5 16:30:05 2026 +0530

    Fix BigQuery Storage Write API stream count for bounded writes (#38776)
    
    * Fix BigQuery Storage Write API stream count for bounded writes
---
 .../BigQueryStorageWriteApiSchemaTransformProvider.java          | 9 +++++----
 .../io/gcp/bigquery/providers/BigQueryWriteConfiguration.java    | 9 +++++++--
 .../BigQueryStorageWriteApiSchemaTransformProviderTest.java      | 5 ++++-
 sdks/python/apache_beam/io/gcp/bigquery.py                       | 3 +--
 .../site/content/en/documentation/io/built-in/google-bigquery.md | 2 +-
 5 files changed, 18 insertions(+), 10 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
index e3d94723501..1d618ba685e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
@@ -179,11 +179,11 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
       PCollection<Row> inputRows = input.getSinglePCollection();
 
       BigQueryIO.Write<Row> write = 
createStorageWriteApiTransform(inputRows.getSchema());
+      int numStreams = configuration.getNumStreams() == null ? 0 : 
configuration.getNumStreams();
 
       if (inputRows.isBounded() == IsBounded.UNBOUNDED) {
         Long triggeringFrequency = 
configuration.getTriggeringFrequencySeconds();
         Boolean autoSharding = configuration.getAutoSharding();
-        int numStreams = configuration.getNumStreams() == null ? 0 : 
configuration.getNumStreams();
 
         boolean useAtLeastOnceSemantics =
             configuration.getUseAtLeastOnceSemantics() != null
@@ -197,12 +197,13 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
                       : Duration.standardSeconds(triggeringFrequency));
         }
         // set num streams if specified, otherwise default to autoSharding
-        if (numStreams > 0) {
-          write = write.withNumStorageWriteApiStreams(numStreams);
-        } else if (autoSharding == null || autoSharding) {
+        if (numStreams == 0 && (autoSharding == null || autoSharding)) {
           write = write.withAutoSharding();
         }
       }
+      if (numStreams > 0) {
+        write = write.withNumStorageWriteApiStreams(numStreams);
+      }
 
       Schema inputSchema = inputRows.getSchema();
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java
index 55d7f7c8d72..7f09feb245c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java
@@ -101,6 +101,12 @@ public abstract class BigQueryWriteConfiguration {
 
     Boolean autoSharding = getAutoSharding();
     Integer numStreams = getNumStreams();
+    if (numStreams != null) {
+      checkArgument(
+          numStreams >= 0,
+          invalidConfigMessage + "numStreams must be non-negative, but was: 
%s",
+          numStreams);
+    }
     if (autoSharding != null && autoSharding && numStreams != null) {
       checkArgument(
           numStreams == 0,
@@ -152,8 +158,7 @@ public abstract class BigQueryWriteConfiguration {
   public abstract Boolean getAutoSharding();
 
   @SchemaFieldDescription(
-      "Specifies the number of write streams that the Storage API sink will 
use. "
-          + "This parameter is only applicable when writing unbounded data.")
+      "Specifies the number of write streams that the Storage API sink will 
use.")
   @Nullable
   public abstract Integer getNumStreams();
 
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
index 58430977828..81789f78425 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
@@ -122,7 +122,10 @@ public class BigQueryStorageWriteApiSchemaTransformProviderTest {
         Arrays.asList(
             BigQueryWriteConfiguration.builder()
                 .setTable("project:dataset.table")
-                .setCreateDisposition("INVALID_DISPOSITION"));
+                .setCreateDisposition("INVALID_DISPOSITION"),
+            BigQueryWriteConfiguration.builder()
+                .setTable("project:dataset.table")
+                .setNumStreams(-1));
 
     for (BigQueryWriteConfiguration.Builder config : invalidConfigs) {
       assertThrows(
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index d751d60c905..a2d17f12569 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -2155,8 +2155,7 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
         all of FILE_LOADS, STREAMING_INSERTS, and STORAGE_WRITE_API. Only
         applicable to unbounded input.
       num_storage_api_streams: Specifies the number of write streams that the
-        Storage API sink will use. This parameter is only applicable when
-        writing unbounded data.
+        Storage API sink will use.
       ignore_unknown_columns: Accept rows that contain values that do not match
         the schema. The unknown values are ignored. Default is False,
         which treats unknown values as errors. This option is only valid for
diff --git a/website/www/site/content/en/documentation/io/built-in/google-bigquery.md b/website/www/site/content/en/documentation/io/built-in/google-bigquery.md
index 9c205f09266..00cd1ffbcde 100644
--- a/website/www/site/content/en/documentation/io/built-in/google-bigquery.md
+++ b/website/www/site/content/en/documentation/io/built-in/google-bigquery.md
@@ -855,7 +855,7 @@ pipeline uses. You can set it explicitly on the transform via
 
[`withNumStorageWriteApiStreams`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html#withNumStorageWriteApiStreams-int-)
 or provide the `numStorageWriteApiStreams` option to the pipeline as defined in
 
[`BigQueryOptions`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.html).
-Please note this is only supported for streaming pipelines.
+Fixed stream counts can be used with both batch and streaming pipelines.
 
 Triggering frequency determines how soon the data is visible for querying in
 BigQuery. You can explicitly set it via

Reply via email to