[ 
https://issues.apache.org/jira/browse/FLINK-34704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836083#comment-17836083
 ] 

Piotr Nowojski edited comment on FLINK-34704 at 4/11/24 9:57 AM:
-----------------------------------------------------------------

Yes, I've read the referenced dev mailing list thread and I still don't agree 
with the conclusion there, that AWOP should allow for a checkpoint to happen 
while it's {{.yield()}}'ing :)
{quote}
the checkpoint still happen when some user code is running.
{quote}
If you are talking about async user code from AWOP, that's not a problem and 
never has been. When I'm talking about problems with state inconsistency of 
upstream chained operators, that's exactly what I mean. Problem is for UPSTREAM 
operator that is CHAINED with AWOP:

network input -> OperatorA -> AWOP -> network output

OperatorA's state can become corrupted/inconsistent if you allow for checkpoint 
to happen while {{AWOP}} invokes {{.yield()}}. That's the whole point of the 
yield to downstream (https://issues.apache.org/jira/browse/FLINK-13063). And 
side note, it's not only a problem of {{AWOP}} but any operator that wants to 
{{.yield()}}. Operators can only yield to downstream, which also means 
checkpoints can not be executed, and it affects also firing timers. 

I have a WIP FLIP-443 that will introduce a neat trick to allow operators to 
return the execution back to {{StreamTask}} so that checkpoint can happen, but 
AFAIU it has no application for the {{AWOP}}. When {{AWOP}} wants to 
{{.yield()}}, it means it has no more space in the buffer to store any more 
records. So we can not checkpoint {{AWOP}} until some buffer space will become 
available. So {{AWOP}} needs to yield to downstream ({{.yield()}}) until buffer 
space becomes available, return from the {{#processElement()}} call, and then 
it needs to relay on a fix for FLINK-35051 for stream task to prioritise 
performing a checkpoint over executing more records/mails. 


was (Author: pnowojski):
{quote}
the checkpoint still happen when some user code is running.
{quote}
If you are talking about async user code from AWOP, that's not a problem and 
never has been. When I'm talking about problems with state inconsistency of 
upstream chained operators, that's exactly what I mean. Problem is for UPSTREAM 
operator that is CHAINED with AWOP:

network input -> OperatorA -> AWOP -> network output

OperatorA's state can become corrupted/inconsistent if you allow for checkpoint 
to happen while {{AWOP}} invokes {{.yield()}}. That's the whole point of the 
yield to downstream (https://issues.apache.org/jira/browse/FLINK-13063). And 
side note, it's not only a problem of {{AWOP}} but any operator that wants to 
{{.yield()}}. Operators can only yield to downstream, which also means 
checkpoints can not be executed, and it affects also firing timers. 

I have a WIP FLIP-443 that will introduce a neat trick to allow operators to 
return the execution back to {{StreamTask}} so that checkpoint can happen, but 
AFAIU it has no application for the {{AWOP}}. When {{AWOP}} wants to 
{{.yield()}}, it means it has no more space in the buffer to store any more 
records. So we can not checkpoint {{AWOP}} until some buffer space will become 
available. So {{AWOP}} needs to yield to downstream ({{.yield()}}) until buffer 
space becomes available, return from the {{#processElement()}} call, and then 
it needs to relay on a fix for FLINK-35051 for stream task to prioritise 
performing a checkpoint over executing more records/mails. 

> Process checkpoint barrier in AsyncWaitOperator when the element queue is full
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-34704
>                 URL: https://issues.apache.org/jira/browse/FLINK-34704
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Task
>            Reporter: Zakelly Lan
>            Priority: Minor
>
> As discussed in 
> https://lists.apache.org/thread/4f7ywn29kdv4302j2rq3fkxc6pc8myr2 . Maybe it 
> is better to provide such a new `yield` that can process mail with low 
> priority in the mailbox executor. More discussion needed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to