scwhittle commented on code in PR #37326:
URL: https://github.com/apache/beam/pull/37326#discussion_r2703809975
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java:
##########
@@ -303,14 +303,24 @@ public ProcessContinuation run(
throw e;
}
- LOG.debug("[{}] change stream completed successfully", token);
- if (tracker.tryClaim(endTimestamp)) {
+ LOG.debug(
+ "[{}] change stream completed successfully up to {}", token,
changeStreamQueryEndTimestamp);
+ if (!tracker.tryClaim(changeStreamQueryEndTimestamp)) {
Review Comment:
I think that if a bounded timestamp was configured it would be reflected in
the range and thus the code above:
```
final Timestamp endTimestamp = partition.getEndTimestamp();
final Timestamp changeStreamQueryEndTimestamp =
endTimestamp.equals(MAX_INCLUSIVE_END_AT)
? getNextReadChangeStreamEndTimestamp()
: endTimestamp;
```
would only use now + 2m if the partition were unbounded.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java:
##########
@@ -303,14 +303,24 @@ public ProcessContinuation run(
throw e;
}
- LOG.debug("[{}] change stream completed successfully", token);
- if (tracker.tryClaim(endTimestamp)) {
+ LOG.debug(
+ "[{}] change stream completed successfully up to {}", token,
changeStreamQueryEndTimestamp);
+ if (!tracker.tryClaim(changeStreamQueryEndTimestamp)) {
+ return ProcessContinuation.stop();
+ }
+
+ if (changeStreamQueryEndTimestamp.equals(endTimestamp)) {
Review Comment:
I don't think >= versus == matters, because either they are equal
(endTimestamp was not max) or they are not (endTimestamp is max and
changeStreamQueryEndTimestamp is now + 2m).
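A minimal sketch of why the two comparisons agree, under some assumptions: `java.time.Instant` stands in for the Spanner `Timestamp` type, `Instant.MAX` stands in for `MAX_INCLUSIVE_END_AT`, and "now" is passed in as a parameter instead of calling `getNextReadChangeStreamEndTimestamp()`. The class and helper names are hypothetical and not part of the PR.

```java
import java.time.Duration;
import java.time.Instant;

class EndTimestampSketch {
  // Stand-in for MAX_INCLUSIVE_END_AT (assumption: the real constant is a Spanner Timestamp).
  static final Instant MAX_INCLUSIVE_END_AT = Instant.MAX;

  // Mirrors the selection quoted in the first comment; "now" is a parameter for determinism.
  static Instant queryEndTimestamp(Instant partitionEnd, Instant now) {
    return partitionEnd.equals(MAX_INCLUSIVE_END_AT)
        ? now.plus(Duration.ofMinutes(2))
        : partitionEnd;
  }

  // The check as written in the diff.
  static boolean reachedEndByEquals(Instant queryEnd, Instant partitionEnd) {
    return queryEnd.equals(partitionEnd);
  }

  // The alternative ">=" form of the check.
  static boolean reachedEndByGreaterOrEqual(Instant queryEnd, Instant partitionEnd) {
    return queryEnd.compareTo(partitionEnd) >= 0;
  }

  public static void main(String[] args) {
    Instant now = Instant.parse("2024-01-01T00:00:00Z");
    Instant bounded = Instant.parse("2024-01-01T01:00:00Z");
    for (Instant partitionEnd : new Instant[] {bounded, MAX_INCLUSIVE_END_AT}) {
      Instant queryEnd = queryEndTimestamp(partitionEnd, now);
      System.out.println(
          reachedEndByEquals(queryEnd, partitionEnd)
              + " "
              + reachedEndByGreaterOrEqual(queryEnd, partitionEnd));
    }
  }
}
```

In the bounded case the query end equals the partition end, so both checks hold; in the unbounded case the query end is strictly below the max sentinel, so neither does.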
I was thinking that we only got down here if the query itself stopped
returning elements, and that this was due to reaching
changeStreamQueryEndTimestamp.
From "This seems not correct if we have partition terminate case", it sounds
like we may also reach this point if the partition ends and the query stops
returning elements for that reason. I was thinking that this was handled by
the PartitionEndRecordAction returning ProcessContinuation.stop(), but I see
that isn't the case currently.
Are we guaranteed to receive a PartitionEndRecord as the last record on the
partition before it stops? If so, perhaps we can modify that action to stop
processing. Otherwise, I'm not sure how to distinguish here whether the query
stopped early due to
1. hitting the artificial end timestamp
2. having no more records
I think we would also have to handle isTimestampOutOfRange more explicitly.
I will add a commit implementing these to show what I mean.
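As a rough illustration of the distinction above, here is a sketch that assumes a partition-end record is always the last record when a partition finishes, which is exactly the open question. All types and names are hypothetical stand-ins, not the Beam or Spanner classes.

```java
import java.util.Arrays;
import java.util.List;

class QueryStopReasonSketch {
  // Hypothetical record kinds; the real connector uses its own record model classes.
  enum RecordKind { DATA, HEARTBEAT, PARTITION_END }

  // The two reasons the query may stop returning elements.
  enum StopReason { PARTITION_FINISHED, REACHED_QUERY_END }

  // Assumption: a finished partition always emits PARTITION_END as its final record.
  static StopReason classify(List<RecordKind> recordsSeen) {
    boolean endedWithPartitionEnd =
        !recordsSeen.isEmpty()
            && recordsSeen.get(recordsSeen.size() - 1) == RecordKind.PARTITION_END;
    return endedWithPartitionEnd
        ? StopReason.PARTITION_FINISHED
        : StopReason.REACHED_QUERY_END;
  }

  public static void main(String[] args) {
    System.out.println(classify(Arrays.asList(RecordKind.DATA, RecordKind.PARTITION_END)));
    System.out.println(classify(Arrays.asList(RecordKind.DATA, RecordKind.HEARTBEAT)));
  }
}
```

If that guarantee does not hold, this classification cannot tell the two cases apart, and some other signal would be needed.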
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]