tianz101 commented on code in PR #37326:
URL: https://github.com/apache/beam/pull/37326#discussion_r2700014795


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java:
##########
@@ -303,14 +303,24 @@ public ProcessContinuation run(
       throw e;
     }
 
-    LOG.debug("[{}] change stream completed successfully", token);
-    if (tracker.tryClaim(endTimestamp)) {
+    LOG.debug(
+        "[{}] change stream completed successfully up to {}", token, changeStreamQueryEndTimestamp);
+    if (!tracker.tryClaim(changeStreamQueryEndTimestamp)) {

Review Comment:
   Sam, "An unbounded end timestamp is no longer allowed for the change stream 
query rpc. However if we reach the end timestamp we should be careful to only 
advance the tracker to this timestamp and not the possibly unbounded end 
timestamp of the range."  Just a caution to confirm with you:
   1) The Dataflow pipeline can still be configured with either a bounded or 
an unbounded endTs.
   2) But no matter how the pipeline endTs is configured, Dataflow here always 
breaks the query down to use endTs=now()+2m when querying the Spanner change 
stream.
   3) We should only claim now()+2m if the query is successful.
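
To make the three points concrete, here is a minimal sketch of how I read the intended behavior. It is not the code in this PR: the class name, `boundedQueryEnd`, `timestampToClaim`, and the use of `Instant.MAX` for an unbounded endTs are all hypothetical, and capping the query end at the range end when the range ends sooner than now()+2m is my assumption.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

/** Hypothetical sketch of the claim logic discussed above; not the Beam implementation. */
public class QueryEndTimestampSketch {
  // The 2-minute window comes from "endTs=now()+2m" in the comment above.
  static final Duration QUERY_WINDOW = Duration.ofMinutes(2);

  /**
   * Returns a bounded end timestamp for a single change stream query.
   * Assumption: the query end never exceeds the configured range end, so the
   * result is min(now + 2m, rangeEnd). An unbounded range is modeled as Instant.MAX.
   */
  static Instant boundedQueryEnd(Instant now, Instant rangeEnd) {
    Instant candidate = now.plus(QUERY_WINDOW);
    return candidate.isBefore(rangeEnd) ? candidate : rangeEnd;
  }

  /**
   * Returns the timestamp the tracker should try to claim, or empty if the
   * query did not succeed. The possibly unbounded range end is never claimed
   * directly; only the bounded query end is.
   */
  static Optional<Instant> timestampToClaim(
      Instant now, Instant rangeEnd, boolean querySucceeded) {
    if (!querySucceeded) {
      return Optional.empty();
    }
    return Optional.of(boundedQueryEnd(now, rangeEnd));
  }

  public static void main(String[] args) {
    Instant now = Instant.now();
    // Unbounded pipeline endTs: the claim is still bounded by now + 2m.
    System.out.println(timestampToClaim(now, Instant.MAX, true));
    // Failed query: nothing is claimed.
    System.out.println(timestampToClaim(now, Instant.MAX, false));
  }
}
```

If this matches the intent, then the value passed to `tracker.tryClaim` is always the bounded query end, regardless of how the pipeline endTs was configured.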



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

