This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 04eff7d  Integrate BigQuery sink file loads with GroupIntoBatches
     new dc29b8c  Merge pull request #13859 from [BEAM-11772] Integrate 
BigQuery sink file loads with GroupIntoBatches
04eff7d is described below

commit 04eff7db83c64609ee2f27abb0003231c53b9c0b
Author: sychen <[email protected]>
AuthorDate: Sun Jan 31 11:49:07 2021 -0800

    Integrate BigQuery sink file loads with GroupIntoBatches
---
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java       | 130 ++++++++++++++++-----
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java       |  17 ++-
 .../gcp/bigquery/WriteGroupedRecordsToFiles.java   |  21 ++--
 .../sdk/io/gcp/testing/FakeDatasetService.java     |  10 ++
 .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java   |  69 +++++++++++
 5 files changed, 200 insertions(+), 47 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index f804131..1828192 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -47,9 +47,12 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.GroupIntoBatches;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.WithKeys;
@@ -265,31 +268,44 @@ class BatchLoads<DestinationT, ElementT>
 
   // Expand the pipeline when the user has requested periodically-triggered 
file writes.
   private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> 
input) {
-    checkArgument(numFileShards > 0);
     Pipeline p = input.getPipeline();
     final PCollectionView<String> loadJobIdPrefixView = 
createJobIdPrefixView(p, JobType.LOAD);
     final PCollectionView<String> copyJobIdPrefixView = 
createJobIdPrefixView(p, JobType.COPY);
     final PCollectionView<String> tempFilePrefixView =
         createTempFilePrefixView(p, loadJobIdPrefixView);
-    // The user-supplied triggeringDuration is often chosen to control how 
many BigQuery load
-    // jobs are generated, to prevent going over BigQuery's daily quota for 
load jobs. If this
-    // is set to a large value, currently we have to buffer all the data until 
the trigger fires.
-    // Instead we ensure that the files are written if a threshold number of 
records are ready.
-    // We use only the user-supplied trigger on the actual BigQuery load. This 
allows us to
-    // offload the data to the filesystem.
-    PCollection<KV<DestinationT, ElementT>> inputInGlobalWindow =
-        input.apply(
-            "rewindowIntoGlobal",
-            Window.<KV<DestinationT, ElementT>>into(new GlobalWindows())
-                .triggering(
-                    Repeatedly.forever(
-                        AfterFirst.of(
-                            AfterProcessingTime.pastFirstElementInPane()
-                                .plusDelayOf(triggeringFrequency),
-                            
AfterPane.elementCountAtLeast(FILE_TRIGGERING_RECORD_COUNT))))
-                .discardingFiredPanes());
-    PCollection<WriteBundlesToFiles.Result<DestinationT>> results =
-        writeShardedFiles(inputInGlobalWindow, tempFilePrefixView);
+    PCollection<WriteBundlesToFiles.Result<DestinationT>> results;
+    if (numFileShards > 0) {
+      // The user-supplied triggeringFrequency is often chosen to control how 
many BigQuery load
+      // jobs are generated, to prevent going over BigQuery's daily quota for 
load jobs. If this
+      // is set to a large value, currently we have to buffer all the data 
until the trigger fires.
+      // Instead we ensure that the files are written if a threshold number of 
records are ready.
+      // We use only the user-supplied trigger on the actual BigQuery load. 
This allows us to
+      // offload the data to the filesystem.
+      PCollection<KV<DestinationT, ElementT>> inputInGlobalWindow =
+          input.apply(
+              "rewindowIntoGlobal",
+              Window.<KV<DestinationT, ElementT>>into(new GlobalWindows())
+                  .triggering(
+                      Repeatedly.forever(
+                          AfterFirst.of(
+                              AfterProcessingTime.pastFirstElementInPane()
+                                  .plusDelayOf(triggeringFrequency),
+                              
AfterPane.elementCountAtLeast(FILE_TRIGGERING_RECORD_COUNT))))
+                  .discardingFiredPanes());
+      results = writeStaticallyShardedFiles(inputInGlobalWindow, 
tempFilePrefixView);
+    } else {
+      // In the case of dynamic sharding, however, we use a default triggering 
and instead apply the
+      // user supplied triggeringFrequency to the sharding transform. See
+      // writeDynamicallyShardedFilesTriggered.
+      PCollection<KV<DestinationT, ElementT>> inputInGlobalWindow =
+          input.apply(
+              "rewindowIntoGlobal",
+              Window.<KV<DestinationT, ElementT>>into(new GlobalWindows())
+                  .triggering(DefaultTrigger.of())
+                  .discardingFiredPanes());
+      results = writeDynamicallyShardedFilesTriggered(inputInGlobalWindow, 
tempFilePrefixView);
+    }
+
     // Apply the user's trigger before we start generating BigQuery load jobs.
     results =
         results.apply(
@@ -307,7 +323,7 @@ class BatchLoads<DestinationT, ElementT>
         new TupleTag<>("singlePartitionTag");
 
     // If we have non-default triggered output, we can't use the side-input 
technique used in
-    // expandUntriggered . Instead make the result list a main input. Apply a 
GroupByKey first for
+    // expandUntriggered. Instead make the result list a main input. Apply a 
GroupByKey first for
     // determinism.
     PCollectionTuple partitions =
         results
@@ -371,8 +387,8 @@ class BatchLoads<DestinationT, ElementT>
                 .discardingFiredPanes());
     PCollection<WriteBundlesToFiles.Result<DestinationT>> results =
         (numFileShards == 0)
-            ? writeDynamicallyShardedFiles(inputInGlobalWindow, 
tempFilePrefixView)
-            : writeShardedFiles(inputInGlobalWindow, tempFilePrefixView);
+            ? writeDynamicallyShardedFilesUntriggered(inputInGlobalWindow, 
tempFilePrefixView)
+            : writeStaticallyShardedFiles(inputInGlobalWindow, 
tempFilePrefixView);
 
     TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag =
         new TupleTag<KV<ShardedKey<DestinationT>, 
List<String>>>("multiPartitionsTag") {};
@@ -470,9 +486,10 @@ class BatchLoads<DestinationT, ElementT>
         .apply("TempFilePrefixView", View.asSingleton());
   }
 
