tianz101 commented on code in PR #37326:
URL: https://github.com/apache/beam/pull/37326#discussion_r2700014795
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java:
##########
@@ -303,14 +303,24 @@ public ProcessContinuation run(
throw e;
}
- LOG.debug("[{}] change stream completed successfully", token);
- if (tracker.tryClaim(endTimestamp)) {
+ LOG.debug(
+ "[{}] change stream completed successfully up to {}", token, changeStreamQueryEndTimestamp);
+ if (!tracker.tryClaim(changeStreamQueryEndTimestamp)) {
Review Comment:
Sam, "An unbounded end timestamp is no longer allowed for the change stream
query rpc. However if we reach the end timestamp we should be careful to only
advance the tracker to this timestamp and not the possibly unbounded end
timestamp of the range." Just a caution to confirm with you:
1) The Dataflow pipeline can still be configured with either a bounded or
an unbounded endTs.
2) But no matter how the pipeline endTs is configured, Dataflow here always
breaks the query down to use endTs=now()+2m when querying the Spanner
change stream.
3) We should only claim now()+2m if the query is successful.
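
To make the three points concrete, here is a rough sketch of what I mean.
It is not the actual QueryChangeStreamAction code: the class, the Tracker
interface, and the method names are all hypothetical, java.time.Instant
stands in for the Spanner timestamp type, and capping the query end at the
range end is my assumption about how a bounded endTs would be handled.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical sketch only; names and types are assumptions, not Beam APIs. */
final class BoundedQueryEndSketch {
  // Assumed look-ahead window, taken from the "now()+2m" in point 2.
  static final Duration WINDOW = Duration.ofMinutes(2);

  /** Stand-in for the restriction tracker; only tryClaim is modeled. */
  interface Tracker {
    boolean tryClaim(Instant position);
  }

  /**
   * Points 1 and 2: the range end may be bounded or unbounded (modeled here
   * as Instant.MAX), but the end used for the query is always bounded.
   */
  static Instant queryEnd(Instant now, Instant rangeEnd) {
    Instant candidate = now.plus(WINDOW);
    return candidate.isBefore(rangeEnd) ? candidate : rangeEnd;
  }

  /**
   * Point 3: claim the bounded query end, never the range end, and only
   * when the query completed successfully. Returns true if claimed.
   */
  static boolean claimAfterQuery(Tracker tracker, Instant queryEnd, boolean querySucceeded) {
    if (!querySucceeded) {
      return false; // a failed query must not advance the tracker
    }
    return tracker.tryClaim(queryEnd);
  }
}
```

With an unbounded range end, queryEnd returns now plus two minutes; with a
range end closer than that, it returns the range end itself.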
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]