This is an automated email from the ASF dual-hosted git repository.
kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 1ccaba3 [BEAM-13265] Add withDeterministicRecordIdFn which allows for
BigQueryIO.Write without added checkpointing (#15998)
1ccaba3 is described below
commit 1ccaba3d06bf8a3f8f1c1471008aadeb8e2e6bd2
Author: slavachernyak <[email protected]>
AuthorDate: Wed Nov 17 14:07:09 2021 -0800
[BEAM-13265] Add withDeterministicRecordIdFn which allows for
BigQueryIO.Write without added checkpointing (#15998)
---
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 34 +++++++++--
.../beam/sdk/io/gcp/bigquery/StreamingInserts.java | 34 ++++++++++-
.../sdk/io/gcp/bigquery/StreamingWriteTables.java | 65 +++++++++++++++++-----
.../beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java | 23 +++++++-
4 files changed, 134 insertions(+), 22 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 7f52a13..1702e32 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -1706,6 +1706,7 @@ public class BigQueryIO {
.setOptimizeWrites(false)
.setUseBeamSchema(false)
.setAutoSharding(false)
+ .setDeterministicRecordIdFn(null)
.build();
}
@@ -1844,6 +1845,9 @@ public class BigQueryIO {
@Experimental
abstract Boolean getAutoSharding();
+ @Experimental
+ abstract @Nullable SerializableFunction<T, String>
getDeterministicRecordIdFn();
+
abstract Builder<T> toBuilder();
@AutoValue.Builder
@@ -1928,6 +1932,10 @@ public class BigQueryIO {
@Experimental
abstract Builder<T> setAutoSharding(Boolean autoSharding);
+ @Experimental
+ abstract Builder<T> setDeterministicRecordIdFn(
+ SerializableFunction<T, String> toUniqueIdFunction);
+
abstract Write<T> build();
}
@@ -2427,6 +2435,22 @@ public class BigQueryIO {
return toBuilder().setAutoSharding(true).build();
}
+ /**
+ * Provides a function which can serve as a source of deterministic unique
ids for each record
+ * to be written, replacing the unique ids generated with the default
scheme. When used with
+ * {@link Method#STREAMING_INSERTS} This also elides the re-shuffle from
the BigQueryIO Write by
+ * using the keys on which the data is grouped at the point at which
BigQueryIO Write is
+ * applied, since the reshuffle is necessary only for the checkpointing of
the default-generated
+ * ids for determinism. This may be beneficial as a performance
optimization in the case when
+ * the current sharding is already sufficient for writing to BigQuery. This
behavior takes
+ * precedence over {@link #withAutoSharding}.
+ */
+ @Experimental
+ public Write<T> withDeterministicRecordIdFn(
+ SerializableFunction<T, String> toUniqueIdFunction) {
+ return
toBuilder().setDeterministicRecordIdFn(toUniqueIdFunction).build();
+ }
+
@VisibleForTesting
/** This method is for test usage only */
public Write<T> withTestServices(BigQueryServices testServices) {
@@ -2588,9 +2612,9 @@ public class BigQueryIO {
} else {
checkArgument(
getTriggeringFrequency() == null && getNumFileShards() == 0,
- "Triggering frequency or number of file shards can be specified
only when writing "
- + "an unbounded PCollection via FILE_LOADS or
STORAGE_API_WRITES, but: the collection was %s "
- + "and the method was %s",
+ "Triggering frequency or number of file shards can be specified
only when writing an"
+ + " unbounded PCollection via FILE_LOADS or
STORAGE_API_WRITES, but: the collection"
+ + " was %s and the method was %s",
input.isBounded(),
method);
}
@@ -2719,7 +2743,8 @@ public class BigQueryIO {
if (avroRowWriterFactory != null) {
checkArgument(
formatFunction == null,
- "Only one of withFormatFunction or
withAvroFormatFunction/withAvroWriter maybe set, not both.");
+ "Only one of withFormatFunction or
withAvroFormatFunction/withAvroWriter maybe set,"
+ + " not both.");
SerializableFunction<TableSchema, org.apache.avro.Schema>
avroSchemaFactory =
getAvroSchemaFactory();
@@ -2813,6 +2838,7 @@ public class BigQueryIO {
.withIgnoreUnknownValues(getIgnoreUnknownValues())
.withIgnoreInsertIds(getIgnoreInsertIds())
.withAutoSharding(getAutoSharding())
+ .withDeterministicRecordIdFn(getDeterministicRecordIdFn())
.withKmsKey(getKmsKey());
return input.apply(streamingInserts);
} else if (method == Write.Method.FILE_LOADS) {
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
index 1e4150a..79fbdac 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
@@ -47,6 +47,7 @@ public class StreamingInserts<DestinationT, ElementT>
private final Coder<ElementT> elementCoder;
private final SerializableFunction<ElementT, TableRow> toTableRow;
private final SerializableFunction<ElementT, TableRow> toFailsafeTableRow;
+ private final SerializableFunction<ElementT, String> deterministicRecordIdFn;
/** Constructor. */
public StreamingInserts(
@@ -68,6 +69,7 @@ public class StreamingInserts<DestinationT, ElementT>
elementCoder,
toTableRow,
toFailsafeTableRow,
+ null,
null);
}
@@ -85,6 +87,7 @@ public class StreamingInserts<DestinationT, ElementT>
Coder<ElementT> elementCoder,
SerializableFunction<ElementT, TableRow> toTableRow,
SerializableFunction<ElementT, TableRow> toFailsafeTableRow,
+ SerializableFunction<ElementT, String> deterministicRecordIdFn,
String kmsKey) {
this.createDisposition = createDisposition;
this.dynamicDestinations = dynamicDestinations;
@@ -98,6 +101,7 @@ public class StreamingInserts<DestinationT, ElementT>
this.elementCoder = elementCoder;
this.toTableRow = toTableRow;
this.toFailsafeTableRow = toFailsafeTableRow;
+ this.deterministicRecordIdFn = deterministicRecordIdFn;
this.kmsKey = kmsKey;
}
@@ -117,6 +121,7 @@ public class StreamingInserts<DestinationT, ElementT>
elementCoder,
toTableRow,
toFailsafeTableRow,
+ deterministicRecordIdFn,
kmsKey);
}
@@ -135,6 +140,7 @@ public class StreamingInserts<DestinationT, ElementT>
elementCoder,
toTableRow,
toFailsafeTableRow,
+ deterministicRecordIdFn,
kmsKey);
}
@@ -152,6 +158,7 @@ public class StreamingInserts<DestinationT, ElementT>
elementCoder,
toTableRow,
toFailsafeTableRow,
+ deterministicRecordIdFn,
kmsKey);
}
@@ -169,6 +176,7 @@ public class StreamingInserts<DestinationT, ElementT>
elementCoder,
toTableRow,
toFailsafeTableRow,
+ deterministicRecordIdFn,
kmsKey);
}
@@ -186,6 +194,7 @@ public class StreamingInserts<DestinationT, ElementT>
elementCoder,
toTableRow,
toFailsafeTableRow,
+ deterministicRecordIdFn,
kmsKey);
}
@@ -203,6 +212,26 @@ public class StreamingInserts<DestinationT, ElementT>
elementCoder,
toTableRow,
toFailsafeTableRow,
+ deterministicRecordIdFn,
+ kmsKey);
+ }
+
+ StreamingInserts<DestinationT, ElementT> withDeterministicRecordIdFn(
+ SerializableFunction<ElementT, String> deterministicRecordIdFn) {
+ return new StreamingInserts<>(
+ createDisposition,
+ dynamicDestinations,
+ bigQueryServices,
+ retryPolicy,
+ extendedErrorInfo,
+ skipInvalidRows,
+ ignoreUnknownValues,
+ ignoreInsertIds,
+ autoSharding,
+ elementCoder,
+ toTableRow,
+ toFailsafeTableRow,
+ deterministicRecordIdFn,
kmsKey);
}
@@ -220,6 +249,7 @@ public class StreamingInserts<DestinationT, ElementT>
elementCoder,
toTableRow,
toFailsafeTableRow,
+ deterministicRecordIdFn,
kmsKey);
}
@@ -237,6 +267,7 @@ public class StreamingInserts<DestinationT, ElementT>
elementCoder,
toTableRow,
toFailsafeTableRow,
+ deterministicRecordIdFn,
kmsKey);
}
@@ -260,6 +291,7 @@ public class StreamingInserts<DestinationT, ElementT>
.withAutoSharding(autoSharding)
.withElementCoder(elementCoder)
.withToTableRow(toTableRow)
- .withToFailsafeTableRow(toFailsafeTableRow));
+ .withToFailsafeTableRow(toFailsafeTableRow)
+ .withDeterministicRecordIdFn(deterministicRecordIdFn));
}
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
index 5afd1ae..915eb8a 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
@@ -62,6 +62,7 @@ public class StreamingWriteTables<ElementT>
private final Coder<ElementT> elementCoder;
private final SerializableFunction<ElementT, TableRow> toTableRow;
private final SerializableFunction<ElementT, TableRow> toFailsafeTableRow;
+ private final SerializableFunction<ElementT, String> deterministicRecordIdFn;
public StreamingWriteTables() {
this(
@@ -74,7 +75,8 @@ public class StreamingWriteTables<ElementT>
false, // autoSharding
null, // elementCoder
null, // toTableRow
- null); // toFailsafeTableRow
+ null, // toFailsafeTableRow
+ null); // deterministicRecordIdFn
}
private StreamingWriteTables(
@@ -87,7 +89,8 @@ public class StreamingWriteTables<ElementT>
boolean autoSharding,
Coder<ElementT> elementCoder,
SerializableFunction<ElementT, TableRow> toTableRow,
- SerializableFunction<ElementT, TableRow> toFailsafeTableRow) {
+ SerializableFunction<ElementT, TableRow> toFailsafeTableRow,
+ SerializableFunction<ElementT, String> deterministicRecordIdFn) {
this.bigQueryServices = bigQueryServices;
this.retryPolicy = retryPolicy;
this.extendedErrorInfo = extendedErrorInfo;
@@ -98,6 +101,7 @@ public class StreamingWriteTables<ElementT>
this.elementCoder = elementCoder;
this.toTableRow = toTableRow;
this.toFailsafeTableRow = toFailsafeTableRow;
+ this.deterministicRecordIdFn = deterministicRecordIdFn;
}
StreamingWriteTables<ElementT> withTestServices(BigQueryServices
bigQueryServices) {
@@ -111,7 +115,8 @@ public class StreamingWriteTables<ElementT>
autoSharding,
elementCoder,
toTableRow,
- toFailsafeTableRow);
+ toFailsafeTableRow,
+ deterministicRecordIdFn);
}
StreamingWriteTables<ElementT> withInsertRetryPolicy(InsertRetryPolicy
retryPolicy) {
@@ -125,7 +130,8 @@ public class StreamingWriteTables<ElementT>
autoSharding,
elementCoder,
toTableRow,
- toFailsafeTableRow);
+ toFailsafeTableRow,
+ deterministicRecordIdFn);
}
StreamingWriteTables<ElementT> withExtendedErrorInfo(boolean
extendedErrorInfo) {
@@ -139,7 +145,8 @@ public class StreamingWriteTables<ElementT>
autoSharding,
elementCoder,
toTableRow,
- toFailsafeTableRow);
+ toFailsafeTableRow,
+ deterministicRecordIdFn);
}
StreamingWriteTables<ElementT> withSkipInvalidRows(boolean skipInvalidRows) {
@@ -153,7 +160,8 @@ public class StreamingWriteTables<ElementT>
autoSharding,
elementCoder,
toTableRow,
- toFailsafeTableRow);
+ toFailsafeTableRow,
+ deterministicRecordIdFn);
}
StreamingWriteTables<ElementT> withIgnoreUnknownValues(boolean
ignoreUnknownValues) {
@@ -167,7 +175,8 @@ public class StreamingWriteTables<ElementT>
autoSharding,
elementCoder,
toTableRow,
- toFailsafeTableRow);
+ toFailsafeTableRow,
+ deterministicRecordIdFn);
}
StreamingWriteTables<ElementT> withIgnoreInsertIds(boolean ignoreInsertIds) {
@@ -181,7 +190,8 @@ public class StreamingWriteTables<ElementT>
autoSharding,
elementCoder,
toTableRow,
- toFailsafeTableRow);
+ toFailsafeTableRow,
+ deterministicRecordIdFn);
}
StreamingWriteTables<ElementT> withAutoSharding(boolean autoSharding) {
@@ -195,7 +205,8 @@ public class StreamingWriteTables<ElementT>
autoSharding,
elementCoder,
toTableRow,
- toFailsafeTableRow);
+ toFailsafeTableRow,
+ deterministicRecordIdFn);
}
StreamingWriteTables<ElementT> withElementCoder(Coder<ElementT>
elementCoder) {
@@ -209,7 +220,8 @@ public class StreamingWriteTables<ElementT>
autoSharding,
elementCoder,
toTableRow,
- toFailsafeTableRow);
+ toFailsafeTableRow,
+ deterministicRecordIdFn);
}
StreamingWriteTables<ElementT> withToTableRow(
@@ -224,7 +236,8 @@ public class StreamingWriteTables<ElementT>
autoSharding,
elementCoder,
toTableRow,
- toFailsafeTableRow);
+ toFailsafeTableRow,
+ deterministicRecordIdFn);
}
StreamingWriteTables<ElementT> withToFailsafeTableRow(
@@ -239,7 +252,24 @@ public class StreamingWriteTables<ElementT>
autoSharding,
elementCoder,
toTableRow,
- toFailsafeTableRow);
+ toFailsafeTableRow,
+ deterministicRecordIdFn);
+ }
+
+ StreamingWriteTables<ElementT> withDeterministicRecordIdFn(
+ SerializableFunction<ElementT, String> deterministicRecordIdFn) {
+ return new StreamingWriteTables<>(
+ bigQueryServices,
+ retryPolicy,
+ extendedErrorInfo,
+ skipInvalidRows,
+ ignoreUnknownValues,
+ ignoreInsertIds,
+ autoSharding,
+ elementCoder,
+ toTableRow,
+ toFailsafeTableRow,
+ deterministicRecordIdFn);
}
@Override
@@ -295,7 +325,7 @@ public class StreamingWriteTables<ElementT>
// different unique ids, this implementation relies on "checkpointing",
which is
// achieved as a side effect of having BigQuery insertion immediately
follow a GBK.
- if (autoSharding) {
+ if (autoSharding && deterministicRecordIdFn == null) {
// If runner determined dynamic sharding is enabled, group TableRows on
table destinations
// that may be sharded during the runtime. Otherwise, we choose a fixed
number of shards per
// table destination following the logic below in the other branch.
@@ -339,14 +369,19 @@ public class StreamingWriteTables<ElementT>
input
.apply("ShardTableWrites", ParDo.of(new
GenerateShardedTable<>(numShards)))
.setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()),
elementCoder))
- .apply("TagWithUniqueIds", ParDo.of(new TagWithUniqueIds<>()))
+ .apply("TagWithUniqueIds", ParDo.of(new
TagWithUniqueIds<>(deterministicRecordIdFn)))
.setCoder(
KvCoder.of(
ShardedKeyCoder.of(StringUtf8Coder.of()),
TableRowInfoCoder.of(elementCoder)));
+ if (deterministicRecordIdFn == null) {
+ // If not using a deterministic function for record ids, we must apply
a reshuffle to ensure
+ // determinism on the generated ids.
+ shardedTagged = shardedTagged.apply(Reshuffle.of());
+ }
+
return shardedTagged
- .apply(Reshuffle.of())
// Put in the global window to ensure that DynamicDestinations side
inputs are
// accessed
// correctly.
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
index bd4e230..07576d9 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
@@ -20,8 +20,10 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import java.io.IOException;
import java.util.UUID;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
/**
* Fn that tags each table row with a unique id and destination table. To
avoid calling
@@ -38,15 +40,32 @@ class TagWithUniqueIds<KeyT, ElementT>
private transient String randomUUID;
private transient long sequenceNo = 0L;
+ private final @Nullable SerializableFunction<ElementT, String> elementToId;
+
+ public TagWithUniqueIds() {
+ elementToId = null;
+ }
+
+ public TagWithUniqueIds(SerializableFunction<ElementT, String> elementToId) {
+ this.elementToId = elementToId;
+ }
+
@StartBundle
public void startBundle() {
- randomUUID = UUID.randomUUID().toString();
+ if (elementToId == null) {
+ randomUUID = UUID.randomUUID().toString();
+ }
}
/** Tag the input with a unique id. */
@ProcessElement
public void processElement(ProcessContext context) throws IOException {
- String uniqueId = randomUUID + sequenceNo++;
+ String uniqueId;
+ if (elementToId == null) {
+ uniqueId = randomUUID + sequenceNo++;
+ } else {
+ uniqueId = elementToId.apply(context.element().getValue());
+ }
// We output on keys 0-50 to ensure that there's enough batching for
// BigQuery.
context.output(