This is an automated email from the ASF dual-hosted git repository.
reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new fef4cf8a028 Merge pull request #28124: Allow using CREATE_IF_NEEDED
when writing deletes or updates to BigQuery
fef4cf8a028 is described below
commit fef4cf8a0289e060ca05b6808beaf5734707d8a2
Author: Reuven Lax <[email protected]>
AuthorDate: Thu Aug 24 15:45:15 2023 -0700
Merge pull request #28124: Allow using CREATE_IF_NEEDED when writing
deletes or updates to BigQuery
---
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 39 ++++++++++++++++----
.../sdk/io/gcp/bigquery/CreateTableHelpers.java | 9 +++++
.../beam/sdk/io/gcp/bigquery/CreateTables.java | 5 +++
.../sdk/io/gcp/bigquery/DynamicDestinations.java | 9 +++++
.../gcp/bigquery/DynamicDestinationsHelpers.java | 30 ++++++++++++++++
.../bigquery/StorageApiDynamicDestinations.java | 36 ++-----------------
.../bigquery/StorageApiWriteUnshardedRecords.java | 1 +
.../bigquery/StorageApiWritesShardedRecords.java | 4 ++-
.../io/gcp/bigquery/StorageApiSinkRowUpdateIT.java | 42 +++++-----------------
9 files changed, 100 insertions(+), 75 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 96da67321cb..58d76931244 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -30,6 +30,7 @@ import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.JobStatistics;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableCell;
+import com.google.api.services.bigquery.model.TableConstraints;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
@@ -510,7 +511,8 @@ import org.slf4j.LoggerFactory;
* .apply(BigQueryIO.applyRowMutations()
* .to(my_project:my_dataset.my_table)
* .withSchema(schema)
- * .withCreateDisposition(Write.CreateDisposition.CREATE_NEVER));
+ * .withPrimaryKey(ImmutableList.of("field1", "field2"))
+ * .withCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED));
* }</pre>
*
* <p>If writing a type other than TableRow (e.g. using {@link
BigQueryIO#writeGenericRecords} or
@@ -523,12 +525,17 @@ import org.slf4j.LoggerFactory;
* cdcEvent.apply(BigQueryIO.write()
* .to("my-project:my_dataset.my_table")
* .withSchema(schema)
+ * .withPrimaryKey(ImmutableList.of("field1", "field2"))
* .withFormatFunction(CdcEvent::getTableRow)
* .withRowMutationInformationFn(cdc ->
RowMutationInformation.of(cdc.getChangeType(),
*
cdc.getSequenceNumber()))
* .withMethod(Write.Method.STORAGE_API_AT_LEAST_ONCE)
- * .withCreateDisposition(Write.CreateDisposition.CREATE_NEVER));
+ * .withCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED));
* }</pre>
+ *
+ * <p>Note that in order to apply updates or deletes, the table must be set up with a primary
+ * key. If the table does not already exist and CREATE_IF_NEEDED is used, a primary key must be
+ * specified (see {@code Write.withPrimaryKey}).
*/
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20506)
@@ -2318,6 +2325,8 @@ public class BigQueryIO {
abstract @Nullable String getKmsKey();
+ abstract @Nullable List<String> getPrimaryKey();
+
abstract Boolean getOptimizeWrites();
abstract Boolean getUseBeamSchema();
@@ -2416,7 +2425,9 @@ public class BigQueryIO {
abstract Builder<T> setIgnoreInsertIds(Boolean ignoreInsertIds);
- abstract Builder<T> setKmsKey(String kmsKey);
+ abstract Builder<T> setKmsKey(@Nullable String kmsKey);
+
+ abstract Builder<T> setPrimaryKey(@Nullable List<String> primaryKey);
abstract Builder<T> setOptimizeWrites(Boolean optimizeWrites);
@@ -2947,6 +2958,10 @@ public class BigQueryIO {
return toBuilder().setKmsKey(kmsKey).build();
}
+    /**
+     * Sets the primary key columns to declare on the destination table when this sink creates it
+     * (for example with {@link CreateDisposition#CREATE_IF_NEEDED}). The key is only attached at
+     * table-creation time; tables that already exist are not modified.
+     *
+     * <p>A primary key is required when applying row mutations (see {@link
+     * #withRowMutationInformationFn}) together with CREATE_IF_NEEDED. Not supported with {@link
+     * Method#FILE_LOADS}.
+     *
+     * @param primaryKey the primary key column names; must not be empty
+     */
+    public Write<T> withPrimaryKey(List<String> primaryKey) {
+      // An empty list would satisfy the "primary key specified" validation in expand() while
+      // producing a table without a usable primary key, so reject it up front.
+      checkArgument(
+          primaryKey == null || !primaryKey.isEmpty(),
+          "withPrimaryKey requires at least one primary key column");
+      return toBuilder().setPrimaryKey(primaryKey).build();
+    }
+
/**
* If true, enables new codepaths that are expected to use less resources
while writing to
* BigQuery. Not enabled by default in order to maintain backwards
compatibility.
@@ -3241,6 +3256,7 @@ public class BigQueryIO {
LOG.warn("Setting the number of Storage API streams" + error);
}
}
+
if (method == Method.STORAGE_API_AT_LEAST_ONCE &&
getStorageApiNumStreams(bqOptions) != 0) {
LOG.warn(
"Setting a number of Storage API streams is only supported when
using STORAGE_WRITE_API");
@@ -3254,9 +3270,12 @@ public class BigQueryIO {
if (getRowMutationInformationFn() != null) {
checkArgument(getMethod() == Method.STORAGE_API_AT_LEAST_ONCE);
checkArgument(
- getCreateDisposition() == CreateDisposition.CREATE_NEVER,
- "CREATE_IF_NEEDED is not supported when applying row updates.
Tables must be precreated "
- + "with a primary key specified.");
+ getCreateDisposition() == CreateDisposition.CREATE_NEVER ||
getPrimaryKey() != null,
+ "If specifying CREATE_IF_NEEDED along with row updates, a primary
key needs to be specified");
+ }
+ if (getPrimaryKey() != null) {
+ checkArgument(
+ getMethod() != Method.FILE_LOADS, "Primary key not supported when
using FILE_LOADS");
}
if (getAutoSchemaUpdate()) {
@@ -3311,6 +3330,14 @@ public class BigQueryIO {
getJsonTimePartitioning(),
StaticValueProvider.of(BigQueryHelpers.toJsonString(getClustering())));
}
+ if (getPrimaryKey() != null) {
+ dynamicDestinations =
+ new
DynamicDestinationsHelpers.ConstantTableConstraintsDestinations<>(
+ (DynamicDestinations<T, TableDestination>)
dynamicDestinations,
+ new TableConstraints()
+ .setPrimaryKey(
+ new
TableConstraints.PrimaryKey().setColumns(getPrimaryKey())));
+ }
}
return expandTyped(input, dynamicDestinations);
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java
index e7d4c32993b..6edd3f71cc7 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java
@@ -25,6 +25,7 @@ import com.google.api.gax.rpc.ApiException;
import com.google.api.services.bigquery.model.Clustering;
import com.google.api.services.bigquery.model.EncryptionConfiguration;
import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableConstraints;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.bigquery.model.TimePartitioning;
@@ -86,6 +87,7 @@ public class CreateTableHelpers {
BigQueryOptions bigQueryOptions,
TableDestination tableDestination,
Supplier<@Nullable TableSchema> schemaSupplier,
+ Supplier<@Nullable TableConstraints> tableConstraintsSupplier,
CreateDisposition createDisposition,
@Nullable Coder<?> tableDestinationCoder,
@Nullable String kmsKey,
@@ -125,6 +127,7 @@ public class CreateTableHelpers {
tryCreateTable(
bigQueryOptions,
schemaSupplier,
+ tableConstraintsSupplier,
tableDestination,
createDisposition,
tableSpec,
@@ -139,6 +142,7 @@ public class CreateTableHelpers {
private static void tryCreateTable(
BigQueryOptions options,
Supplier<@Nullable TableSchema> schemaSupplier,
+ Supplier<@Nullable TableConstraints> tableConstraintsSupplier,
TableDestination tableDestination,
CreateDisposition createDisposition,
String tableSpec,
@@ -151,6 +155,7 @@ public class CreateTableHelpers {
tableReference, Collections.emptyList(),
DatasetService.TableMetadataView.BASIC)
== null) {
TableSchema tableSchema = schemaSupplier.get();
+ @Nullable TableConstraints tableConstraints =
tableConstraintsSupplier.get();
Preconditions.checkArgumentNotNull(
tableSchema,
"Unless create disposition is %s, a schema must be specified, i.e.
"
@@ -162,6 +167,10 @@ public class CreateTableHelpers {
tableDestination);
Table table = new
Table().setTableReference(tableReference).setSchema(tableSchema);
+ if (tableConstraints != null) {
+ table = table.setTableConstraints(tableConstraints);
+ }
+
String tableDescription = tableDestination.getTableDescription();
if (tableDescription != null) {
table = table.setDescription(tableDescription);
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
index 1856b5ab63f..7e5299b7e67 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import com.google.api.services.bigquery.model.TableConstraints;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.List;
import java.util.Map;
@@ -113,10 +114,14 @@ public class CreateTables<DestinationT, ElementT>
dest);
Supplier<@Nullable TableSchema> schemaSupplier =
() -> dynamicDestinations.getSchema(dest);
+ Supplier<@Nullable TableConstraints> tableConstraintsSupplier =
+ () -> dynamicDestinations.getTableConstraints(dest);
+
return CreateTableHelpers.possiblyCreateTable(
context.getPipelineOptions().as(BigQueryOptions.class),
tableDestination1,
schemaSupplier,
+ tableConstraintsSupplier,
createDisposition,
dynamicDestinations.getDestinationCoder(),
kmsKey,
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
index b2041f69bda..e5cf82d7c2e 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import static
org.apache.beam.sdk.values.TypeDescriptors.extractFromTypeParameters;
import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+import com.google.api.services.bigquery.model.TableConstraints;
import com.google.api.services.bigquery.model.TableSchema;
import java.io.Serializable;
import java.util.List;
@@ -154,6 +155,14 @@ public abstract class DynamicDestinations<T, DestinationT>
implements Serializab
/** Returns the table schema for the destination. */
public abstract @Nullable TableSchema getSchema(DestinationT destination);
+  /**
+   * Returns the {@link TableConstraints} (such as primary and foreign keys) to apply when the sink
+   * creates the table for {@code destination}, or {@code null} if no constraints should be set.
+   * The default implementation returns {@code null}.
+   *
+   * <p>Note: table constraints are not currently supported when using FILE_LOADS.
+   */
+  public @Nullable TableConstraints getTableConstraints(DestinationT destination) {
+    return null;
+  }
+
// Gets the destination coder. If the user does not provide one, try to find
one in the coder
// registry. If no coder can be found, throws CannotProvideCoderException.
Coder<DestinationT> getDestinationCoderWithDefault(CoderRegistry registry)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
index 30492647457..62355fd9417 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
@@ -23,6 +23,7 @@ import com.google.api.client.util.BackOff;
import com.google.api.client.util.BackOffUtils;
import com.google.api.client.util.Sleeper;
import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableConstraints;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import java.io.IOException;
@@ -177,6 +178,11 @@ class DynamicDestinationsHelpers {
return inner.getSchema(destination);
}
+    /** Forwards the table-constraints lookup for {@code destination} to the wrapped instance. */
+    @Override
+    public @Nullable TableConstraints getTableConstraints(DestinationT destination) {
+      final @Nullable TableConstraints delegated = inner.getTableConstraints(destination);
+      return delegated;
+    }
+
@Override
public TableDestination getTable(DestinationT destination) {
return inner.getTable(destination);
@@ -214,6 +220,30 @@ class DynamicDestinationsHelpers {
}
}
+  /**
+   * Returns the same {@link TableConstraints} (e.g. a primary key) for every destination, while
+   * delegating all other lookups to the wrapped {@link DynamicDestinations}.
+   */
+  static class ConstantTableConstraintsDestinations<T, DestinationT>
+      extends DelegatingDynamicDestinations<T, DestinationT> {
+    // Held as a JSON string rather than a TableConstraints object; presumably because the model
+    // class is not Java-serializable and this object travels with the pipeline — TODO confirm.
+    private final String jsonTableConstraints;
+
+    ConstantTableConstraintsDestinations(
+        DynamicDestinations<T, DestinationT> inner, TableConstraints tableConstraints) {
+      super(inner);
+      this.jsonTableConstraints = BigQueryHelpers.toJsonString(tableConstraints);
+    }
+
+    /** Parses and returns the constant constraints; {@code destination} is ignored. */
+    @Override
+    public TableConstraints getTableConstraints(DestinationT destination) {
+      return BigQueryHelpers.fromJsonString(jsonTableConstraints, TableConstraints.class);
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("inner", inner)
+          .add("tableConstraints", jsonTableConstraints)
+          .toString();
+    }
+  }
+
/** Returns the same schema for every table. */
static class ConstantSchemaDestinations<T, DestinationT>
extends DelegatingDynamicDestinations<T, DestinationT> {
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java
index fdf330d378f..8ec4d52e3b9 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java
@@ -18,19 +18,14 @@
package org.apache.beam.sdk.io.gcp.bigquery;
import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
import com.google.protobuf.DescriptorProtos;
-import java.util.List;
import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.ValueInSingleWindow;
/** Base dynamicDestinations class used by the Storage API sink. */
abstract class StorageApiDynamicDestinations<T, DestinationT>
- extends DynamicDestinations<T, DestinationT> {
+ extends DynamicDestinationsHelpers.DelegatingDynamicDestinations<T,
DestinationT> {
public interface MessageConverter<T> {
com.google.cloud.bigquery.storage.v1.TableSchema getTableSchema();
@@ -42,40 +37,13 @@ abstract class StorageApiDynamicDestinations<T,
DestinationT>
TableRow toTableRow(T element);
}
- private DynamicDestinations<T, DestinationT> inner;
-
StorageApiDynamicDestinations(DynamicDestinations<T, DestinationT> inner) {
- this.inner = inner;
+ super(inner);
}
public abstract MessageConverter<T> getMessageConverter(
DestinationT destination, DatasetService datasetService) throws
Exception;
- @Override
- public DestinationT getDestination(@Nullable ValueInSingleWindow<T> element)
{
- return inner.getDestination(element);
- }
-
- @Override
- public @Nullable Coder<DestinationT> getDestinationCoder() {
- return inner.getDestinationCoder();
- }
-
- @Override
- public TableDestination getTable(DestinationT destination) {
- return inner.getTable(destination);
- }
-
- @Override
- public @Nullable TableSchema getSchema(DestinationT destination) {
- return inner.getSchema(destination);
- }
-
- @Override
- public List<PCollectionView<?>> getSideInputs() {
- return inner.getSideInputs();
- }
-
@Override
void setSideInputAccessorFromProcessContext(DoFn<?, ?>.ProcessContext
context) {
super.setSideInputAccessorFromProcessContext(context);
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 27a5b30c156..3ac5140f73f 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -939,6 +939,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT,
ElementT>
c.getPipelineOptions().as(BigQueryOptions.class),
tableDestination1,
() -> dynamicDestinations.getSchema(destination),
+ () -> dynamicDestinations.getTableConstraints(destination),
createDisposition,
destinationCoder,
kmsKey,
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index cc7f221e32e..cf7de067e15 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -441,10 +441,12 @@ public class StorageApiWritesShardedRecords<DestinationT
extends @NonNull Object
Coder<DestinationT> destinationCoder =
dynamicDestinations.getDestinationCoder();
Callable<Boolean> tryCreateTable =
() -> {
+ DestinationT dest = element.getKey().getKey();
CreateTableHelpers.possiblyCreateTable(
c.getPipelineOptions().as(BigQueryOptions.class),
tableDestination,
- () -> dynamicDestinations.getSchema(element.getKey().getKey()),
+ () -> dynamicDestinations.getSchema(dest),
+ () -> dynamicDestinations.getTableConstraints(dest),
createDisposition,
destinationCoder,
kmsKey,
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkRowUpdateIT.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkRowUpdateIT.java
index c790dffe7ed..d5366fe2961 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkRowUpdateIT.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkRowUpdateIT.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
+import com.google.api.services.bigquery.model.Clustering;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
@@ -59,38 +60,8 @@ public class StorageApiSinkRowUpdateIT {
BQ_CLIENT.deleteDataset(PROJECT, BIG_QUERY_DATASET_ID);
}
- private static String createTable(TableSchema tableSchema, List<String>
primaryKey)
- throws IOException, InterruptedException {
- String table = "table" + System.nanoTime();
-
- BQ_CLIENT.deleteTable(PROJECT, BIG_QUERY_DATASET_ID, table);
-
- StringBuilder ddl =
- new StringBuilder("CREATE TABLE ")
- .append(PROJECT)
- .append(".")
- .append(BIG_QUERY_DATASET_ID)
- .append(".")
- .append(table)
- .append("(");
- for (TableFieldSchema tableFieldSchema : tableSchema.getFields()) {
- ddl.append(tableFieldSchema.getName())
- .append(" ")
- .append(tableFieldSchema.getType())
- .append(",");
- }
-
- String primaryKeyString = String.join(",", primaryKey);
- ddl.append(" PRIMARY KEY ")
- .append("(")
- .append(primaryKeyString)
- .append(")")
- .append(" NOT ENFORCED) ");
- ddl.append("CLUSTER BY ").append(primaryKeyString);
-
- BQ_CLIENT.queryWithRetriesUsingStandardSql(ddl.toString(), PROJECT);
-
- return PROJECT + "." + BIG_QUERY_DATASET_ID + "." + table;
+  /** Builds a "project.dataset.table<nanoTime>" spec for a fresh table in the test dataset. */
+  private static String getTablespec() {
+    String tableName = "table" + System.nanoTime();
+    return String.join(".", PROJECT, BIG_QUERY_DATASET_ID, tableName);
}
@Test
@@ -130,7 +101,8 @@ public class StorageApiSinkRowUpdateIT {
new TableRow().set("key1", "foo4").set("key2",
"bar4").set("value", "1"),
RowMutationInformation.of(RowMutationInformation.MutationType.DELETE, 1)));
- String tableSpec = createTable(tableSchema, Lists.newArrayList("key1",
"key2"));
+ List<String> primaryKey = Lists.newArrayList("key1", "key2");
+ String tableSpec = getTablespec();
Pipeline p = Pipeline.create();
p.apply("Create rows", Create.of(items))
.apply(
@@ -138,8 +110,10 @@ public class StorageApiSinkRowUpdateIT {
BigQueryIO.applyRowMutations()
.to(tableSpec)
.withSchema(tableSchema)
+ .withPrimaryKey(primaryKey)
+ .withClustering(new Clustering().setFields(primaryKey))
.withMethod(BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE)
-
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));
+
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
p.run();