pabloem commented on a change in pull request #16436:
URL: https://github.com/apache/beam/pull/16436#discussion_r798046441



##########
File path: 
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java
##########
@@ -74,6 +93,72 @@ public void testE2EV1Write() throws Exception {
     assertEquals(numEntities, numEntitiesWritten);
   }
 
+  /**
+   * Tests {@link DatastoreV1.DatastoreWriterFn} with duplicated entries. Once a duplicated
+   * entry is found, the batch gets flushed.
+   */
+  @Test
+  public void testDatastoreWriterFnWithDuplicatedEntities() throws Exception {
+
+    List<Mutation> mutations = new ArrayList<>(200);
+    V1TestOptions options = TestPipeline.testingPipelineOptions().as(V1TestOptions.class);
+    Pipeline pipeline = TestPipeline.create(options);
+
+    for (int i = 1; i <= 200; i++) {
+      Key key = makeKey("key" + i, i + 1).build();
+
+      mutations.add(makeUpsert(Entity.newBuilder().setKey(key).build()).build());
+
+      if (i % 30 == 0) {
+        mutations.add(makeUpsert(Entity.newBuilder().setKey(key).build()).build());
+      }
+    }
+
+    DatastoreV1.DatastoreWriterFn datastoreWriter =
+        new DatastoreV1.DatastoreWriterFn(
+            TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject(), null);
+
+    PTransform<PCollection<? extends Mutation>, PCollection<Void>> datastoreWriterTransform =
+        ParDo.of(datastoreWriter);
+
+    // The following three lines turn the original ArrayList into the single
+    // element of the input PCollection.
+    List<Mutation> newArrayList = new ArrayList<>(mutations);
+    Create.Values<Iterable<Mutation>> mutationIterable =
+        Create.of(Collections.singleton(newArrayList));
+    PCollection<Iterable<Mutation>> input = pipeline.apply(mutationIterable);
+
+    /*
+     * Flatten splits the single Iterable element into individual elements within the same
+     * bundle. This forces the mutations to be processed in the same order in which they were
+     * added to the original list.
+     */
+    input.apply(Flatten.<Mutation>iterables()).apply(datastoreWriterTransform);
+
+    PipelineResult pResult = pipeline.run();
+
+    MetricQueryResults metricResults =
+        pResult
+            .metrics()
+            .queryMetrics(
+                MetricsFilter.builder()
+                    .addNameFilter(
+                        MetricNameFilter.named(DatastoreV1.DatastoreWriterFn.class, "batchSize"))
+                    .build());
+
+    AtomicLong timesCommitted = new AtomicLong();
+
+    metricResults
+        .getDistributions()
+        .forEach(
+            distribution -> {
+              if (distribution.getName().getName().equals("batchSize")) {
+                timesCommitted.set(distribution.getCommitted().getCount());
+              }
+            });
+
+    assertEquals(6, timesCommitted.get());

Review comment:
       You may need to update this to 7.
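The expected commit count can be reasoned about with a small simulation. This is a hedged sketch, not the actual `DatastoreV1.DatastoreWriterFn` implementation: it assumes the writer commits the current batch as soon as it encounters a key already present in the batch, carries the duplicate over into a fresh batch, and commits the remaining mutations in `finishBundle`. Under those assumptions, the test's 206 mutations (every 30th of the 200 keys is duplicated, giving 6 duplicates) produce 7 commits, which matches the suggestion above.

```java
import java.util.HashSet;
import java.util.Set;

// Hedged sketch of the flush-on-duplicate batching behavior exercised by the test.
// The class name, method names, and flush policy are assumptions for illustration;
// they are not taken from the Beam codebase.
public class FlushCountSketch {

  private final Set<String> keysInBatch = new HashSet<>();
  private int flushes = 0;

  // Assumption: on a duplicate key the current batch is committed first,
  // then the duplicate starts a fresh batch.
  public void add(String key) {
    if (!keysInBatch.add(key)) {
      flushes++;            // commit the full batch
      keysInBatch.clear();  // start a new batch
      keysInBatch.add(key); // the duplicate goes into the new batch
    }
  }

  public int run() {
    for (int i = 1; i <= 200; i++) {
      add("key" + i);
      if (i % 30 == 0) {
        add("key" + i);     // duplicate every 30th key, as in the test
      }
    }
    if (!keysInBatch.isEmpty()) {
      flushes++;            // finishBundle commits the remainder
    }
    return flushes;
  }

  public static void main(String[] args) {
    // 6 duplicate-triggered commits + 1 final commit of keys 181..200
    System.out.println(new FlushCountSketch().run()); // prints 7 under these assumptions
  }
}
```

If the final `finishBundle` commit is not counted by the `batchSize` distribution, the observed count would be 6 instead, which may explain the discrepancy the reviewer is pointing at.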




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

