This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch release-2.56.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.56.0 by this push:
new 3c7a01360df Cherry picking (#30460) BQ clustering valueprovider
(#31039)
3c7a01360df is described below
commit 3c7a01360df3635ae56b7073dfa36e75e7492f61
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Thu Apr 18 11:47:41 2024 -0400
Cherry picking (#30460) BQ clustering valueprovider (#31039)
* Support clustering with value provider
* remove
* add some documentation
* fix
* address comments
* update test
* spotless
* use serializable json clustering; fix translation
* fall back on super's clustering and time partitioning when needed
* fork based on version 2.56.0
* fix test
---
.../beam/sdk/io/gcp/bigquery/BigQueryHelpers.java | 22 +++++++++++++++
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 31 ++++++++++++++--------
.../sdk/io/gcp/bigquery/BigQueryIOTranslation.java | 18 ++++++++-----
.../gcp/bigquery/DynamicDestinationsHelpers.java | 8 ++++++
.../sdk/io/gcp/bigquery/BigQueryClusteringIT.java | 5 +++-
.../sdk/io/gcp/bigquery/BigQueryHelpersTest.java | 11 ++++++++
.../io/gcp/bigquery/BigQueryIOTranslationTest.java | 10 +++++--
.../BigQueryTimePartitioningClusteringIT.java | 1 +
8 files changed, 85 insertions(+), 21 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index c4ad09ce6ea..8c600cf780a 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -17,11 +17,13 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.BackOffUtils;
import com.google.api.client.util.Sleeper;
+import com.google.api.services.bigquery.model.Clustering;
import com.google.api.services.bigquery.model.Dataset;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobReference;
@@ -31,6 +33,8 @@ import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.bigquery.model.TimePartitioning;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
import java.io.IOException;
import java.io.Serializable;
import java.math.BigInteger;
@@ -40,6 +44,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.regex.Matcher;
+import java.util.stream.Collectors;
import org.apache.beam.sdk.extensions.gcp.util.BackOffAdapter;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions;
@@ -704,6 +709,23 @@ public class BigQueryHelpers {
}
}
+ /**
+  * Builds a {@link Clustering} from a serialized JSON array of column names.
+  *
+  * @param jsonStringClustering a JSON array of clustering field names, for example {@code
+  *     ["column1", "column2"]}
+  * @return a {@link Clustering} whose fields are the array's elements, in array order
+  * @throws IllegalArgumentException if the parsed JSON is not an array
+  */
+ static Clustering clusteringFromJsonFields(String jsonStringClustering) {
+   JsonElement jsonClustering = JsonParser.parseString(jsonStringClustering);
+
+   // The trailing space after the period keeps the two sentences of the message separated.
+   checkArgument(
+       jsonClustering.isJsonArray(),
+       "Received an invalid Clustering json string: %s. "
+           + "Please provide a serialized json array like so: [\"column1\", \"column2\"]",
+       jsonStringClustering);
+
+   List<String> fields =
+       jsonClustering.getAsJsonArray().asList().stream()
+           .map(JsonElement::getAsString)
+           .collect(Collectors.toList());
+
+   return new Clustering().setFields(fields);
+ }
+
static String resolveTempLocation(
String tempLocationDir, String bigQueryOperationName, String stepUuid) {
return FileSystems.matchNewResource(tempLocationDir, true)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index a646e1e6247..fce8f1c5d40 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -2350,7 +2350,7 @@ public class BigQueryIO {
abstract @Nullable ValueProvider<String> getJsonTimePartitioning();
- abstract @Nullable Clustering getClustering();
+ abstract @Nullable ValueProvider<String> getJsonClustering();
abstract CreateDisposition getCreateDisposition();
@@ -2459,7 +2459,7 @@ public class BigQueryIO {
abstract Builder<T> setJsonTimePartitioning(ValueProvider<String>
jsonTimePartitioning);
- abstract Builder<T> setClustering(Clustering clustering);
+ abstract Builder<T> setJsonClustering(ValueProvider<String> clustering);
abstract Builder<T> setCreateDisposition(CreateDisposition
createDisposition);
@@ -2826,8 +2826,18 @@ public class BigQueryIO {
* tables, the fields here will be ignored; call {@link #withClustering()}
instead.
*/
public Write<T> withClustering(Clustering clustering) {
- checkArgument(clustering != null, "clustering can not be null");
- return toBuilder().setClustering(clustering).build();
+ checkArgument(clustering != null, "clustering cannot be null");
+ return
withJsonClustering(StaticValueProvider.of(BigQueryHelpers.toJsonString(clustering)));
+ }
+
+ /**
+ * The same as {@link #withClustering(Clustering)}, but takes a JSON-serialized Clustering
+ * object in a deferred {@link ValueProvider}. For example:
+ * {@code {"fields": ["column1", "column2", "column3"]}}
+ *
+ * @param jsonClustering provider of the JSON-serialized clustering; must not be null
+ */
+ public Write<T> withJsonClustering(ValueProvider<String> jsonClustering) {
+ checkArgument(jsonClustering != null, "clustering cannot be null");
+ return toBuilder().setJsonClustering(jsonClustering).build();
}
/**
@@ -2844,7 +2854,7 @@ public class BigQueryIO {
* read state written with a previous version.
*/
public Write<T> withClustering() {
- return toBuilder().setClustering(new Clustering()).build();
+ return withClustering(new Clustering());
}
/** Specifies whether the table should be created if it does not exist. */
@@ -3420,10 +3430,10 @@ public class BigQueryIO {
if (getJsonTableRef() != null) {
dynamicDestinations =
DynamicDestinationsHelpers.ConstantTableDestinations.fromJsonTableRef(
- getJsonTableRef(), getTableDescription(), getClustering() !=
null);
+ getJsonTableRef(), getTableDescription(),
getJsonClustering() != null);
} else if (getTableFunction() != null) {
dynamicDestinations =
- new TableFunctionDestinations<>(getTableFunction(),
getClustering() != null);
+ new TableFunctionDestinations<>(getTableFunction(),
getJsonClustering() != null);
}
// Wrap with a DynamicDestinations class that will provide a schema.
There might be no
@@ -3440,13 +3450,12 @@ public class BigQueryIO {
}
// Wrap with a DynamicDestinations class that will provide the proper
TimePartitioning.
- if (getJsonTimePartitioning() != null
- ||
Optional.ofNullable(getClustering()).map(Clustering::getFields).isPresent()) {
+ if (getJsonTimePartitioning() != null || (getJsonClustering() !=
null)) {
dynamicDestinations =
new ConstantTimePartitioningClusteringDestinations<>(
(DynamicDestinations<T, TableDestination>)
dynamicDestinations,
getJsonTimePartitioning(),
-
StaticValueProvider.of(BigQueryHelpers.toJsonString(getClustering())));
+ getJsonClustering());
}
if (getPrimaryKey() != null) {
dynamicDestinations =
@@ -3699,7 +3708,7 @@ public class BigQueryIO {
elementCoder,
rowWriterFactory,
getKmsKey(),
- getClustering() != null,
+ getJsonClustering() != null,
getUseAvroLogicalTypes(),
getWriteTempDataset(),
getBadRecordRouter(),
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
index 9b499ea3253..fee79f5896c 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import static
org.apache.beam.sdk.util.construction.TransformUpgrader.fromByteArray;
import static
org.apache.beam.sdk.util.construction.TransformUpgrader.toByteArray;
-import com.google.api.services.bigquery.model.Clustering;
import com.google.api.services.bigquery.model.TableRow;
import com.google.auto.service.AutoService;
import
com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation;
@@ -373,7 +372,7 @@ public class BigQueryIOTranslation {
.addNullableByteArrayField("dynamic_destinations")
.addNullableStringField("json_schema")
.addNullableStringField("json_time_partitioning")
- .addNullableByteArrayField("clustering")
+ .addNullableStringField("clustering")
.addNullableByteArrayField("create_disposition")
.addNullableByteArrayField("write_disposition")
.addNullableArrayField("schema_update_options", FieldType.BYTES)
@@ -474,8 +473,8 @@ public class BigQueryIOTranslation {
fieldValues.put(
"json_time_partitioning",
toByteArray(transform.getJsonTimePartitioning().get()));
}
- if (transform.getClustering() != null) {
- fieldValues.put("clustering", toByteArray(transform.getClustering()));
+ if (transform.getJsonClustering() != null) {
+ fieldValues.put("clustering", transform.getJsonClustering().get());
}
if (transform.getCreateDisposition() != null) {
fieldValues.put("create_disposition",
toByteArray(transform.getCreateDisposition()));
@@ -658,9 +657,14 @@ public class BigQueryIOTranslation {
if (jsonTimePartitioning != null) {
builder =
builder.setJsonTimePartitioning(StaticValueProvider.of(jsonTimePartitioning));
}
- byte[] clusteringBytes = configRow.getBytes("clustering");
- if (clusteringBytes != null) {
- builder = builder.setClustering((Clustering)
fromByteArray(clusteringBytes));
+ // Translation with Clustering is broken before 2.56.0, where we used
to attempt to
+ // serialize a non-serializable Clustering object to bytes.
+ // In 2.56.0 onwards, we translate using the json string
representation instead.
+ if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion,
"2.56.0") >= 0) {
+ String jsonClustering = configRow.getString("clustering");
+ if (jsonClustering != null) {
+ builder =
builder.setJsonClustering(StaticValueProvider.of(jsonClustering));
+ }
}
byte[] createDispositionBytes =
configRow.getBytes("create_disposition");
if (createDispositionBytes != null) {
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
index 1f042a81eb9..eed4314e391 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
@@ -26,6 +26,7 @@ import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableConstraints;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
+import com.google.gson.JsonParser;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -303,7 +304,14 @@ class DynamicDestinationsHelpers {
TableDestination destination = super.getDestination(element);
String partitioning =
Optional.ofNullable(jsonTimePartitioning).map(ValueProvider::get).orElse(null);
+ if (partitioning == null
+ || JsonParser.parseString(partitioning).getAsJsonObject().isEmpty())
{
+ partitioning = destination.getJsonTimePartitioning();
+ }
String clustering =
Optional.ofNullable(jsonClustering).map(ValueProvider::get).orElse(null);
+ if (clustering == null ||
JsonParser.parseString(clustering).getAsJsonObject().isEmpty()) {
+ clustering = destination.getJsonClustering();
+ }
return new TableDestination(
destination.getTableSpec(), destination.getTableDescription(),
partitioning, clustering);
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryClusteringIT.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryClusteringIT.java
index 3ee6931dd98..46989021ee9 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryClusteringIT.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryClusteringIT.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
@@ -171,7 +172,9 @@ public class BigQueryClusteringIT {
.apply(
BigQueryIO.writeTableRows()
.to(new ClusteredDestinations(tableName))
- .withClustering(CLUSTERING)
+ .withJsonClustering(
+ ValueProvider.StaticValueProvider.of(
+ BigQueryHelpers.toJsonString(CLUSTERING.getFields())))
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withMethod(BigQueryIO.Write.Method.FILE_LOADS));
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpersTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpersTest.java
index f5f4ddb9547..6779920702d 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpersTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpersTest.java
@@ -20,12 +20,14 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import static org.junit.Assert.assertEquals;
import com.google.api.client.util.Data;
+import com.google.api.services.bigquery.model.Clustering;
import com.google.api.services.bigquery.model.ErrorProto;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.JobStatus;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
+import java.util.Arrays;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
@@ -258,4 +260,13 @@ public class BigQueryHelpersTest {
assertEquals(dataset.get(),
noDataset.setDatasetId(dataset.get()).getDatasetId());
}
+
+ /** Checks that a JSON array of column names converts to the equivalent {@link Clustering}. */
+ @Test
+ public void testClusteringJsonConversion() {
+   String serializedFields = "[\"column1\", \"column2\", \"column3\"]";
+   Clustering expected =
+       new Clustering().setFields(Arrays.asList("column1", "column2", "column3"));
+
+   Clustering actual = BigQueryHelpers.clusteringFromJsonFields(serializedFields);
+
+   assertEquals(expected, actual);
+ }
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java
index 7f2ff894548..ce4c80adb95 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import com.google.api.services.bigquery.model.Clustering;
import com.google.api.services.bigquery.model.TableRow;
import java.util.Arrays;
import java.util.HashMap;
@@ -89,7 +90,7 @@ public class BigQueryIOTranslationTest {
WRITE_TRANSFORM_SCHEMA_MAPPING.put("getDynamicDestinations",
"dynamic_destinations");
WRITE_TRANSFORM_SCHEMA_MAPPING.put("getJsonSchema", "json_schema");
WRITE_TRANSFORM_SCHEMA_MAPPING.put("getJsonTimePartitioning",
"json_time_partitioning");
- WRITE_TRANSFORM_SCHEMA_MAPPING.put("getClustering", "clustering");
+ WRITE_TRANSFORM_SCHEMA_MAPPING.put("getJsonClustering", "clustering");
WRITE_TRANSFORM_SCHEMA_MAPPING.put("getCreateDisposition",
"create_disposition");
WRITE_TRANSFORM_SCHEMA_MAPPING.put("getWriteDisposition",
"write_disposition");
WRITE_TRANSFORM_SCHEMA_MAPPING.put("getSchemaUpdateOptions",
"schema_update_options");
@@ -237,6 +238,7 @@ public class BigQueryIOTranslationTest {
@Test
public void testReCreateWriteTransformFromRowTable() {
// setting a subset of fields here.
+ Clustering testClustering = new Clustering().setFields(Arrays.asList("a",
"b", "c"));
BigQueryIO.Write<?> writeTransform =
BigQueryIO.write()
.to("dummyproject:dummydataset.dummytable")
@@ -244,6 +246,7 @@ public class BigQueryIOTranslationTest {
.withTriggeringFrequency(org.joda.time.Duration.millis(10000))
.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
+ .withClustering(testClustering)
.withKmsKey("dummykmskey");
BigQueryIOTranslation.BigQueryIOWriteTranslator translator =
@@ -251,7 +254,7 @@ public class BigQueryIOTranslationTest {
Row row = translator.toConfigRow(writeTransform);
PipelineOptions options = PipelineOptionsFactory.create();
- options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.54.0");
+ options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.56.0");
BigQueryIO.Write<?> writeTransformFromRow =
(BigQueryIO.Write<?>) translator.fromConfigRow(row, options);
assertNotNull(writeTransformFromRow.getTable());
@@ -261,6 +264,9 @@ public class BigQueryIOTranslationTest {
assertEquals(WriteDisposition.WRITE_TRUNCATE,
writeTransformFromRow.getWriteDisposition());
assertEquals(CreateDisposition.CREATE_NEVER,
writeTransformFromRow.getCreateDisposition());
assertEquals("dummykmskey", writeTransformFromRow.getKmsKey());
+ assertEquals(
+ BigQueryHelpers.toJsonString(testClustering),
+ writeTransformFromRow.getJsonClustering().get());
}
@Test
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimePartitioningClusteringIT.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimePartitioningClusteringIT.java
index 22e4feb3c05..008cc02beee 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimePartitioningClusteringIT.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimePartitioningClusteringIT.java
@@ -225,6 +225,7 @@ public class BigQueryTimePartitioningClusteringIT {
Table table = bqClient.tables().get(options.getProject(), DATASET_NAME,
tableName).execute();
Assert.assertEquals(table.getClustering(), CLUSTERING);
+ Assert.assertEquals(table.getTimePartitioning(), TIME_PARTITIONING);
}
@Test