-  // Writes input data to dynamically-sharded, per-bundle files. Returns a 
PCollection of filename,
-  // file byte size, and table destination.
-  PCollection<WriteBundlesToFiles.Result<DestinationT>> 
writeDynamicallyShardedFiles(
+  // Writes input data to dynamically-sharded per-bundle files without 
triggering. Input records are
+  // spilt to new files if memory is constrained. Returns a PCollection of 
filename, file byte size,
+  // and table destination.
+  PCollection<WriteBundlesToFiles.Result<DestinationT>> 
writeDynamicallyShardedFilesUntriggered(
       PCollection<KV<DestinationT, ElementT>> input, PCollectionView<String> 
tempFilePrefix) {
     TupleTag<WriteBundlesToFiles.Result<DestinationT>> writtenFilesTag =
         new TupleTag<WriteBundlesToFiles.Result<DestinationT>>("writtenFiles") 
{};
@@ -513,9 +530,9 @@ class BatchLoads<DestinationT, ElementT>
         .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
   }
 
-  // Writes input data to statically-sharded files. Returns a PCollection of 
filename,
-  // file byte size, and table destination.
-  PCollection<WriteBundlesToFiles.Result<DestinationT>> writeShardedFiles(
+  // Writes input data to statically-sharded files. Returns a PCollection of 
filename, file byte
+  // size, and table destination.
+  PCollection<WriteBundlesToFiles.Result<DestinationT>> 
writeStaticallyShardedFiles(
       PCollection<KV<DestinationT, ElementT>> input, PCollectionView<String> 
tempFilePrefix) {
     checkState(numFileShards > 0);
     PCollection<KV<ShardedKey<DestinationT>, ElementT>> shardedRecords =
@@ -547,12 +564,67 @@ class BatchLoads<DestinationT, ElementT>
     return writeShardedRecords(shardedRecords, tempFilePrefix);
   }
 
+  // Writes input data to dynamically-sharded files with triggering. The input 
data is sharded by
+  // table destinations and each destination may be sub-sharded dynamically. 
Returns a PCollection
+  // of filename, file byte size, and table destination.
+  PCollection<WriteBundlesToFiles.Result<DestinationT>> 
writeDynamicallyShardedFilesTriggered(
+      PCollection<KV<DestinationT, ElementT>> input, PCollectionView<String> 
tempFilePrefix) {
+    // In contrast to fixed sharding with triggering, here we use a global 
window with default
+    // trigger and apply the user supplied triggeringFrequency in the 
subsequent GroupIntoBatches
+    // transform. We also ensure that the files are written if a threshold 
number of records are
+    // ready. Dynamic sharding is achieved via the withShardedKey() option 
provided by
+    // GroupIntoBatches.
+    return input
+        .apply(
+            GroupIntoBatches.<DestinationT, 
ElementT>ofSize(FILE_TRIGGERING_RECORD_COUNT)
+                .withMaxBufferingDuration(triggeringFrequency)
+                .withShardedKey())
+        .setCoder(
+            KvCoder.of(
+                org.apache.beam.sdk.util.ShardedKey.Coder.of(destinationCoder),
+                IterableCoder.of(elementCoder)))
+        .apply(
+            "StripShardId",
+            MapElements.via(
+                new SimpleFunction<
+                    KV<org.apache.beam.sdk.util.ShardedKey<DestinationT>, 
Iterable<ElementT>>,
+                    KV<DestinationT, Iterable<ElementT>>>() {
+                  @Override
+                  public KV<DestinationT, Iterable<ElementT>> apply(
+                      KV<org.apache.beam.sdk.util.ShardedKey<DestinationT>, 
Iterable<ElementT>>
+                          input) {
+                    return KV.of(input.getKey().getKey(), input.getValue());
+                  }
+                }))
+        .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(elementCoder)))
+        .apply(
+            "WriteGroupedRecords",
+            ParDo.of(
+                    new WriteGroupedRecordsToFiles<DestinationT, ElementT>(
+                        tempFilePrefix, maxFileSize, rowWriterFactory))
+                .withSideInputs(tempFilePrefix))
+        .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
+  }
+
   private PCollection<Result<DestinationT>> writeShardedRecords(
       PCollection<KV<ShardedKey<DestinationT>, ElementT>> shardedRecords,
       PCollectionView<String> tempFilePrefix) {
     return shardedRecords
         .apply("GroupByDestination", GroupByKey.create())
         .apply(
+            "StripShardId",
+            MapElements.via(
+                new SimpleFunction<
+                    KV<ShardedKey<DestinationT>, Iterable<ElementT>>,
+                    KV<DestinationT, Iterable<ElementT>>>() {
+                  @Override
+                  public KV<DestinationT, Iterable<ElementT>> apply(
+                      KV<ShardedKey<DestinationT>, Iterable<ElementT>> input) {
+                    return KV.of(input.getKey().getKey(), input.getValue());
+                  }
+                }))
+        .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(elementCoder)))
+        .apply(
             "WriteGroupedRecords",
             ParDo.of(
                     new WriteGroupedRecordsToFiles<DestinationT, ElementT>(
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 1869c91..1e789c1 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -2267,7 +2267,8 @@ public class BigQueryIO {
 
     /**
      * Control how many file shards are written when using BigQuery load jobs. 
Applicable only when
-     * also setting {@link #withTriggeringFrequency}.
+     * also setting {@link #withTriggeringFrequency}. To let runner determine 
the sharding at
+     * runtime, set {@link #withAutoSharding()} instead.
      */
     public Write<T> withNumFileShards(int numFileShards) {
       checkArgument(numFileShards > 0, "numFileShards must be > 0, but was: 
%s", numFileShards);
@@ -2350,10 +2351,10 @@ public class BigQueryIO {
     }
 
     /**
-     * If true, enables dynamically determined number of shards to write to 
BigQuery. Only
-     * applicable to unbounded data with STREAMING_INSERTS.
-     *
-     * <p>TODO(BEAM-11408): Also integrate this option to FILE_LOADS.
+     * If true, enables using a dynamically determined number of shards to 
write to BigQuery. This
+     * can be used for both {@link Method#FILE_LOADS} and {@link 
Method#STREAMING_INSERTS}. Only
+     * applicable to unbounded data. If using {@link Method#FILE_LOADS}, 
numFileShards set via
+     * {@link #withNumFileShards} will be ignored.
      */
     @Experimental
     public Write<T> withAutoSharding() {
@@ -2751,7 +2752,11 @@ public class BigQueryIO {
           batchLoads.setMaxRetryJobs(1000);
         }
         batchLoads.setTriggeringFrequency(getTriggeringFrequency());
-        batchLoads.setNumFileShards(getNumFileShards());
+        if (getAutoSharding()) {
+          batchLoads.setNumFileShards(0);
+        } else {
+          batchLoads.setNumFileShards(getNumFileShards());
+        }
         return input.apply(batchLoads);
       }
     }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java
index 6db179b..8c6366d 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java
@@ -20,17 +20,14 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.ShardedKey;
 
 /**
- * Receives elements grouped by their (sharded) destination, and writes them 
out to a file. Since
- * all the elements in the {@link Iterable} are destined to the same table, 
they are all written to
- * the same file. Ensures that only one {@link TableRowWriter} is active per 
bundle.
+ * Receives elements grouped by their destination, and writes them out to a 
file. Since all the
+ * elements in the {@link Iterable} are destined to the same table, they are 
all written to the same
+ * file. Ensures that only one {@link TableRowWriter} is active per bundle.
  */
 class WriteGroupedRecordsToFiles<DestinationT, ElementT>
-    extends DoFn<
-        KV<ShardedKey<DestinationT>, Iterable<ElementT>>,
-        WriteBundlesToFiles.Result<DestinationT>> {
+    extends DoFn<KV<DestinationT, Iterable<ElementT>>, 
WriteBundlesToFiles.Result<DestinationT>> {
 
   private final PCollectionView<String> tempFilePrefix;
   private final long maxFileSize;
@@ -48,24 +45,24 @@ class WriteGroupedRecordsToFiles<DestinationT, ElementT>
   @ProcessElement
   public void processElement(
       ProcessContext c,
-      @Element KV<ShardedKey<DestinationT>, Iterable<ElementT>> element,
+      @Element KV<DestinationT, Iterable<ElementT>> element,
       OutputReceiver<WriteBundlesToFiles.Result<DestinationT>> o)
       throws Exception {
 
     String tempFilePrefix = c.sideInput(this.tempFilePrefix);
 
     BigQueryRowWriter<ElementT> writer =
-        rowWriterFactory.createRowWriter(tempFilePrefix, 
element.getKey().getKey());
+        rowWriterFactory.createRowWriter(tempFilePrefix, element.getKey());
 
     try {
       for (ElementT tableRow : element.getValue()) {
         if (writer.getByteSize() > maxFileSize) {
           writer.close();
-          writer = rowWriterFactory.createRowWriter(tempFilePrefix, 
element.getKey().getKey());
+          writer = rowWriterFactory.createRowWriter(tempFilePrefix, 
element.getKey());
           BigQueryRowWriter.Result result = writer.getResult();
           o.output(
               new WriteBundlesToFiles.Result<>(
-                  result.resourceId.toString(), result.byteSize, 
c.element().getKey().getKey()));
+                  result.resourceId.toString(), result.byteSize, 
c.element().getKey()));
         }
         writer.write(tableRow);
       }
@@ -76,6 +73,6 @@ class WriteGroupedRecordsToFiles<DestinationT, ElementT>
     BigQueryRowWriter.Result result = writer.getResult();
     o.output(
         new WriteBundlesToFiles.Result<>(
-            result.resourceId.toString(), result.byteSize, 
c.element().getKey().getKey()));
+            result.resourceId.toString(), result.byteSize, 
c.element().getKey()));
   }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
index 7534a86..bfd6418 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
@@ -32,6 +32,7 @@ import java.io.Serializable;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
@@ -62,8 +63,12 @@ public class FakeDatasetService implements DatasetService, 
Serializable {
 
   Map<String, List<String>> insertErrors = Maps.newHashMap();
 
+  // The counter for the number of insertions performed.
+  static AtomicInteger insertCount;
+
   public static void setUp() {
     tables = HashBasedTable.create();
+    insertCount = new AtomicInteger(0);
     FakeJobService.setUp();
   }
 
@@ -217,6 +222,10 @@ public class FakeDatasetService implements DatasetService, 
Serializable {
     }
   }
 
+  public int getInsertCount() {
+    return insertCount.get();
+  }
+
   public long insertAll(
       TableReference ref, List<TableRow> rowList, @Nullable List<String> 
insertIdList)
       throws IOException, InterruptedException {
@@ -292,6 +301,7 @@ public class FakeDatasetService implements DatasetService, 
Serializable {
               failedInserts, allErrors.get(allErrors.size() - 1), ref, 
rowList.get(i));
         }
       }
+      insertCount.addAndGet(1);
       return dataSize;
     }
   }
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index d327ece..8d2f07a 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -57,6 +57,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -605,6 +606,74 @@ public class BigQueryIOWriteTest implements Serializable {
   }
 
   @Test
+  public void testTriggeredFileLoadsWithAutoSharding() throws Exception {
+    List<TableRow> elements = Lists.newArrayList();
+    for (int i = 0; i < 30; ++i) {
+      elements.add(new TableRow().set("number", i));
+    }
+
+    Instant startInstant = new Instant(0L);
+    TestStream<TableRow> testStream =
+        TestStream.create(TableRowJsonCoder.of())
+            // Initialize watermark for timer to be triggered correctly.
+            .advanceWatermarkTo(startInstant)
+            .addElements(
+                elements.get(0), Iterables.toArray(elements.subList(1, 10), 
TableRow.class))
+            .advanceProcessingTime(Duration.standardMinutes(1))
+            
.advanceWatermarkTo(startInstant.plus(Duration.standardSeconds(10)))
+            .addElements(
+                elements.get(10), Iterables.toArray(elements.subList(11, 20), 
TableRow.class))
+            .advanceProcessingTime(Duration.standardMinutes(1))
+            
.advanceWatermarkTo(startInstant.plus(Duration.standardSeconds(30)))
+            .addElements(
+                elements.get(20), Iterables.toArray(elements.subList(21, 30), 
TableRow.class))
+            .advanceProcessingTime(Duration.standardMinutes(2))
+            .advanceWatermarkToInfinity();
+
+    int numTables = 3;
+    p.apply(testStream)
+        .apply(
+            BigQueryIO.writeTableRows()
+                .to(
+                    (ValueInSingleWindow<TableRow> vsw) -> {
+                      String tableSpec =
+                          "project-id:dataset-id.table-"
+                              + ((int) vsw.getValue().get("number") % 
numTables);
+                      return new TableDestination(tableSpec, null);
+                    })
+                .withSchema(
+                    new TableSchema()
+                        .setFields(
+                            ImmutableList.of(
+                                new 
TableFieldSchema().setName("number").setType("INTEGER"))))
+                .withTestServices(fakeBqServices)
+                // Set a triggering frequency without needing to also specify 
numFileShards when
+                // using autoSharding.
+                .withTriggeringFrequency(Duration.standardSeconds(100))
+                .withAutoSharding()
+                .withMaxBytesPerPartition(1000)
+                .withMaxFilesPerPartition(10)
+                .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
+                .withoutValidation());
+    p.run();
+
+    Map<Integer, List<TableRow>> elementsByTableIdx = new HashMap<>();
+    for (int i = 0; i < elements.size(); i++) {
+      elementsByTableIdx
+          .computeIfAbsent(i % numTables, k -> new ArrayList<>())
+          .add(elements.get(i));
+    }
+    for (Map.Entry<Integer, List<TableRow>> entry : 
elementsByTableIdx.entrySet()) {
+      assertThat(
+          fakeDatasetService.getAllRows("project-id", "dataset-id", "table-" + 
entry.getKey()),
+          containsInAnyOrder(Iterables.toArray(entry.getValue(), 
TableRow.class)));
+    }
+    // For each table destination, it's expected to create two load jobs based 
on the triggering
+    // frequency and processing time intervals.
+    assertEquals(2 * numTables, fakeDatasetService.getInsertCount());
+  }
+
+  @Test
   public void testFailuresNoRetryPolicy() throws Exception {
     TableRow row1 = new TableRow().set("name", "a").set("number", "1");
     TableRow row2 = new TableRow().set("name", "b").set("number", "2");

Reply via email to