[ https://issues.apache.org/jira/browse/BEAM-1330?focusedWorklogId=719723&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-719723 ]

ASF GitHub Bot logged work on BEAM-1330:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Feb/22 21:57
            Start Date: 02/Feb/22 21:57
    Worklog Time Spent: 10m 
      Work Description: pabloem commented on a change in pull request #16436:
URL: https://github.com/apache/beam/pull/16436#discussion_r798046441



##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java
##########
@@ -74,6 +93,72 @@ public void testE2EV1Write() throws Exception {
     assertEquals(numEntities, numEntitiesWritten);
   }
 
+  /**
+   * Tests {@link DatastoreV1.DatastoreWriterFn} with duplicated entries. Once a duplicated entry is
+   * found, the batch gets flushed.
+   */
+  @Test
+  public void testDatastoreWriterFnWithDuplicatedEntities() throws Exception {
+
+    List<Mutation> mutations = new ArrayList<>(200);
+    V1TestOptions options = TestPipeline.testingPipelineOptions().as(V1TestOptions.class);
+    Pipeline pipeline = TestPipeline.create(options);
+
+    for (int i = 1; i <= 200; i++) {
+      Key key = makeKey("key" + i, i + 1).build();
+
+      mutations.add(makeUpsert(Entity.newBuilder().setKey(key).build()).build());
+
+      if (i % 30 == 0) {
+        mutations.add(makeUpsert(Entity.newBuilder().setKey(key).build()).build());
+      }
+    }
+
+    DatastoreV1.DatastoreWriterFn datastoreWriter =
+        new DatastoreV1.DatastoreWriterFn(
+            TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject(), null);
+
+    PTransform<PCollection<? extends Mutation>, PCollection<Void>> datastoreWriterTransform =
+        ParDo.of(datastoreWriter);
+
+    /** The following three lines turn the original ArrayList into a single element of the input PCollection. */
+    List<Mutation> newArrayList = new ArrayList<>(mutations);
+    Create.Values<Iterable<Mutation>> mutationIterable =
+        Create.of(Collections.singleton(newArrayList));
+    PCollection<Iterable<Mutation>> input = pipeline.apply(mutationIterable);
+
+    /**
+     * Flatten divides the PCollection into several elements of the same bundle. By doing this we're
+     * forcing the List of mutations to be processed in the same order the mutations were added to
+     * the original List.
+     */
+    input.apply(Flatten.<Mutation>iterables()).apply(datastoreWriterTransform);
+
+    PipelineResult pResult = pipeline.run();
+
+    MetricQueryResults metricResults =
+        pResult
+            .metrics()
+            .queryMetrics(
+                MetricsFilter.builder()
+                    .addNameFilter(
+                        MetricNameFilter.named(DatastoreV1.DatastoreWriterFn.class, "batchSize"))
+                    .build());
+
+    AtomicLong timesCommitted = new AtomicLong();
+
+    metricResults
+        .getDistributions()
+        .forEach(
+            distribution -> {
+              if (distribution.getName().getName().equals("batchSize")) {
+                timesCommitted.set(distribution.getCommitted().getCount());
+              }
+            });
+
+    assertEquals(6, timesCommitted.get());

Review comment:
       you may need to update this to 7, maybe.
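
For what it's worth, here is how 7 could come out of this test (my reading of the code above, not something stated in the thread): the loop enqueues 200 upserts plus one duplicate after every 30th key (i = 30, 60, 90, 120, 150, 180), i.e. 206 mutations containing 6 duplicates. If the writer flushes the pending batch each time it sees a key that is already in the batch and then commits the remainder at finishBundle, that is 6 duplicate-triggered commits plus 1 final commit = 7.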




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 719723)
    Time Spent: 2h  (was: 1h 50m)

> DatastoreIO Writes should flush early when duplicate keys arrive.
> -----------------------------------------------------------------
>
>                 Key: BEAM-1330
>                 URL: https://issues.apache.org/jira/browse/BEAM-1330
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-gcp
>            Reporter: Vikas Kedigehalli
>            Assignee: Fernando Morales
>            Priority: P3
>          Time Spent: 2h
>  Remaining Estimate: 0h
>
> DatastoreIO writes batches of up to 500 entities (the RPC limit for Cloud Datastore) 
> before flushing them out. The writes are non-transactional and thus do not 
> support duplicate keys within the same batch. This can be a problem, especially when 
> using non-global windowing, where multiple windows for the same key end up 
> in the same batch, preventing the writes from succeeding.
> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
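
As a rough illustration of the flush-early behaviour described in the issue, here is a minimal sketch (a hypothetical FlushOnDuplicateBatcher helper, not the actual DatastoreV1.DatastoreWriterFn code): track the keys of the pending batch and commit the batch as soon as a duplicate key, or the RPC size limit, is hit.

import com.google.datastore.v1.Key;
import com.google.datastore.v1.Mutation;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

class FlushOnDuplicateBatcher {
  // Cloud Datastore caps a single commit at 500 entities.
  private static final int MAX_BATCH_SIZE = 500;

  private final List<Mutation> batch = new ArrayList<>();
  private final Set<Key> batchKeys = new HashSet<>();
  private final Consumer<List<Mutation>> commitFn; // e.g. wraps the Datastore commit RPC

  FlushOnDuplicateBatcher(Consumer<List<Mutation>> commitFn) {
    this.commitFn = commitFn;
  }

  // Assumes upsert mutations, as in the test above.
  void add(Mutation mutation) {
    Key key = mutation.getUpsert().getKey();
    // A duplicate key inside one non-transactional commit would fail, so flush first.
    if (batchKeys.contains(key) || batch.size() >= MAX_BATCH_SIZE) {
      flush();
    }
    batch.add(mutation);
    batchKeys.add(key);
  }

  void flush() {
    if (batch.isEmpty()) {
      return;
    }
    commitFn.accept(new ArrayList<>(batch));
    batch.clear();
    batchKeys.clear();
  }
}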



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
