This is an automated email from the ASF dual-hosted git repository.

kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ccaba3  [BEAM-13265] Add withDeterministicRecordIdFn which allows for 
BigQueryIO.Write without added checkpointing (#15998)
1ccaba3 is described below

commit 1ccaba3d06bf8a3f8f1c1471008aadeb8e2e6bd2
Author: slavachernyak <[email protected]>
AuthorDate: Wed Nov 17 14:07:09 2021 -0800

    [BEAM-13265] Add withDeterministicRecordIdFn which allows for 
BigQueryIO.Write without added checkpointing (#15998)
---
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java       | 34 +++++++++--
 .../beam/sdk/io/gcp/bigquery/StreamingInserts.java | 34 ++++++++++-
 .../sdk/io/gcp/bigquery/StreamingWriteTables.java  | 65 +++++++++++++++++-----
 .../beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java | 23 +++++++-
 4 files changed, 134 insertions(+), 22 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 7f52a13..1702e32 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -1706,6 +1706,7 @@ public class BigQueryIO {
         .setOptimizeWrites(false)
         .setUseBeamSchema(false)
         .setAutoSharding(false)
+        .setDeterministicRecordIdFn(null)
         .build();
   }
 
@@ -1844,6 +1845,9 @@ public class BigQueryIO {
     @Experimental
     abstract Boolean getAutoSharding();
 
+    @Experimental
+    abstract @Nullable SerializableFunction<T, String> 
getDeterministicRecordIdFn();
+
     abstract Builder<T> toBuilder();
 
     @AutoValue.Builder
@@ -1928,6 +1932,10 @@ public class BigQueryIO {
       @Experimental
       abstract Builder<T> setAutoSharding(Boolean autoSharding);
 
+      @Experimental
+      abstract Builder<T> setDeterministicRecordIdFn(
+          SerializableFunction<T, String> toUniqueIdFunction);
+
       abstract Write<T> build();
     }
 
@@ -2427,6 +2435,22 @@ public class BigQueryIO {
       return toBuilder().setAutoSharding(true).build();
     }
 
+    /**
+     * Provides a function which can serve as a source of deterministic unique 
ids for each record
+     * to be written, replacing the unique ids generated with the default 
scheme. When used with
+     * {@link Method#STREAMING_INSERTS}, this also elides the re-shuffle from 
the BigQueryIO Write by
+     * using the keys on which the data is grouped at the point at which 
BigQueryIO Write is
+     * applied, since the reshuffle is necessary only for the checkpointing of 
the default-generated
+     * ids for determinism. This may be beneficial as a performance 
optimization in the case when
+     * the current sharding is already sufficient for writing to BigQuery. This 
behavior takes
+     * precedence over {@link #withAutoSharding}.
+     */
+    @Experimental
+    public Write<T> withDeterministicRecordIdFn(
+        SerializableFunction<T, String> toUniqueIdFunction) {
+      return 
toBuilder().setDeterministicRecordIdFn(toUniqueIdFunction).build();
+    }
+
     @VisibleForTesting
     /** This method is for test usage only */
     public Write<T> withTestServices(BigQueryServices testServices) {
@@ -2588,9 +2612,9 @@ public class BigQueryIO {
       } else {
         checkArgument(
             getTriggeringFrequency() == null && getNumFileShards() == 0,
-            "Triggering frequency or number of file shards can be specified 
only when writing "
-                + "an unbounded PCollection via FILE_LOADS or 
STORAGE_API_WRITES, but: the collection was %s "
-                + "and the method was %s",
+            "Triggering frequency or number of file shards can be specified 
only when writing an"
+                + " unbounded PCollection via FILE_LOADS or 
STORAGE_API_WRITES, but: the collection"
+                + " was %s and the method was %s",
             input.isBounded(),
             method);
       }
@@ -2719,7 +2743,8 @@ public class BigQueryIO {
         if (avroRowWriterFactory != null) {
           checkArgument(
               formatFunction == null,
-              "Only one of withFormatFunction or 
withAvroFormatFunction/withAvroWriter maybe set, not both.");
+              "Only one of withFormatFunction or 
withAvroFormatFunction/withAvroWriter maybe set,"
+                  + " not both.");
 
           SerializableFunction<TableSchema, org.apache.avro.Schema> 
avroSchemaFactory =
               getAvroSchemaFactory();
@@ -2813,6 +2838,7 @@ public class BigQueryIO {
                 .withIgnoreUnknownValues(getIgnoreUnknownValues())
                 .withIgnoreInsertIds(getIgnoreInsertIds())
                 .withAutoSharding(getAutoSharding())
+                .withDeterministicRecordIdFn(getDeterministicRecordIdFn())
                 .withKmsKey(getKmsKey());
         return input.apply(streamingInserts);
       } else if (method == Write.Method.FILE_LOADS) {
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
index 1e4150a..79fbdac 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
@@ -47,6 +47,7 @@ public class StreamingInserts<DestinationT, ElementT>
   private final Coder<ElementT> elementCoder;
   private final SerializableFunction<ElementT, TableRow> toTableRow;
   private final SerializableFunction<ElementT, TableRow> toFailsafeTableRow;
+  private final SerializableFunction<ElementT, String> deterministicRecordIdFn;
 
   /** Constructor. */
   public StreamingInserts(
@@ -68,6 +69,7 @@ public class StreamingInserts<DestinationT, ElementT>
         elementCoder,
         toTableRow,
         toFailsafeTableRow,
+        null,
         null);
   }
 
@@ -85,6 +87,7 @@ public class StreamingInserts<DestinationT, ElementT>
       Coder<ElementT> elementCoder,
       SerializableFunction<ElementT, TableRow> toTableRow,
       SerializableFunction<ElementT, TableRow> toFailsafeTableRow,
+      SerializableFunction<ElementT, String> deterministicRecordIdFn,
       String kmsKey) {
     this.createDisposition = createDisposition;
     this.dynamicDestinations = dynamicDestinations;
@@ -98,6 +101,7 @@ public class StreamingInserts<DestinationT, ElementT>
     this.elementCoder = elementCoder;
     this.toTableRow = toTableRow;
     this.toFailsafeTableRow = toFailsafeTableRow;
+    this.deterministicRecordIdFn = deterministicRecordIdFn;
     this.kmsKey = kmsKey;
   }
 
@@ -117,6 +121,7 @@ public class StreamingInserts<DestinationT, ElementT>
         elementCoder,
         toTableRow,
         toFailsafeTableRow,
+        deterministicRecordIdFn,
         kmsKey);
   }
 
@@ -135,6 +140,7 @@ public class StreamingInserts<DestinationT, ElementT>
         elementCoder,
         toTableRow,
         toFailsafeTableRow,
+        deterministicRecordIdFn,
         kmsKey);
   }
 
@@ -152,6 +158,7 @@ public class StreamingInserts<DestinationT, ElementT>
         elementCoder,
         toTableRow,
         toFailsafeTableRow,
+        deterministicRecordIdFn,
         kmsKey);
   }
 
