yixiaoshen commented on code in PR #22052:
URL: https://github.com/apache/beam/pull/22052#discussion_r927906163


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java:
##########
@@ -477,10 +508,16 @@ private static List<Query> splitQuery(
         @Nullable String namespace,
         Datastore datastore,
         QuerySplitter querySplitter,
-        int numSplits)
+        int numSplits,
+        @Nullable Instant readTime)
         throws DatastoreException {
      // If namespace is set, include it in the split request so splits are calculated accordingly.
-      return querySplitter.getSplits(query, forNamespace(namespace).build(), numSplits, datastore);
+      PartitionId partitionId = forNamespace(namespace).build();
+      if (readTime != null) {
+        Timestamp readTimeProto = Timestamps.fromMillis(readTime.getMillis());
+        return querySplitter.getSplits(query, partitionId, numSplits, datastore, readTimeProto);
+      }
+      return querySplitter.getSplits(query, partitionId, numSplits, datastore);

Review Comment:
   Sub-queries all maintain the same read time; that way we can guarantee a consistent read across the entire database from the Beam workload.
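   To make the consistency argument concrete, here is a toy sketch. `VersionedStore` and `SnapshotReadDemo` are hypothetical classes (not the Datastore client or Beam API) that model a multi-version store: one read time is captured up front and every sub-query range is read at it, even though a concurrent write commits between the two sub-queries.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical multi-version key-value store, used only to illustrate snapshot reads.
// It is not the Datastore client API.
final class VersionedStore {
  // key -> (commit time -> value); a null value records a deletion.
  private final TreeMap<String, TreeMap<Long, String>> versions = new TreeMap<>();

  void write(String key, String value, long commitTime) {
    versions.computeIfAbsent(key, k -> new TreeMap<>()).put(commitTime, value);
  }

  // Returns the keys in [start, end) that exist as of readTime, in key order.
  List<String> readRange(String start, String end, long readTime) {
    List<String> result = new ArrayList<>();
    for (Map.Entry<String, TreeMap<Long, String>> e : versions.subMap(start, end).entrySet()) {
      // Latest version committed at or before readTime.
      Map.Entry<Long, String> latest = e.getValue().floorEntry(readTime);
      if (latest != null && latest.getValue() != null) {
        result.add(e.getKey());
      }
    }
    return result;
  }
}

final class SnapshotReadDemo {
  public static void main(String[] args) {
    VersionedStore store = new VersionedStore();
    for (String key : List.of("a", "b", "n", "y")) {
      store.write(key, "v1", 1);
    }
    long readTime = 5; // captured once, before the first sub-query runs

    // Sub-query 1 covers [a, m).
    List<String> first = store.readRange("a", "m", readTime);

    // A concurrent writer commits at t=6: deletes "b" and inserts "p".
    store.write("b", null, 6);
    store.write("p", "v1", 6);

    // Sub-query 2 covers [m, ~) ("~" sorts after lowercase letters).
    List<String> pinned = new ArrayList<>(first);
    pinned.addAll(store.readRange("m", "~", readTime)); // same snapshot as sub-query 1
    List<String> mixed = new ArrayList<>(first);
    mixed.addAll(store.readRange("m", "~", 6)); // "latest" read: a different snapshot

    System.out.println(pinned); // [a, b, n, y]
    System.out.println(mixed);  // [a, b, n, p, y]
  }
}
```

   Pinned to t=5, the two ranges concatenate to a state that actually existed. Reading the second range at the latest time instead yields `b` (deleted at t=6) alongside `p` (created at t=6), a combination that never existed at any single point in time.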
   
   The split function queries Datastore for the split points (a special index). Having the split function use the same read time as all other queries makes sure the split points are accurate as of that read time, which affects how well and how evenly the Beam workload can be partitioned.
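   A toy sketch of the balance effect, under loudly simplified assumptions: the hypothetical `SplitPointsSketch.splitPoints` picks evenly spaced boundary keys from a full list of keys, standing in for Datastore's split-point index, which works differently. Partitioning the data visible at the read time with boundaries computed from that same snapshot gives even ranges; boundaries computed from a later snapshot (after many writes at the end of the key space) leave some ranges empty.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not the Datastore QuerySplitter.
final class SplitPointsSketch {
  // Picks numSplits - 1 boundary keys at evenly spaced positions in a non-empty sorted key list.
  static List<String> splitPoints(List<String> sortedKeys, int numSplits) {
    List<String> points = new ArrayList<>();
    for (int i = 1; i < numSplits; i++) {
      points.add(sortedKeys.get(i * sortedKeys.size() / numSplits));
    }
    return points;
  }

  // Counts how many sorted keys fall into each range [prev boundary, next boundary).
  static List<Integer> partitionSizes(List<String> sortedKeys, List<String> boundaries) {
    List<Integer> sizes = new ArrayList<>();
    int idx = 0;
    for (int p = 0; p <= boundaries.size(); p++) {
      int count = 0;
      while (idx < sortedKeys.size()
          && (p == boundaries.size() || sortedKeys.get(idx).compareTo(boundaries.get(p)) < 0)) {
        count++;
        idx++;
      }
      sizes.add(count);
    }
    return sizes;
  }

  // Builds keys prefix0 .. prefix(n-1); n <= 10 keeps them in sorted order.
  static List<String> keys(String prefix, int n) {
    List<String> out = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      out.add(prefix + i);
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> atReadTime = keys("a", 8);           // entities visible at readTime
    List<String> later = new ArrayList<>(atReadTime);  // a later snapshot with extra writes
    later.addAll(keys("z", 8));

    // Split points from the same snapshot that the sub-queries read.
    System.out.println(partitionSizes(atReadTime, splitPoints(atReadTime, 4))); // [2, 2, 2, 2]
    // Split points from a different snapshot than the one the sub-queries read.
    System.out.println(partitionSizes(atReadTime, splitPoints(later, 4)));      // [4, 4, 0, 0]
  }
}
```

   With 4 splits, the first case yields `[2, 2, 2, 2]` and the second `[4, 4, 0, 0]`, so half of the sub-queries would have no work in this toy model.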



-- 
This is an automated message from the Apache Git Service.