This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new eac926878c2 Use autoSharding from config when BigQueryIO.Write is created. (#25862)
eac926878c2 is described below
commit eac926878c289cdf917e1eedb7aab1a1c2a2055c
Author: xianhualiu <[email protected]>
AuthorDate: Thu Mar 16 17:59:49 2023 -0400
Use autoSharding from config when BigQueryIO.Write is created. (#25862)
---
.../io/gcp/bigquery/BigQuerySchemaIOProvider.java | 9 +++++++--
...ueryStorageWriteApiSchemaTransformProvider.java | 23 ++++++++++++++++------
2 files changed, 24 insertions(+), 8 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaIOProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaIOProvider.java
index 59d798f8dc5..3d49dbc92eb 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaIOProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaIOProvider.java
@@ -95,6 +95,7 @@ public class BigQuerySchemaIOProvider implements SchemaIOProvider {
.addNullableField("queryLocation", FieldType.STRING)
.addNullableField("createDisposition", FieldType.STRING)
.addNullableField("useTestingBigQueryServices", FieldType.BOOLEAN)
+ .addNullableField("autoSharding", FieldType.BOOLEAN)
.build();
}
@@ -201,8 +202,12 @@ public class BigQuerySchemaIOProvider implements SchemaIOProvider {
.useBeamSchema()
.withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
.withTriggeringFrequency(Duration.standardSeconds(5))
- .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
- .withAutoSharding();
+ .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
+
+ final Boolean autoSharding = config.getBoolean("autoSharding");
+ if (autoSharding != null && autoSharding) {
+ write = write.withAutoSharding();
+ }
final Boolean useTestingBigQueryServices =
config.getBoolean("useTestingBigQueryServices");
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
index a29f5e60bed..605228e54a7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
@@ -191,6 +191,12 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
@Nullable
public abstract Boolean getUseAtLeastOnceSemantics();
+ @SchemaFieldDescription(
+ "This option enables using a dynamically determined number of shards to write to "
+ + "BigQuery. Only applicable to unbounded data.")
+ @Nullable
+ public abstract Boolean getAutoSharding();
+
/** Builder for {@link BigQueryStorageWriteApiSchemaTransformConfiguration}. */
@AutoValue.Builder
public abstract static class Builder {
@@ -204,6 +210,8 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
public abstract Builder setUseAtLeastOnceSemantics(Boolean use);
+ public abstract Builder setAutoSharding(Boolean autoSharding);
+
/** Builds a {@link BigQueryStorageWriteApiSchemaTransformConfiguration} instance. */
public abstract BigQueryStorageWriteApiSchemaTransformProvider
.BigQueryStorageWriteApiSchemaTransformConfiguration
@@ -280,13 +288,16 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
if (inputRows.isBounded() == IsBounded.UNBOUNDED) {
Long triggeringFrequency = configuration.getTriggeringFrequencySeconds();
+ Boolean autoSharding = configuration.getAutoSharding();
write =
- write
- .withAutoSharding()
- .withTriggeringFrequency(
- (triggeringFrequency == null || triggeringFrequency <= 0)
- ? DEFAULT_TRIGGERING_FREQUENCY
- : Duration.standardSeconds(triggeringFrequency));
+ write.withTriggeringFrequency(
+ (triggeringFrequency == null || triggeringFrequency <= 0)
+ ? DEFAULT_TRIGGERING_FREQUENCY
+ : Duration.standardSeconds(triggeringFrequency));
+
+ if (autoSharding != null && autoSharding) {
+ write = write.withAutoSharding();
+ }
}
Schema inputSchema = inputRows.getSchema();