[ 
https://issues.apache.org/jira/browse/FLINK-37256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17923966#comment-17923966
 ] 

Piotr Nowojski edited comment on FLINK-37256 at 2/5/25 8:56 AM:
----------------------------------------------------------------

I can see two broad solutions.

1. Use something similar to the splittable timers mechanism during recovery to 
prevent operators from firing timers.

For example, in {{InternalTimerServiceImpl#tryAdvanceWatermark}} we could add a 
hook similar to {{shouldStopAdvancingFn}} that would tell the code to return 
without firing any timers when the subtask is not RUNNING.

The problem with that approach is that currently, if we interrupt 
{{tryAdvanceWatermark}}, its firing is immediately re-enqueued to the mailbox. 
So if we just {{return false}} without any further changes, the mailbox 
executor would simply try to fire the timers again, this time via the mailbox, 
before processing the remaining in-flight records, leading to a livelock. There 
might be some easy solution to this problem.
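
To make the re-enqueue problem concrete, here is a toy model in plain Java. It is not Flink code: {{RecoveryTimerSketch}}, {{switchToRunning}} and {{drainMailbox}} are hypothetical names, and the mailbox is just a queue of {{Runnable}}s. It only illustrates one conceivable way out, under the assumption that the deferred firing is remembered and enqueued once when the subtask switches to RUNNING, instead of being re-enqueued immediately.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.PriorityQueue;

/** Toy model of deferring timer firing during recovery. All names are hypothetical. */
final class RecoveryTimerSketch {
    /** Registered event-time timers, smallest timestamp first. */
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    /** Stand-in for the task mailbox. */
    private final Deque<Runnable> mailbox = new ArrayDeque<>();
    /** False while the subtask is still processing recovered in-flight data. */
    private boolean running = false;
    /** Highest watermark seen while not RUNNING, or Long.MIN_VALUE if none. */
    private long deferredWatermark = Long.MIN_VALUE;
    private int firedCount = 0;

    void registerTimer(long timestamp) {
        timers.add(timestamp);
    }

    /** Returns false if firing was skipped because the subtask is not RUNNING. */
    boolean tryAdvanceWatermark(long watermark) {
        if (!running) {
            // Assumption: remember the watermark instead of re-enqueueing a mail now.
            // An immediate re-enqueue would be retried ahead of the in-flight records.
            deferredWatermark = Math.max(deferredWatermark, watermark);
            return false;
        }
        while (!timers.isEmpty() && timers.peek() <= watermark) {
            timers.poll();
            firedCount++;
        }
        return true;
    }

    /** Enqueues the deferred firing exactly once, after recovery has finished. */
    void switchToRunning() {
        running = true;
        if (deferredWatermark != Long.MIN_VALUE) {
            final long watermark = deferredWatermark;
            deferredWatermark = Long.MIN_VALUE;
            mailbox.add(() -> tryAdvanceWatermark(watermark));
        }
    }

    void drainMailbox() {
        Runnable mail;
        while ((mail = mailbox.poll()) != null) {
            mail.run();
        }
    }

    int firedCount() {
        return firedCount;
    }

    public static void main(String[] args) {
        RecoveryTimerSketch service = new RecoveryTimerSketch();
        service.registerTimer(5L);
        service.registerTimer(10L);
        service.registerTimer(20L);

        service.tryAdvanceWatermark(10L); // watermark from the in-flight data
        System.out.println("fired during recovery: " + service.firedCount()); // 0

        service.switchToRunning();
        service.drainMailbox();
        System.out.println("fired after RUNNING: " + service.firedCount()); // 2
    }
}
```

Whether something like this is feasible in the real mailbox and timer service code is an open question; the sketch ignores per-key timers, processing-time timers and checkpoint interruption.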

2. An alternative approach might be to just filter out watermarks from the 
in-flight data of unaligned checkpoints.

Currently this seems to be fine, as the mechanism of persisting watermarks in 
the in-flight data is very dubious. Watermarks already processed by the 
operators are not persisted in any way, so there is just a small random chance 
that some watermark(s) will be persisted as in-flight data, and only those 
will survive recovery. So either way, we must rely on the watermark generators 
to re-emit new watermarks after recovery. Filtering out watermarks from the 
in-flight records would be more consistent.

Freshly generated watermarks that were not persisted in the in-flight data are 
not a problem, as this predominantly happens only in source tasks. Since source 
tasks have no input in-flight data, those watermarks are already generated in 
the subtask's RUNNING state.
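
As a rough illustration of the second approach, the sketch below drops watermark elements from a list of recovered in-flight elements before they reach the operator. The {{Element}}, {{Record}} and {{Watermark}} types and the {{dropWatermarks}} method are hypothetical stand-ins, not Flink's classes, and where exactly such a filter would sit in the recovery path is left open.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of filtering watermarks out of recovered in-flight data. Names are hypothetical. */
final class InFlightWatermarkFilterSketch {

    /** Stand-in for anything that can travel in a channel. */
    interface Element {}

    static final class Record implements Element {
        final String value;

        Record(String value) {
            this.value = value;
        }
    }

    static final class Watermark implements Element {
        final long timestamp;

        Watermark(long timestamp) {
            this.timestamp = timestamp;
        }
    }

    /** Returns the recovered elements in their original order, without watermarks. */
    static List<Element> dropWatermarks(List<Element> recovered) {
        List<Element> kept = new ArrayList<>();
        for (Element element : recovered) {
            if (!(element instanceof Watermark)) {
                kept.add(element);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Element> recovered =
                Arrays.asList(new Record("a"), new Watermark(42L), new Record("b"));
        List<Element> replayed = dropWatermarks(recovered);
        System.out.println("elements replayed during recovery: " + replayed.size()); // 2
    }
}
```

With no watermark replayed during recovery, no event-time timers fire before the subtask is RUNNING; progress then depends on the watermark generators emitting fresh watermarks.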

The downside of this approach is:
* if we eventually decide to persist watermarks across recoveries, we would 
have to re-introduce watermarks into the in-flight data. However, I'm not sure 
if we will ever be able to implement persisting watermarks, especially across 
rescalings.


> Firing timers can block recovery process
> ----------------------------------------
>
>                 Key: FLINK-37256
>                 URL: https://issues.apache.org/jira/browse/FLINK-37256
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing, Runtime / Task
>    Affects Versions: 2.0.0, 1.20.0
>            Reporter: Piotr Nowojski
>            Priority: Major
>
> Splittable/interruptible timers for checkpointing were introduced in 
> FLINK-20217 as part of FLIP-443:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-443%3A+Interruptible+timers+firing
> However, the exact same problem can happen during recovery, usually (only?) 
> due to a watermark that was captured along with the in-flight data and is 
> being processed during a subtask's "INITIALIZATION" phase. The problem is that 
> while we are in the initialization phase, the job cannot perform any 
> checkpoints. This issue is compounded if there is some data-multiplying 
> operator in the pipeline, downstream from the operator that has a lot of 
> timers to fire. What can happen then is:
> * some upstream operator A fires a lot of timers, which produce a lot of 
> data (for example 100 000 records) while it is still INITIALIZING
> * those records are multiplied downstream (operators B, C, ...) by a factor 
> of, for example, 100x
> * in the end, sinks have to accept ~100 000 * 100 records before that 
> upstream operator A can finish processing in-flight data and switch to RUNNING
> This can take hours.


