scwhittle commented on code in PR #32474:
URL: https://github.com/apache/beam/pull/32474#discussion_r1770986607


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java:
##########
@@ -113,6 +114,14 @@ public Optional<ProcessContinuation> run(
 
     final Timestamp startTimestamp = record.getStartTimestamp();
     final Instant startInstant = new Instant(startTimestamp.toSqlTimestamp().getTime());
+    if (tracker instanceof Interruptible
+        && !((Interruptible) tracker).shouldContinue(startTimestamp)) {

Review Comment:
   I believe this change is meant to keep deadline-exceeded errors on the change 
stream RPC from causing problems. The RPC deadlines are in terms of walltime, 
while the restriction tracker and the record timestamps are in terms of 
event/data time.
   
   If processing falls behind, the record timestamps and restriction tracker 
ranges will be in the past, but we still want to stop processing after 40 
seconds of walltime elapse. Currently the soft deadline is set to now+40 
seconds, which the claimed timestamps might not reach before the RPC deadline 
is exceeded. And if we changed the soft deadline to startTimestamp+40sec, we 
could delay catch-up by stopping unnecessarily early whenever those 40 seconds 
of event time take less than 40 seconds of walltime to process.
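
   To make the timing argument concrete, here is a rough back-of-the-envelope 
model. It assumes (a simplification, not measured behavior) that a reader 
lagging `lagSeconds` behind real time advances through event time at a 
constant `speedup` until it catches up, and at real-time pace afterwards. 
`SoftDeadlineArithmetic` and `walltimeToReach` are hypothetical names. The 
method returns how much walltime passes before the claimed timestamps reach a 
soft deadline placed `targetSeconds` after the restriction's start timestamp.

```java
// Hypothetical model for illustration; assumes a constant catch-up speedup.
final class SoftDeadlineArithmetic {

  /**
   * Walltime seconds until the claimed event time reaches a soft deadline placed
   * targetSeconds after the start timestamp, for a reader that starts lagSeconds behind
   * real time and processes `speedup` event-seconds per walltime-second until caught up.
   */
  static double walltimeToReach(double lagSeconds, double speedup, double targetSeconds) {
    if (lagSeconds < 0 || speedup <= 0 || targetSeconds < 0) {
      throw new IllegalArgumentException("expected lag >= 0, speedup > 0, target >= 0");
    }
    // Walltime needed to close the lag; infinite if the reader is not faster than real time.
    double catchUp = speedup > 1 ? lagSeconds / (speedup - 1) : Double.POSITIVE_INFINITY;
    double whileBehind = targetSeconds / speedup;
    if (whileBehind <= catchUp) {
      return whileBehind; // deadline reached while still catching up
    }
    // Once caught up, event time tracks walltime; "now" sits lagSeconds after the start.
    return targetSeconds - lagSeconds;
  }
}
```

   With a one-hour lag and a 10x speedup, a startTimestamp+40s deadline, 
`walltimeToReach(3600, 10, 40)`, is reached after 4 s of walltime (stopping 
too early). A now+40s deadline, `walltimeToReach(3600, 10, 3640)`, is reached 
only after 364 s. Neither matches the intended 40 s of walltime.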
   
   Instead, I think you could keep a StopWatch in QueryChangeStreamAction, and 
after 40 seconds of walltime elapse, update the restriction tracker's soft 
deadline to the currently claimed timestamp. That would ensure that all the 
records for that timestamp are still processed, but that newer timestamps 
would not be claimed.
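
   A minimal sketch of the suggested approach follows. It assumes event-time 
positions are plain `long` values (for example commit timestamps in 
microseconds). In place of a real stopwatch, it uses an injectable 
`LongSupplier` walltime clock so the behavior can be tested deterministically. 
`SoftDeadlineTracker`, `setSoftDeadline`, and `process` are hypothetical names 
for illustration, not Beam's restriction tracker API. In 
`QueryChangeStreamAction`, the clock role could be filled by Guava's 
`Stopwatch` and the tracker by the real change stream tracker.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical sketch: names and types are illustrative, not Beam APIs.
final class WalltimeSoftDeadlineSketch {

  /** Tracks claims over non-decreasing event-time positions, with an optional soft deadline. */
  static final class SoftDeadlineTracker {
    private long softDeadline = Long.MAX_VALUE; // no soft limit until one is set
    private long lastClaimed = Long.MIN_VALUE;  // sentinel: nothing claimed yet

    /** Claims the position unless it lies past the soft deadline. */
    boolean tryClaim(long position) {
      if (position < lastClaimed) {
        throw new IllegalArgumentException("positions must be non-decreasing");
      }
      if (position > softDeadline) {
        return false; // newer timestamps are not claimed once the deadline is pinned
      }
      lastClaimed = position;
      return true;
    }

    boolean hasClaimed() {
      return lastClaimed != Long.MIN_VALUE;
    }

    long lastClaimed() {
      return lastClaimed;
    }

    /** Lowers (never raises) the soft deadline. */
    void setSoftDeadline(long position) {
      softDeadline = Math.min(softDeadline, position);
    }
  }

  /**
   * Processes records, given here only by their event-time positions, until the tracker
   * refuses a claim. After `budget` of walltime, the soft deadline is pinned to the last
   * claimed position: records sharing that timestamp are still processed, newer ones are not.
   */
  static List<Long> process(
      long[] recordPositions, SoftDeadlineTracker tracker, Duration budget, LongSupplier nanoClock) {
    final long startNanos = nanoClock.getAsLong();
    boolean deadlinePinned = false;
    List<Long> processed = new ArrayList<>();
    for (long position : recordPositions) {
      // Pin only after something was claimed, so each call makes progress on one timestamp.
      if (!deadlinePinned
          && nanoClock.getAsLong() - startNanos >= budget.toNanos()
          && tracker.hasClaimed()) {
        tracker.setSoftDeadline(tracker.lastClaimed());
        deadlinePinned = true;
      }
      if (!tracker.tryClaim(position)) {
        break; // in a splittable DoFn, this is where processing would return
      }
      processed.add(position);
    }
    return processed;
  }
}
```

   Consider a fake clock that advances 15 s per reading, with records at 
positions `1, 2, 2, 2, 3, 4`. The 40 s budget runs out while the last claimed 
position is `2`, so the remaining records at `2` are processed and the loop 
stops before claiming `3`. If the budget is already spent before anything is 
claimed, the first timestamp is still claimed, which keeps the reader making 
progress.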



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

