scwhittle commented on code in PR #32474:
URL: https://github.com/apache/beam/pull/32474#discussion_r1770986607
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java:
##########
@@ -113,6 +114,14 @@ public Optional<ProcessContinuation> run(
final Timestamp startTimestamp = record.getStartTimestamp();
final Instant startInstant = new Instant(startTimestamp.toSqlTimestamp().getTime());
+ if (tracker instanceof Interruptible
+ && !((Interruptible) tracker).shouldContinue(startTimestamp)) {
Review Comment:
I believe this change is meant to keep deadline-exceeded errors on the change
stream RPC from causing problems. The deadlines are in terms of walltime, while
the restriction tracker and the record timestamps are in terms of event/data
time.

If processing falls behind, the record timestamps and restriction tracker
ranges will be in the past, but we still want to stop processing after 40
seconds of walltime have elapsed. Currently the soft deadline is set to now+40
seconds, which might not be reached before the RPC deadline is exceeded. And if
we changed the soft deadline to startTimestamp+40sec, we could delay catch-up
by stopping unnecessarily early when reaching it takes less than 40s of
walltime.

Instead, I think you could keep a StopWatch in QueryChangeStreamAction and,
once 40 seconds of walltime have elapsed, update the restriction tracker's
soft deadline to the currently claimed timestamp. That would ensure that all
the records for that timestamp are processed but that newer timestamps are
not claimed.
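
To make the suggestion concrete, here is a rough, self-contained sketch of the
idea. It is not the Beam code: `SoftDeadlineTracker`, `process`, and the
injected nano clock are hypothetical stand-ins (the clock plays the role of
the StopWatch so the behavior can be exercised without waiting 40 seconds),
and timestamps are plain longs rather than Spanner `Timestamp` values.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.LongSupplier;

public class WallTimeSoftDeadlineSketch {

  /** Hypothetical stand-in for a restriction tracker with an event-time soft deadline. */
  static final class SoftDeadlineTracker {
    private long softDeadline = Long.MAX_VALUE; // inclusive event-time limit
    private long lastClaimed = Long.MIN_VALUE;  // nothing claimed yet

    boolean tryClaim(long eventTimestamp) {
      if (eventTimestamp > softDeadline) {
        return false; // newer than the soft deadline: refuse the claim
      }
      lastClaimed = eventTimestamp;
      return true;
    }

    void setSoftDeadline(long eventTimestamp) {
      softDeadline = eventTimestamp;
    }

    long lastClaimed() {
      return lastClaimed;
    }
  }

  /**
   * Claims records in order. Once `budget` of walltime has elapsed, the soft
   * deadline is pinned to the last claimed event timestamp, so remaining
   * records with that timestamp are still claimed but newer ones are not.
   * If the budget elapses before anything was claimed, nothing is claimed.
   */
  public static List<Long> process(
      List<Long> recordTimestamps, LongSupplier nanoClock, Duration budget) {
    SoftDeadlineTracker tracker = new SoftDeadlineTracker();
    List<Long> processed = new ArrayList<>();
    long startNanos = nanoClock.getAsLong();
    boolean deadlinePinned = false;

    for (long ts : recordTimestamps) {
      if (!deadlinePinned && nanoClock.getAsLong() - startNanos >= budget.toNanos()) {
        tracker.setSoftDeadline(tracker.lastClaimed());
        deadlinePinned = true;
      }
      if (!tracker.tryClaim(ts)) {
        break; // stop here; the rest would be resumed in a later invocation
      }
      processed.add(ts);
    }
    return processed;
  }

  public static void main(String[] args) {
    // Fake clock: each read advances walltime by 15 seconds (0s, 15s, 30s, 45s, ...).
    long[] reads = {0};
    LongSupplier fakeClock = () -> Duration.ofSeconds(15 * reads[0]++).toNanos();

    List<Long> processed =
        process(Arrays.asList(100L, 200L, 200L, 200L, 300L), fakeClock, Duration.ofSeconds(40));
    System.out.println(processed); // prints [100, 200, 200, 200]
  }
}
```

In the demo the 40s walltime budget runs out while timestamp 200 is the
current claim, so the remaining records at 200 are still processed and 300 is
left for the next run, regardless of how far in the past those event
timestamps are.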