@@ -169,6 +176,7 @@ public class StreamingInserts<DestinationT, ElementT>
         elementCoder,
         toTableRow,
         toFailsafeTableRow,
+        deterministicRecordIdFn,
         kmsKey);
   }
 
@@ -186,6 +194,7 @@ public class StreamingInserts<DestinationT, ElementT>
         elementCoder,
         toTableRow,
         toFailsafeTableRow,
+        deterministicRecordIdFn,
         kmsKey);
   }
 
@@ -203,6 +212,26 @@ public class StreamingInserts<DestinationT, ElementT>
         elementCoder,
         toTableRow,
         toFailsafeTableRow,
+        deterministicRecordIdFn,
+        kmsKey);
+  }
+
+  StreamingInserts<DestinationT, ElementT> withDeterministicRecordIdFn(
+      SerializableFunction<ElementT, String> deterministicRecordIdFn) {
+    return new StreamingInserts<>(
+        createDisposition,
+        dynamicDestinations,
+        bigQueryServices,
+        retryPolicy,
+        extendedErrorInfo,
+        skipInvalidRows,
+        ignoreUnknownValues,
+        ignoreInsertIds,
+        autoSharding,
+        elementCoder,
+        toTableRow,
+        toFailsafeTableRow,
+        deterministicRecordIdFn,
         kmsKey);
   }
 
@@ -220,6 +249,7 @@ public class StreamingInserts<DestinationT, ElementT>
         elementCoder,
         toTableRow,
         toFailsafeTableRow,
+        deterministicRecordIdFn,
         kmsKey);
   }
 
@@ -237,6 +267,7 @@ public class StreamingInserts<DestinationT, ElementT>
         elementCoder,
         toTableRow,
         toFailsafeTableRow,
+        deterministicRecordIdFn,
         kmsKey);
   }
 
@@ -260,6 +291,7 @@ public class StreamingInserts<DestinationT, ElementT>
             .withAutoSharding(autoSharding)
             .withElementCoder(elementCoder)
             .withToTableRow(toTableRow)
