This is an automated email from the ASF dual-hosted git repository.

xqhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ac25d46ae2 Adding clustering support for python storage write api 
(#35526)
5ac25d46ae2 is described below

commit 5ac25d46ae2894c5fa8803803572f47d2920af7d
Author: Tanu Sharma <[email protected]>
AuthorDate: Mon Jul 21 19:17:00 2025 +0530

    Adding clustering support for python storage write api (#35526)
    
    * Adding clustering support for python storage write api
    
    * Added test for managed io and some corrections
    
    * Python formatting
    
    * Correction
    
    * Addressed comments
    
    * Corrected test as per recent changes
---
 .../beam/sdk/io/gcp/bigquery/TableDestination.java |  2 +-
 ...ueryStorageWriteApiSchemaTransformProvider.java |  1 -
 .../providers/BigQueryWriteConfiguration.java      |  5 ++++
 .../providers/PortableBigQueryDestinations.java    |  8 ++++++
 .../gcp/bigquery/providers/BigQueryManagedIT.java  | 20 +++++++++++----
 .../io/external/xlang_bigqueryio_it_test.py        | 30 ++++++++++++++++++++++
 sdks/python/apache_beam/io/gcp/bigquery.py         | 13 ++++++++++
 7 files changed, 72 insertions(+), 7 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
index df96e1bc226..6b81c2322a5 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
@@ -64,7 +64,7 @@ public class TableDestination implements Serializable {
   public TableDestination(
       String tableSpec,
       @Nullable String tableDescription,
-      TimePartitioning timePartitioning,
+      @Nullable TimePartitioning timePartitioning,
       Clustering clustering) {
     this(
         tableSpec,
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
index 1e53ad3553e..bb8f7200342 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
@@ -307,7 +307,6 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
             
CreateDisposition.valueOf(configuration.getCreateDisposition().toUpperCase());
         write = write.withCreateDisposition(createDisposition);
       }
-
       if (!Strings.isNullOrEmpty(configuration.getWriteDisposition())) {
         WriteDisposition writeDisposition =
             
WriteDisposition.valueOf(configuration.getWriteDisposition().toUpperCase());
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java
index 505ce7125ce..5df6e1f6afc 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java
@@ -194,6 +194,9 @@ public abstract class BigQueryWriteConfiguration {
           + "Is mutually exclusive with 'keep' and 'drop'.")
   public abstract @Nullable String getOnly();
 
+  @SchemaFieldDescription("A list of columns to cluster the BigQuery table 
by.")
+  public abstract @Nullable List<String> getClusteringFields();
+
   /** Builder for {@link BigQueryWriteConfiguration}. */
   @AutoValue.Builder
   public abstract static class Builder {
@@ -226,6 +229,8 @@ public abstract class BigQueryWriteConfiguration {
 
     public abstract Builder setOnly(String only);
 
+    public abstract Builder setClusteringFields(List<String> clusteringFields);
+
     /** Builds a {@link BigQueryWriteConfiguration} instance. */
     public abstract BigQueryWriteConfiguration build();
   }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java
index 0cd2b65b085..42eee4f3f03 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java
@@ -21,6 +21,7 @@ import static 
org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryWriteConfigu
 import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
 import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
 
+import com.google.api.services.bigquery.model.Clustering;
 import com.google.api.services.bigquery.model.TableConstraints;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
@@ -48,8 +49,10 @@ public class PortableBigQueryDestinations extends 
DynamicDestinations<Row, Strin
   private @MonotonicNonNull RowStringInterpolator interpolator = null;
   private final @Nullable List<String> primaryKey;
   private final RowFilter rowFilter;
+  private final @Nullable List<String> clusteringFields;
 
   public PortableBigQueryDestinations(Schema rowSchema, 
BigQueryWriteConfiguration configuration) {
+    this.clusteringFields = configuration.getClusteringFields();
     // DYNAMIC_DESTINATIONS magic string is the old way of doing it for 
cross-language.
     // In that case, we do no interpolation
     if (!configuration.getTable().equals(DYNAMIC_DESTINATIONS)) {
@@ -79,6 +82,11 @@ public class PortableBigQueryDestinations extends 
DynamicDestinations<Row, Strin
 
   @Override
   public TableDestination getTable(String destination) {
+
+    if (clusteringFields != null && !clusteringFields.isEmpty()) {
+      Clustering clustering = new Clustering().setFields(clusteringFields);
+      return new TableDestination(destination, null, null, clustering);
+    }
     return new TableDestination(destination, null);
   }
 
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java
index 6a422f1832d..4c164e6a38d 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java
@@ -20,6 +20,8 @@ package org.apache.beam.sdk.io.gcp.bigquery.providers;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 
+import com.google.api.services.bigquery.model.Clustering;
+import com.google.api.services.bigquery.model.Table;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
@@ -50,6 +52,7 @@ import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Immuta
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -82,6 +85,8 @@ public class BigQueryManagedIT {
       TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
   private static final String BIG_QUERY_DATASET_ID = "bigquery_managed_" + 
System.nanoTime();
 
+  private static final Clustering CLUSTERING = new 
Clustering().setFields(Arrays.asList("str"));
+
   @BeforeClass
   public static void setUpTestEnvironment() throws IOException, 
InterruptedException {
     // Create one BQ dataset for all test cases.
@@ -94,10 +99,11 @@ public class BigQueryManagedIT {
   }
 
   @Test
-  public void testBatchFileLoadsWriteRead() {
+  public void testBatchFileLoadsWriteRead() throws IOException, 
InterruptedException {
     String table =
         String.format("%s.%s.%s", PROJECT, BIG_QUERY_DATASET_ID, 
testName.getMethodName());
-    Map<String, Object> writeConfig = ImmutableMap.of("table", table);
+    Map<String, Object> writeConfig =
+        ImmutableMap.of("table", table, "clustering_fields", 
Collections.singletonList("str"));
 
     // file loads requires a GCS temp location
     String tempLocation = 
writePipeline.getOptions().as(TestPipelineOptions.class).getTempRoot();
@@ -117,6 +123,11 @@ public class BigQueryManagedIT {
             .getSinglePCollection();
     PAssert.that(outputRows).containsInAnyOrder(ROWS);
     readPipeline.run().waitUntilFinish();
+
+    // Asserting clustering
+    Table tableMetadata =
+        BQ_CLIENT.getTableResource(PROJECT, BIG_QUERY_DATASET_ID, 
testName.getMethodName());
+    Assert.assertEquals(CLUSTERING, tableMetadata.getClustering());
   }
 
   @Test
@@ -148,7 +159,7 @@ public class BigQueryManagedIT {
 
   public void testDynamicDestinations(boolean streaming) throws IOException, 
InterruptedException {
     String baseTableName =
-        String.format("%s:%s.dynamic_" + System.nanoTime(), PROJECT, 
BIG_QUERY_DATASET_ID);
+        String.format("%s.%s.dynamic_" + System.nanoTime(), PROJECT, 
BIG_QUERY_DATASET_ID);
     String destinationTemplate = baseTableName + "_{dest}";
     Map<String, Object> config =
         ImmutableMap.of("table", destinationTemplate, "drop", 
Collections.singletonList("dest"));
@@ -173,8 +184,7 @@ public class BigQueryManagedIT {
       long mod = i;
       String dest = destinations.get(i);
       List<Row> writtenRows =
-          BQ_CLIENT
-              .queryUnflattened(String.format("SELECT * FROM [%s]", dest), 
PROJECT, true, false)
+          BQ_CLIENT.queryUnflattened(String.format("SELECT * FROM `%s`", 
dest), PROJECT, true, true)
               .stream()
               .map(tableRow -> 
BigQueryUtils.toBeamRow(rowFilter.outputSchema(), tableRow))
               .collect(Collectors.toList());
diff --git a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py 
b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
index 7f3a16e02aa..38d9174cef2 100644
--- a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
+++ b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
@@ -245,6 +245,36 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase):
           | StorageWriteToBigQuery(table=table_id))
     hamcrest_assert(p, bq_matcher)
 
+  def test_write_with_clustering(self):
+    table = 'write_with_clustering'
+    table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table)
+
+    bq_matcher = BigqueryFullResultMatcher(
+        project=self.project,
+        query="SELECT * FROM {}.{}".format(self.dataset_id, table),
+        data=self.parse_expected_data(self.ELEMENTS))
+
+    with beam.Pipeline(argv=self.args) as p:
+      _ = (
+          p
+          | "Create test data" >> beam.Create(self.ELEMENTS)
+          | beam.io.WriteToBigQuery(
+              table=table_id,
+              method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
+              schema=self.ALL_TYPES_SCHEMA,
+              create_disposition='CREATE_IF_NEEDED',
+              write_disposition='WRITE_TRUNCATE',
+              additional_bq_parameters={'clustering': {
+                  'fields': ['int']
+              }}))
+
+    # After pipeline finishes, verify clustering is applied
+    table = self.bigquery_client.get_table(self.project, self.dataset_id, 
table)
+    clustering_fields = table.clustering.fields if table.clustering else []
+
+    self.assertEqual(clustering_fields, ['int'])
+    hamcrest_assert(p, bq_matcher)
+
   def test_write_with_beam_rows_cdc(self):
     table = 'write_with_beam_rows_cdc'
     table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py 
b/sdks/python/apache_beam/io/gcp/bigquery.py
index 870a77b59e6..baabedb20e4 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -2364,6 +2364,7 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` 
that has a JSON string,
           table_side_inputs=self.table_side_inputs,
           create_disposition=self.create_disposition,
           write_disposition=self.write_disposition,
+          additional_bq_parameters=self.additional_bq_parameters,
           triggering_frequency=self.triggering_frequency,
           use_at_least_once=self.use_at_least_once,
           with_auto_sharding=self.with_auto_sharding,
@@ -2611,6 +2612,7 @@ class StorageWriteToBigQuery(PTransform):
       schema=None,
       create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
       write_disposition=BigQueryDisposition.WRITE_APPEND,
+      additional_bq_parameters=None,
       triggering_frequency=0,
       use_at_least_once=False,
       with_auto_sharding=False,
@@ -2623,6 +2625,7 @@ class StorageWriteToBigQuery(PTransform):
     self._schema = schema
     self._create_disposition = create_disposition
     self._write_disposition = write_disposition
+    self.additional_bq_parameters = additional_bq_parameters
     self._triggering_frequency = triggering_frequency
     self._use_at_least_once = use_at_least_once
     self._with_auto_sharding = with_auto_sharding
@@ -2698,6 +2701,15 @@ class StorageWriteToBigQuery(PTransform):
       # communicate to Java that this write should use dynamic destinations
       table = StorageWriteToBigQuery.DYNAMIC_DESTINATIONS
 
+    clustering_fields = []
+    if self.additional_bq_parameters:
+      if callable(self.additional_bq_parameters):
+        raise NotImplementedError(
+            "Currently, dynamic clustering and timepartitioning is not "
+            "supported for STORAGE_WRITE_API write method.")
+      clustering_fields = (
+          self.additional_bq_parameters.get("clustering", {}).get("fields", 
[]))
+
     output = (
         input_beam_rows
         | SchemaAwareExternalTransform(
@@ -2713,6 +2725,7 @@ class StorageWriteToBigQuery(PTransform):
             use_at_least_once_semantics=self._use_at_least_once,
             use_cdc_writes=self._use_cdc_writes,
             primary_key=self._primary_key,
+            clustering_fields=clustering_fields,
             error_handling={
                 'output': StorageWriteToBigQuery.FAILED_ROWS_WITH_ERRORS
             }))

Reply via email to