This is an automated email from the ASF dual-hosted git repository.
xqhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5ac25d46ae2 Adding clustering support for python storage write api (#35526)
5ac25d46ae2 is described below
commit 5ac25d46ae2894c5fa8803803572f47d2920af7d
Author: Tanu Sharma <[email protected]>
AuthorDate: Mon Jul 21 19:17:00 2025 +0530
Adding clustering support for python storage write api (#35526)
* Adding clustering support for python storage write api
* Added test for managed io and some corrections
* Python formatting
* Correction
* Addressed comments
* Corrected test as per recent changes
---
.../beam/sdk/io/gcp/bigquery/TableDestination.java | 2 +-
...ueryStorageWriteApiSchemaTransformProvider.java | 1 -
.../providers/BigQueryWriteConfiguration.java | 5 ++++
.../providers/PortableBigQueryDestinations.java | 8 ++++++
.../gcp/bigquery/providers/BigQueryManagedIT.java | 20 +++++++++++----
.../io/external/xlang_bigqueryio_it_test.py | 30 ++++++++++++++++++++++
sdks/python/apache_beam/io/gcp/bigquery.py | 13 ++++++++++
7 files changed, 72 insertions(+), 7 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
index df96e1bc226..6b81c2322a5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
@@ -64,7 +64,7 @@ public class TableDestination implements Serializable {
public TableDestination(
String tableSpec,
@Nullable String tableDescription,
- TimePartitioning timePartitioning,
+ @Nullable TimePartitioning timePartitioning,
Clustering clustering) {
this(
tableSpec,
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
index 1e53ad3553e..bb8f7200342 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
@@ -307,7 +307,6 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
CreateDisposition.valueOf(configuration.getCreateDisposition().toUpperCase());
write = write.withCreateDisposition(createDisposition);
}
-
if (!Strings.isNullOrEmpty(configuration.getWriteDisposition())) {
WriteDisposition writeDisposition =
WriteDisposition.valueOf(configuration.getWriteDisposition().toUpperCase());
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java
index 505ce7125ce..5df6e1f6afc 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java
@@ -194,6 +194,9 @@ public abstract class BigQueryWriteConfiguration {
+ "Is mutually exclusive with 'keep' and 'drop'.")
public abstract @Nullable String getOnly();
+ @SchemaFieldDescription("A list of columns to cluster the BigQuery table
by.")
+ public abstract @Nullable List<String> getClusteringFields();
+
/** Builder for {@link BigQueryWriteConfiguration}. */
@AutoValue.Builder
public abstract static class Builder {
@@ -226,6 +229,8 @@ public abstract class BigQueryWriteConfiguration {
public abstract Builder setOnly(String only);
+ public abstract Builder setClusteringFields(List<String> clusteringFields);
+
/** Builds a {@link BigQueryWriteConfiguration} instance. */
public abstract BigQueryWriteConfiguration build();
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java
index 0cd2b65b085..42eee4f3f03 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java
@@ -21,6 +21,7 @@ import static org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryWriteConfigu
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import com.google.api.services.bigquery.model.Clustering;
import com.google.api.services.bigquery.model.TableConstraints;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
@@ -48,8 +49,10 @@ public class PortableBigQueryDestinations extends DynamicDestinations<Row, Strin
private @MonotonicNonNull RowStringInterpolator interpolator = null;
private final @Nullable List<String> primaryKey;
private final RowFilter rowFilter;
+ private final @Nullable List<String> clusteringFields;
public PortableBigQueryDestinations(Schema rowSchema,
BigQueryWriteConfiguration configuration) {
+ this.clusteringFields = configuration.getClusteringFields();
// DYNAMIC_DESTINATIONS magic string is the old way of doing it for
cross-language.
// In that case, we do no interpolation
if (!configuration.getTable().equals(DYNAMIC_DESTINATIONS)) {
@@ -79,6 +82,11 @@ public class PortableBigQueryDestinations extends DynamicDestinations<Row, Strin
@Override
public TableDestination getTable(String destination) {
+
+ if (clusteringFields != null && !clusteringFields.isEmpty()) {
+ Clustering clustering = new Clustering().setFields(clusteringFields);
+ return new TableDestination(destination, null, null, clustering);
+ }
return new TableDestination(destination, null);
}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java
index 6a422f1832d..4c164e6a38d 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java
@@ -20,6 +20,8 @@ package org.apache.beam.sdk.io.gcp.bigquery.providers;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
+import com.google.api.services.bigquery.model.Clustering;
+import com.google.api.services.bigquery.model.Table;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
@@ -50,6 +52,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Immuta
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
@@ -82,6 +85,8 @@ public class BigQueryManagedIT {
TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
private static final String BIG_QUERY_DATASET_ID = "bigquery_managed_" +
System.nanoTime();
+ private static final Clustering CLUSTERING = new
Clustering().setFields(Arrays.asList("str"));
+
@BeforeClass
public static void setUpTestEnvironment() throws IOException,
InterruptedException {
// Create one BQ dataset for all test cases.
@@ -94,10 +99,11 @@ public class BigQueryManagedIT {
}
@Test
- public void testBatchFileLoadsWriteRead() {
+ public void testBatchFileLoadsWriteRead() throws IOException,
InterruptedException {
String table =
String.format("%s.%s.%s", PROJECT, BIG_QUERY_DATASET_ID,
testName.getMethodName());
- Map<String, Object> writeConfig = ImmutableMap.of("table", table);
+ Map<String, Object> writeConfig =
+ ImmutableMap.of("table", table, "clustering_fields",
Collections.singletonList("str"));
// file loads requires a GCS temp location
String tempLocation =
writePipeline.getOptions().as(TestPipelineOptions.class).getTempRoot();
@@ -117,6 +123,11 @@ public class BigQueryManagedIT {
.getSinglePCollection();
PAssert.that(outputRows).containsInAnyOrder(ROWS);
readPipeline.run().waitUntilFinish();
+
+ // Asserting clustering
+ Table tableMetadata =
+ BQ_CLIENT.getTableResource(PROJECT, BIG_QUERY_DATASET_ID,
testName.getMethodName());
+ Assert.assertEquals(CLUSTERING, tableMetadata.getClustering());
}
@Test
@@ -148,7 +159,7 @@ public class BigQueryManagedIT {
public void testDynamicDestinations(boolean streaming) throws IOException,
InterruptedException {
String baseTableName =
- String.format("%s:%s.dynamic_" + System.nanoTime(), PROJECT,
BIG_QUERY_DATASET_ID);
+ String.format("%s.%s.dynamic_" + System.nanoTime(), PROJECT,
BIG_QUERY_DATASET_ID);
String destinationTemplate = baseTableName + "_{dest}";
Map<String, Object> config =
ImmutableMap.of("table", destinationTemplate, "drop",
Collections.singletonList("dest"));
@@ -173,8 +184,7 @@ public class BigQueryManagedIT {
long mod = i;
String dest = destinations.get(i);
List<Row> writtenRows =
- BQ_CLIENT
- .queryUnflattened(String.format("SELECT * FROM [%s]", dest),
PROJECT, true, false)
+ BQ_CLIENT.queryUnflattened(String.format("SELECT * FROM `%s`",
dest), PROJECT, true, true)
.stream()
.map(tableRow ->
BigQueryUtils.toBeamRow(rowFilter.outputSchema(), tableRow))
.collect(Collectors.toList());
diff --git a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
index 7f3a16e02aa..38d9174cef2 100644
--- a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
+++ b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
@@ -245,6 +245,36 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase):
| StorageWriteToBigQuery(table=table_id))
hamcrest_assert(p, bq_matcher)
+ def test_write_with_clustering(self):
+ table = 'write_with_clustering'
+ table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table)
+
+ bq_matcher = BigqueryFullResultMatcher(
+ project=self.project,
+ query="SELECT * FROM {}.{}".format(self.dataset_id, table),
+ data=self.parse_expected_data(self.ELEMENTS))
+
+ with beam.Pipeline(argv=self.args) as p:
+ _ = (
+ p
+ | "Create test data" >> beam.Create(self.ELEMENTS)
+ | beam.io.WriteToBigQuery(
+ table=table_id,
+ method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
+ schema=self.ALL_TYPES_SCHEMA,
+ create_disposition='CREATE_IF_NEEDED',
+ write_disposition='WRITE_TRUNCATE',
+ additional_bq_parameters={'clustering': {
+ 'fields': ['int']
+ }}))
+
+ # After pipeline finishes, verify clustering is applied
+ table = self.bigquery_client.get_table(self.project, self.dataset_id,
table)
+ clustering_fields = table.clustering.fields if table.clustering else []
+
+ self.assertEqual(clustering_fields, ['int'])
+ hamcrest_assert(p, bq_matcher)
+
def test_write_with_beam_rows_cdc(self):
table = 'write_with_beam_rows_cdc'
table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 870a77b59e6..baabedb20e4 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -2364,6 +2364,7 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
table_side_inputs=self.table_side_inputs,
create_disposition=self.create_disposition,
write_disposition=self.write_disposition,
+ additional_bq_parameters=self.additional_bq_parameters,
triggering_frequency=self.triggering_frequency,
use_at_least_once=self.use_at_least_once,
with_auto_sharding=self.with_auto_sharding,
@@ -2611,6 +2612,7 @@ class StorageWriteToBigQuery(PTransform):
schema=None,
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=BigQueryDisposition.WRITE_APPEND,
+ additional_bq_parameters=None,
triggering_frequency=0,
use_at_least_once=False,
with_auto_sharding=False,
@@ -2623,6 +2625,7 @@ class StorageWriteToBigQuery(PTransform):
self._schema = schema
self._create_disposition = create_disposition
self._write_disposition = write_disposition
+ self.additional_bq_parameters = additional_bq_parameters
self._triggering_frequency = triggering_frequency
self._use_at_least_once = use_at_least_once
self._with_auto_sharding = with_auto_sharding
@@ -2698,6 +2701,15 @@ class StorageWriteToBigQuery(PTransform):
# communicate to Java that this write should use dynamic destinations
table = StorageWriteToBigQuery.DYNAMIC_DESTINATIONS
+ clustering_fields = []
+ if self.additional_bq_parameters:
+ if callable(self.additional_bq_parameters):
+ raise NotImplementedError(
+ "Currently, dynamic clustering and timepartitioning is not "
+ "supported for STORAGE_WRITE_API write method.")
+ clustering_fields = (
+ self.additional_bq_parameters.get("clustering", {}).get("fields",
[]))
+
output = (
input_beam_rows
| SchemaAwareExternalTransform(
@@ -2713,6 +2725,7 @@ class StorageWriteToBigQuery(PTransform):
use_at_least_once_semantics=self._use_at_least_once,
use_cdc_writes=self._use_cdc_writes,
primary_key=self._primary_key,
+ clustering_fields=clustering_fields,
error_handling={
'output': StorageWriteToBigQuery.FAILED_ROWS_WITH_ERRORS
}))