-            .withToFailsafeTableRow(toFailsafeTableRow));
+            .withToFailsafeTableRow(toFailsafeTableRow)
+            .withDeterministicRecordIdFn(deterministicRecordIdFn));
   }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
index 5afd1ae..915eb8a 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
@@ -62,6 +62,7 @@ public class StreamingWriteTables<ElementT>
   private final Coder<ElementT> elementCoder;
   private final SerializableFunction<ElementT, TableRow> toTableRow;
   private final SerializableFunction<ElementT, TableRow> toFailsafeTableRow;
+  private final SerializableFunction<ElementT, String> deterministicRecordIdFn;
 
   public StreamingWriteTables() {
     this(
@@ -74,7 +75,8 @@ public class StreamingWriteTables<ElementT>
         false, // autoSharding
         null, // elementCoder
         null, // toTableRow
-        null); // toFailsafeTableRow
+        null, // toFailsafeTableRow
+        null); // deterministicRecordIdFn
   }
 
   private StreamingWriteTables(
@@ -87,7 +89,8 @@ public class StreamingWriteTables<ElementT>
       boolean autoSharding,
       Coder<ElementT> elementCoder,
       SerializableFunction<ElementT, TableRow> toTableRow,
-      SerializableFunction<ElementT, TableRow> toFailsafeTableRow) {
+      SerializableFunction<ElementT, TableRow> toFailsafeTableRow,
+      SerializableFunction<ElementT, String> deterministicRecordIdFn) {
     this.bigQueryServices = bigQueryServices;
     this.retryPolicy = retryPolicy;
     this.extendedErrorInfo = extendedErrorInfo;
@@ -98,6 +101,7 @@ public class StreamingWriteTables<ElementT>
     this.elementCoder = elementCoder;
     this.toTableRow = toTableRow;
     this.toFailsafeTableRow = toFailsafeTableRow;
+    this.deterministicRecordIdFn = deterministicRecordIdFn;
   }
 
   StreamingWriteTables<ElementT> withTestServices(BigQueryServices 
bigQueryServices) {
@@ -111,7 +115,8 @@ public class StreamingWriteTables<ElementT>
         autoSharding,
         elementCoder,
         toTableRow,
-        toFailsafeTableRow);
+        toFailsafeTableRow,
+        deterministicRecordIdFn);
   }
 
   StreamingWriteTables<ElementT> withInsertRetryPolicy(InsertRetryPolicy 
retryPolicy) {
@@ -125,7 +130,8 @@ public class StreamingWriteTables<ElementT>
         autoSharding,
         elementCoder,
         toTableRow,
-        toFailsafeTableRow);
+        toFailsafeTableRow,
+        deterministicRecordIdFn);
   }
 
   StreamingWriteTables<ElementT> withExtendedErrorInfo(boolean 
extendedErrorInfo) {
@@ -139,7 +145,8 @@ public class StreamingWriteTables<ElementT>
         autoSharding,
         elementCoder,
         toTableRow,
-        toFailsafeTableRow);
+        toFailsafeTableRow,
+        deterministicRecordIdFn);
   }
 
   StreamingWriteTables<ElementT> withSkipInvalidRows(boolean skipInvalidRows) {
@@ -153,7 +160,8 @@ public class StreamingWriteTables<ElementT>
         autoSharding,
         elementCoder,
         toTableRow,
-        toFailsafeTableRow);
+        toFailsafeTableRow,
+        deterministicRecordIdFn);
   }
 
   StreamingWriteTables<ElementT> withIgnoreUnknownValues(boolean 
ignoreUnknownValues) {
@@ -167,7 +175,8 @@ public class StreamingWriteTables<ElementT>
         autoSharding,
         elementCoder,
         toTableRow,
-        toFailsafeTableRow);
+        toFailsafeTableRow,
+        deterministicRecordIdFn);
   }
 
   StreamingWriteTables<ElementT> withIgnoreInsertIds(boolean ignoreInsertIds) {
@@ -181,7 +190,8 @@ public class StreamingWriteTables<ElementT>
         autoSharding,
         elementCoder,
         toTableRow,
-        toFailsafeTableRow);
+        toFailsafeTableRow,
+        deterministicRecordIdFn);
   }
 
   StreamingWriteTables<ElementT> withAutoSharding(boolean autoSharding) {
@@ -195,7 +205,8 @@ public class StreamingWriteTables<ElementT>
         autoSharding,
         elementCoder,
         toTableRow,
-        toFailsafeTableRow);
+        toFailsafeTableRow,
+        deterministicRecordIdFn);
   }
 
   StreamingWriteTables<ElementT> withElementCoder(Coder<ElementT> 
elementCoder) {
@@ -209,7 +220,8 @@ public class StreamingWriteTables<ElementT>
         autoSharding,
         elementCoder,
         toTableRow,
-        toFailsafeTableRow);
+        toFailsafeTableRow,
+        deterministicRecordIdFn);
   }
 
   StreamingWriteTables<ElementT> withToTableRow(
@@ -224,7 +236,8 @@ public class StreamingWriteTables<ElementT>
         autoSharding,
         elementCoder,
         toTableRow,
-        toFailsafeTableRow);
+        toFailsafeTableRow,
+        deterministicRecordIdFn);
   }
 
   StreamingWriteTables<ElementT> withToFailsafeTableRow(
@@ -239,7 +252,24 @@ public class StreamingWriteTables<ElementT>
         autoSharding,
         elementCoder,
         toTableRow,
-        toFailsafeTableRow);
+        toFailsafeTableRow,
+        deterministicRecordIdFn);
+  }
+
+  StreamingWriteTables<ElementT> withDeterministicRecordIdFn(
+      SerializableFunction<ElementT, String> deterministicRecordIdFn) {
+    return new StreamingWriteTables<>(
+        bigQueryServices,
+        retryPolicy,
+        extendedErrorInfo,
+        skipInvalidRows,
+        ignoreUnknownValues,
+        ignoreInsertIds,
+        autoSharding,
+        elementCoder,
+        toTableRow,
+        toFailsafeTableRow,
+        deterministicRecordIdFn);
   }
 
   @Override
