This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new eac926878c2 Use autoSharding from config when BigQueryIO.Write is 
created. (#25862)
eac926878c2 is described below

commit eac926878c289cdf917e1eedb7aab1a1c2a2055c
Author: xianhualiu <[email protected]>
AuthorDate: Thu Mar 16 17:59:49 2023 -0400

    Use autoSharding from config when BigQueryIO.Write is created. (#25862)
---
 .../io/gcp/bigquery/BigQuerySchemaIOProvider.java  |  9 +++++++--
 ...ueryStorageWriteApiSchemaTransformProvider.java | 23 ++++++++++++++++------
 2 files changed, 24 insertions(+), 8 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaIOProvider.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaIOProvider.java
index 59d798f8dc5..3d49dbc92eb 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaIOProvider.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaIOProvider.java
@@ -95,6 +95,7 @@ public class BigQuerySchemaIOProvider implements 
SchemaIOProvider {
         .addNullableField("queryLocation", FieldType.STRING)
         .addNullableField("createDisposition", FieldType.STRING)
         .addNullableField("useTestingBigQueryServices", FieldType.BOOLEAN)
+        .addNullableField("autoSharding", FieldType.BOOLEAN)
         .build();
   }
 
@@ -201,8 +202,12 @@ public class BigQuerySchemaIOProvider implements 
SchemaIOProvider {
                   .useBeamSchema()
                   .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
                   .withTriggeringFrequency(Duration.standardSeconds(5))
-                  
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
-                  .withAutoSharding();
+                  
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
+
+          final Boolean autoSharding = config.getBoolean("autoSharding");
+          if (autoSharding != null && autoSharding) {
+            write = write.withAutoSharding();
+          }
 
           final Boolean useTestingBigQueryServices =
               config.getBoolean("useTestingBigQueryServices");
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
index a29f5e60bed..605228e54a7 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
@@ -191,6 +191,12 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
     @Nullable
     public abstract Boolean getUseAtLeastOnceSemantics();
 
+    @SchemaFieldDescription(
+        "This option enables using a dynamically determined number of shards 
to write to "
+            + "BigQuery. Only applicable to unbounded data.")
+    @Nullable
+    public abstract Boolean getAutoSharding();
+
     /** Builder for {@link 
BigQueryStorageWriteApiSchemaTransformConfiguration}. */
     @AutoValue.Builder
     public abstract static class Builder {
@@ -204,6 +210,8 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
 
       public abstract Builder setUseAtLeastOnceSemantics(Boolean use);
 
+      public abstract Builder setAutoSharding(Boolean autoSharding);
+
       /** Builds a {@link BigQueryStorageWriteApiSchemaTransformConfiguration} 
instance. */
       public abstract BigQueryStorageWriteApiSchemaTransformProvider
               .BigQueryStorageWriteApiSchemaTransformConfiguration
@@ -280,13 +288,16 @@ public class 
BigQueryStorageWriteApiSchemaTransformProvider
 
       if (inputRows.isBounded() == IsBounded.UNBOUNDED) {
         Long triggeringFrequency = 
configuration.getTriggeringFrequencySeconds();
+        Boolean autoSharding = configuration.getAutoSharding();
         write =
-            write
-                .withAutoSharding()
-                .withTriggeringFrequency(
-                    (triggeringFrequency == null || triggeringFrequency <= 0)
-                        ? DEFAULT_TRIGGERING_FREQUENCY
-                        : Duration.standardSeconds(triggeringFrequency));
+            write.withTriggeringFrequency(
+                (triggeringFrequency == null || triggeringFrequency <= 0)
+                    ? DEFAULT_TRIGGERING_FREQUENCY
+                    : Duration.standardSeconds(triggeringFrequency));
+
+        if (autoSharding != null && autoSharding) {
+          write = write.withAutoSharding();
+        }
       }
 
       Schema inputSchema = inputRows.getSchema();

Reply via email to