[ https://issues.apache.org/jira/browse/FLINK-35051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Piotr Nowojski updated FLINK-35051:
-----------------------------------
    Description: 
While looking through the code, I noticed that `StreamTask` processes 
unaligned checkpoints in a strange order/priority. As a result, unaligned 
checkpoints (as seen in their `Start Delay`) and checkpoint triggering in 
`StreamTask` can be unnecessarily delayed by other mailbox actions in the 
system, for example:
* processing time timers
* `AsyncWaitOperator` results
* ... 

An incoming unaligned checkpoint (UC) barrier is treated as a priority event by 
the network stack (it is polled from the input before anything else). This is 
what we want, but polling elements from the network stack has lower priority 
than processing enqueued mailbox actions.
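A minimal toy model of the ordering described above, assuming (as a simplification, not Flink's actual `MailboxProcessor` or network stack code) that each loop iteration runs all pending mails before polling the network input once. `ToyTaskLoop` and its fields are hypothetical. Even though the UC barrier sits in the priority lane of the input, it is only reached after the queued timer and async mails:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical toy model of a task loop; not Flink's MailboxProcessor or network stack.
class ToyTaskLoop {
    // Mailbox: FIFO queue of pending actions such as timers or async results.
    final Deque<Runnable> mailbox = new ArrayDeque<>();
    // Network input with a priority lane (UC barriers) and a regular lane (records).
    final Deque<String> priorityInput = new ArrayDeque<>();
    final Deque<String> regularInput = new ArrayDeque<>();
    final List<String> log = new ArrayList<>();

    /** Polls one input element, always preferring the priority lane. */
    String pollInput() {
        return priorityInput.isEmpty() ? regularInput.poll() : priorityInput.poll();
    }

    /** One iteration: run every pending mail first, then poll the input once. */
    void runOnce() {
        while (!mailbox.isEmpty()) {
            mailbox.poll().run();
        }
        String element = pollInput();
        if (element != null) {
            log.add("input:" + element);
        }
    }

    static List<String> demo() {
        ToyTaskLoop task = new ToyTaskLoop();
        task.regularInput.add("record-1");
        task.priorityInput.add("UC-barrier"); // a network priority event
        task.mailbox.add(() -> task.log.add("mail:timer-1"));
        task.mailbox.add(() -> task.log.add("mail:async-result"));
        task.runOnce(); // both mails run before the barrier is polled
        task.runOnce();
        return task.log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Running `main` prints `[mail:timer-1, mail:async-result, input:UC-barrier, input:record-1]`: the network priority only orders the barrier ahead of `record-1`, not ahead of the mailbox.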

Secondly, if an aligned checkpoint (AC) barrier times out and is converted to a 
UC barrier, that conversion is done via a mailbox action. This mailbox action is 
not prioritised in any way either, so other mailbox actions can be 
unnecessarily executed first.
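A back-of-the-envelope sketch of the cost, assuming a plain FIFO mailbox and made-up mail costs (the 30 ms and 50 ms values are illustrative, not measurements). `TimeoutMailDelay` and its methods are hypothetical. The delay is measured from the moment the conversion mail is enqueued, assuming the mails ahead of it have not started yet:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model: every mail has a simulated execution cost in milliseconds.
class TimeoutMailDelay {
    static final class Mail {
        final String name;
        final long costMillis;

        Mail(String name, long costMillis) {
            this.name = name;
            this.costMillis = costMillis;
        }
    }

    /** Time until the target mail starts, assuming a plain FIFO mailbox without priorities. */
    static long startDelayOf(Deque<Mail> mailbox, String target) {
        long elapsed = 0;
        for (Mail mail : mailbox) { // ArrayDeque iterates from head to tail
            if (mail.name.equals(target)) {
                return elapsed;
            }
            elapsed += mail.costMillis;
        }
        throw new IllegalArgumentException("mail not found: " + target);
    }

    public static void main(String[] args) {
        Deque<Mail> mailbox = new ArrayDeque<>();
        mailbox.addLast(new Mail("processing-time-timer", 30));
        mailbox.addLast(new Mail("async-wait-result", 50));
        // The alignment timeout fires: the AC->UC conversion mail goes to the tail.
        mailbox.addLast(new Mail("convert-AC-to-UC", 1));
        System.out.println("appended: " + startDelayOf(mailbox, "convert-AC-to-UC") + " ms");
        // Hypothetical prioritised variant: the same mail inserted at the head instead.
        mailbox.addFirst(mailbox.removeLast());
        System.out.println("prioritised: " + startDelayOf(mailbox, "convert-AC-to-UC") + " ms");
    }
}
```

With these costs the conversion starts 80 ms after the timeout fires; inserting the same mail at the head of the queue brings that to 0 ms.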

On top of that, there is a clash of three separate concepts here:
# Mailbox priority (yieldToDownstream). In a sense this is the reverse of what 
we would like to have for triggering checkpoints, but it only kicks in during 
#yield() calls, where it is actually correct: an operator in the middle of 
execution cannot yield to a checkpoint; it should only yield to downstream.
# Control mails in the mailbox executor. Cancellation is done via them, and 
they bypass the whole mailbox queue.
# Priority events in the network stack.

It's unfortunate that 1. and 3. have a naming clash, as the name "priority" is 
used for both, and the highest-priority network event, the one containing the 
UC barrier, actually has the lowest mailbox priority when executed via the 
mailbox.

The control mails mechanism is a kind of priority mail executed out of order, 
but it doesn't generalise well for use in checkpointing.
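A toy sketch of concepts 1. and 2. from the list above, assuming the simplified rule that a yield at priority `p` only runs mails with a priority of at least `p`, and that control mails are always taken before the regular queue. `ToyPriorityMailbox` is hypothetical and is not Flink's `TaskMailbox`. A checkpoint mail enqueued at the lowest mailbox priority is skipped by the yielding operator, which, as noted above, is the correct behaviour inside a #yield() call:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

// Hypothetical toy mailbox for concepts 1. and 2.; not Flink's TaskMailbox.
class ToyPriorityMailbox {
    static final class Mail {
        final String name;
        final int priority;

        Mail(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }
    }

    private final Deque<Mail> queue = new ArrayDeque<>();
    private final Deque<String> controlMails = new ArrayDeque<>();
    final List<String> executed = new ArrayList<>();

    void put(Mail mail) {
        queue.addLast(mail);
    }

    /** Control mails (e.g. cancellation) never enter the regular queue. */
    void putControl(String name) {
        controlMails.addLast(name);
    }

    /**
     * Assumed yield rule: control mails first, then the oldest regular mail whose
     * priority is at least minPriority. Returns false if nothing was eligible.
     */
    boolean yieldOnce(int minPriority) {
        if (!controlMails.isEmpty()) {
            executed.add(controlMails.removeFirst());
            return true;
        }
        Iterator<Mail> it = queue.iterator();
        while (it.hasNext()) {
            Mail mail = it.next();
            if (mail.priority >= minPriority) {
                it.remove();
                executed.add(mail.name);
                return true;
            }
        }
        return false;
    }

    static List<String> demo() {
        ToyPriorityMailbox mailbox = new ToyPriorityMailbox();
        mailbox.put(new Mail("checkpoint-trigger", 0)); // lowest mailbox priority
        mailbox.put(new Mail("downstream-async-result", 1));
        mailbox.putControl("cancel");
        while (mailbox.yieldOnce(1)) {
            // an operator yielding at priority 1 may only yield to downstream work
        }
        return mailbox.executed;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

`main` prints `[cancel, downstream-async-result]`; the `checkpoint-trigger` mail stays queued until the main loop picks it up.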

This whole thing should be reworked at some point. Ideally, what we would like 
to have is that:
* the mail converting AC barriers to UC barriers
* polling UC barriers from the network input
* checkpoint triggering via RPC for source tasks

should be processed first, with the exception of yieldToDownstream, where the 
current mailbox priorities should be adhered to.
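A sketch of the desired ordering, under the assumption that checkpoint-related mails can be tagged when they are enqueued. `DesiredOrderingSketch`, the `checkpointRelated` flag and both step methods are hypothetical names for illustration, not a proposed Flink API. The main loop runs tagged mails and pending UC barriers before regular mails, while the yield path keeps using only the existing priority rule:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of the desired ordering; not a proposed Flink API.
class DesiredOrderingSketch {
    static final class Mail {
        final String name;
        final int priority;
        // True for e.g. the AC->UC conversion mail or an RPC checkpoint trigger.
        final boolean checkpointRelated;

        Mail(String name, int priority, boolean checkpointRelated) {
            this.name = name;
            this.priority = priority;
            this.checkpointRelated = checkpointRelated;
        }
    }

    final Deque<Mail> mailbox = new ArrayDeque<>();
    final Deque<String> priorityInput = new ArrayDeque<>(); // UC barriers from the network
    final List<String> executed = new ArrayList<>();

    /** Main loop: checkpoint mails, then UC barriers, then regular mails (FIFO). */
    boolean mainLoopStep() {
        Mail urgent = removeFirstMatching(m -> m.checkpointRelated);
        if (urgent != null) {
            executed.add(urgent.name);
            return true;
        }
        if (!priorityInput.isEmpty()) {
            executed.add("poll:" + priorityInput.poll());
            return true;
        }
        Mail next = mailbox.pollFirst();
        if (next == null) {
            return false;
        }
        executed.add(next.name);
        return true;
    }

    /** yield(): unchanged rule, the checkpoint flag is ignored (yieldToDownstream). */
    boolean yieldStep(int minPriority) {
        Mail next = removeFirstMatching(m -> m.priority >= minPriority);
        if (next == null) {
            return false;
        }
        executed.add(next.name);
        return true;
    }

    private Mail removeFirstMatching(Predicate<Mail> condition) {
        Iterator<Mail> it = mailbox.iterator();
        while (it.hasNext()) {
            Mail mail = it.next();
            if (condition.test(mail)) {
                it.remove();
                return mail;
            }
        }
        return null;
    }

    static List<String> demo() {
        DesiredOrderingSketch task = new DesiredOrderingSketch();
        task.mailbox.add(new Mail("timer", 0, false));
        task.mailbox.add(new Mail("convert-AC-to-UC", 0, true));
        task.mailbox.add(new Mail("downstream-result", 1, false));
        task.priorityInput.add("UC-barrier");
        while (task.yieldStep(1)) {
            // an operator yielding at priority 1 only picks up downstream work
        }
        while (task.mainLoopStep()) {
            // back in the main loop, checkpoint work goes first
        }
        return task.executed;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

`main` prints `[downstream-result, convert-AC-to-UC, poll:UC-barrier, timer]`: the yielding operator only picks up downstream work, and the main loop then handles checkpoint work ahead of the timer. How mails would actually be tagged (a separate queue, a flag on the mail, or something else) is left open here.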

  was:
While looking through the code, I noticed that `StreamTask` processes 
unaligned checkpoints in a strange order/priority. As a result, the unaligned 
checkpoint `Start Delay` time can be increased, and triggering checkpoints in 
`StreamTask` can be unnecessarily delayed by other mailbox actions in the 
system, for example:
* processing time timers
* `AsyncWaitOperator` results
* ... 

An incoming unaligned checkpoint (UC) barrier is treated as a priority event by 
the network stack (it is polled from the input before anything else). This is 
what we want, but polling elements from the network stack has lower priority 
than processing enqueued mailbox actions.

Secondly, if an aligned checkpoint (AC) barrier times out and is converted to a 
UC barrier, that conversion is done via a mailbox action. This mailbox action is 
not prioritised in any way either, so other mailbox actions can be 
unnecessarily executed first.

On top of that, there is a clash of three separate concepts here:
# Mailbox priority (yieldToDownstream). In a sense this is the reverse of what 
we would like to have for triggering checkpoints, but it only kicks in during 
#yield() calls, where it is actually correct: an operator in the middle of 
execution cannot yield to a checkpoint; it should only yield to downstream.
# Control mails in the mailbox executor. Cancellation is done via them, and 
they bypass the whole mailbox queue.
# Priority events in the network stack.

It's unfortunate that 1. and 3. have a naming clash, as the name "priority" is 
used for both, and the highest-priority network event, the one containing the 
UC barrier, actually has the lowest mailbox priority when executed via the 
mailbox.

The control mails mechanism is a kind of priority mail executed out of order, 
but it doesn't generalise well for use in checkpointing.

This whole thing should be reworked at some point. Ideally, what we would like 
to have is that:
* the mail converting AC barriers to UC barriers
* polling UC barriers from the network input
* checkpoint triggering via RPC for source tasks

should be processed first, with the exception of yieldToDownstream, where the 
current mailbox priorities should be adhered to.


> Weird priorities when processing unaligned checkpoints
> ------------------------------------------------------
>
>                 Key: FLINK-35051
>                 URL: https://issues.apache.org/jira/browse/FLINK-35051
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing, Runtime / Network, Runtime / Task
>    Affects Versions: 1.17.2, 1.19.0, 1.18.1
>            Reporter: Piotr Nowojski
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
