scwhittle commented on code in PR #37326:
URL: https://github.com/apache/beam/pull/37326#discussion_r2703809975


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java:
##########
@@ -303,14 +303,24 @@ public ProcessContinuation run(
       throw e;
     }
 
-    LOG.debug("[{}] change stream completed successfully", token);
-    if (tracker.tryClaim(endTimestamp)) {
+    LOG.debug(
+        "[{}] change stream completed successfully up to {}", token, changeStreamQueryEndTimestamp);
+    if (!tracker.tryClaim(changeStreamQueryEndTimestamp)) {

Review Comment:
   I think that if a bounded timestamp was configured, it would be reflected in the range, and thus the code above:
   ```
       final Timestamp endTimestamp = partition.getEndTimestamp();
       final Timestamp changeStreamQueryEndTimestamp =
           endTimestamp.equals(MAX_INCLUSIVE_END_AT)
               ? getNextReadChangeStreamEndTimestamp()
               : endTimestamp;
   ```
   would only use now + 2m if we were unbounded.
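   To make the bounded/unbounded distinction concrete, here is a minimal sketch of that selection. It assumes java.time.Instant in place of com.google.cloud.Timestamp, a hypothetical stand-in value for MAX_INCLUSIVE_END_AT, and a fixed two-minute look-ahead for getNextReadChangeStreamEndTimestamp() (per the "now + 2m" above); the class and method names are hypothetical.

   ```java
   import java.time.Duration;
   import java.time.Instant;

   /**
    * Simplified model of how the change stream query end timestamp is chosen. Instant stands in
    * for com.google.cloud.Timestamp; both constants below are illustrative assumptions.
    */
   final class QueryEndTimestampSketch {
     // Hypothetical stand-in for the "unbounded" sentinel end timestamp.
     static final Instant MAX_INCLUSIVE_END_AT = Instant.parse("9999-12-31T23:59:59Z");

     // Assumed look-ahead for unbounded partitions ("now + 2m" in the review).
     static final Duration LOOK_AHEAD = Duration.ofMinutes(2);

     private QueryEndTimestampSketch() {}

     /** Returns the end timestamp that would be passed to the change stream query. */
     static Instant queryEnd(Instant partitionEnd, Instant now) {
       // A bounded partition keeps its configured end; only the sentinel end becomes now + 2m.
       return partitionEnd.equals(MAX_INCLUSIVE_END_AT) ? now.plus(LOOK_AHEAD) : partitionEnd;
     }
   }
   ```

   With a bounded partition end, queryEnd returns that end unchanged; only the sentinel end produces now + 2m.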
   



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java:
##########
@@ -303,14 +303,24 @@ public ProcessContinuation run(
       throw e;
     }
 
-    LOG.debug("[{}] change stream completed successfully", token);
-    if (tracker.tryClaim(endTimestamp)) {
+    LOG.debug(
+        "[{}] change stream completed successfully up to {}", token, changeStreamQueryEndTimestamp);
+    if (!tracker.tryClaim(changeStreamQueryEndTimestamp)) {
+      return ProcessContinuation.stop();
+    }
+
+    if (changeStreamQueryEndTimestamp.equals(endTimestamp)) {

Review Comment:
   I don't think >= vs. = matters, because either they are equal (endTimestamp was not max) or they are not (endTimestamp is max and changeStreamQueryEndTimestamp is now + 2m).
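   A small illustration of why the two comparisons agree, assuming (as above) that changeStreamQueryEndTimestamp is either endTimestamp itself or now + 2m, which is always earlier than the unbounded sentinel. java.time.Instant stands in for com.google.cloud.Timestamp, and the class and method names are hypothetical.

   ```java
   import java.time.Instant;

   /**
    * Compares the two candidate end checks. Precondition (assumed): queryEnd is either
    * partitionEnd itself or a look-ahead time strictly earlier than partitionEnd.
    */
   final class EndCheckSketch {
     private EndCheckSketch() {}

     // Check as written in the PR: exact equality.
     static boolean viaEquals(Instant queryEnd, Instant partitionEnd) {
       return queryEnd.equals(partitionEnd);
     }

     // Alternative discussed: queryEnd >= partitionEnd.
     static boolean viaAtOrAfter(Instant queryEnd, Instant partitionEnd) {
       return queryEnd.compareTo(partitionEnd) >= 0;
     }
   }
   ```

   Under that precondition there is no input where queryEnd is strictly after partitionEnd, so both checks return the same result.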
   
   I was thinking that we only got down here if the query itself stopped returning elements because it reached changeStreamQueryEndTimestamp. From "This seems not correct if we have partition terminate case", it sounds like we may also reach this point if the partition ends and the query stops returning elements for that reason. I was thinking that this was handled by PartitionEndRecordAction returning ProcessContinuation.stop(), but I see that isn't currently the case.
   
   Are we guaranteed to receive a PartitionEndRecord as the last record on the partition before it stops? If so, perhaps we can modify that action to stop processing. Otherwise, I'm not sure how to distinguish here whether the query stopped early due to:
   1. hitting the artificial end timestamp
   2. having no more records
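   If a PartitionEndRecord is guaranteed to be the last record, the proposed flow could look roughly like this simplified model. Everything here is hypothetical: Continuation stands in for ProcessContinuation, the record classes stand in for the real change stream record types, and the restriction tracker, watermark, and isTimestampOutOfRange handling are left out.

   ```java
   import java.time.Instant;
   import java.util.List;
   import java.util.Optional;

   /** Hypothetical model of the proposal; not Beam's actual API. */
   final class PartitionEndSketch {
     enum Continuation { STOP, RESUME }

     interface ChangeRecord {}

     static final class DataRecord implements ChangeRecord {}

     static final class PartitionEndRecord implements ChangeRecord {}

     private PartitionEndSketch() {}

     // Per-record action: empty means "keep reading"; a PartitionEndRecord stops processing.
     static Optional<Continuation> onRecord(ChangeRecord record) {
       if (record instanceof PartitionEndRecord) {
         return Optional.of(Continuation.STOP);
       }
       return Optional.empty();
     }

     static Continuation run(List<ChangeRecord> records, Instant queryEnd, Instant partitionEnd) {
       for (ChangeRecord record : records) {
         Optional<Continuation> result = onRecord(record);
         if (result.isPresent()) {
           return result.get();
         }
       }
       // No PartitionEndRecord seen: assuming one always ends a finished partition, the query
       // stopped because it reached queryEnd. Stop only if that was the real partition end.
       return queryEnd.equals(partitionEnd) ? Continuation.STOP : Continuation.RESUME;
     }
   }
   ```

   The design point is that the "partition really ended" signal comes from the record itself, so the post-loop check only has to handle the artificial end timestamp.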
   
   I think we would also have to handle isTimestampOutOfRange more explicitly.
   
   I will add a commit implementing these changes to show what I mean.