@@ -295,7 +325,7 @@ public class StreamingWriteTables<ElementT>
     // different unique ids, this implementation relies on "checkpointing", 
which is
     // achieved as a side effect of having BigQuery insertion immediately 
follow a GBK.
 
-    if (autoSharding) {
+    if (autoSharding && deterministicRecordIdFn == null) {
       // If runner determined dynamic sharding is enabled, group TableRows on 
table destinations
       // that may be sharded during the runtime. Otherwise, we choose a fixed 
number of shards per
       // table destination following the logic below in the other branch.
@@ -339,14 +369,19 @@ public class StreamingWriteTables<ElementT>
           input
               .apply("ShardTableWrites", ParDo.of(new 
GenerateShardedTable<>(numShards)))
               .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), 
elementCoder))
-              .apply("TagWithUniqueIds", ParDo.of(new TagWithUniqueIds<>()))
+              .apply("TagWithUniqueIds", ParDo.of(new 
TagWithUniqueIds<>(deterministicRecordIdFn)))
               .setCoder(
                   KvCoder.of(
                       ShardedKeyCoder.of(StringUtf8Coder.of()),
                       TableRowInfoCoder.of(elementCoder)));
 
+      if (deterministicRecordIdFn == null) {
+        // If not using a deterministic function for record ids, we must apply 
a reshuffle to ensure
+        // determinism on the generated ids.
+        shardedTagged = shardedTagged.apply(Reshuffle.of());
+      }
+
       return shardedTagged
-          .apply(Reshuffle.of())
           // Put in the global window to ensure that DynamicDestinations side 
inputs are
           // accessed
           // correctly.
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
index bd4e230..07576d9 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
@@ -20,8 +20,10 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 import java.io.IOException;
 import java.util.UUID;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.KV;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /**
  * Fn that tags each table row with a unique id and destination table. To 
avoid calling
@@ -38,15 +40,32 @@ class TagWithUniqueIds<KeyT, ElementT>
   private transient String randomUUID;
   private transient long sequenceNo = 0L;
 
+  private final @Nullable SerializableFunction<ElementT, String> elementToId;
+
+  public TagWithUniqueIds() {
+    elementToId = null;
+  }
+
+  public TagWithUniqueIds(SerializableFunction<ElementT, String> elementToId) {
+    this.elementToId = elementToId;
+  }
+
   @StartBundle
   public void startBundle() {
-    randomUUID = UUID.randomUUID().toString();
+    if (elementToId == null) {
+      randomUUID = UUID.randomUUID().toString();
+    }
   }
 
   /** Tag the input with a unique id. */
   @ProcessElement
   public void processElement(ProcessContext context) throws IOException {
-    String uniqueId = randomUUID + sequenceNo++;
+    String uniqueId;
+    if (elementToId == null) {
+      uniqueId = randomUUID + sequenceNo++;
+    } else {
+      uniqueId = elementToId.apply(context.element().getValue());
+    }
     // We output on keys 0-50 to ensure that there's enough batching for
     // BigQuery.
     context.output(

Reply via email to