[ 
https://issues.apache.org/jira/browse/FLINK-24846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17441253#comment-17441253
 ] 

Yun Gao commented on FLINK-24846:
---------------------------------

The direct cause of this problem is the current shutdown sequence of the StreamTask. The mailbox is first pre-closed (preClose) to forbid adding new mails, and then all pending mails are drained. However, when the AsyncWaitOperator outputs the last pending async results, it appears to do so recursively: each mail outputs one completed record and, if more completed records remain, submits another mail. That submission conflicts with the earlier preClose, which is why the put fails with the mailbox in state QUIESCED.
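The failure mode described above can be reproduced with a small, self-contained model. The classes and methods below (`SimpleMailbox`, `emitNext`, `stopWithPendingResults`) are hypothetical simplifications, not Flink's `TaskMailboxImpl` or `AsyncWaitOperator`. They only mirror the state check seen in the stack trace and the one-record-per-mail pattern described in the comment.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical, simplified model of a task mailbox; NOT Flink's TaskMailboxImpl.
final class QuiescedMailboxDemo {

    enum State { OPEN, QUIESCED }

    static final class MailboxClosedException extends IllegalStateException {
        MailboxClosedException(String msg) { super(msg); }
    }

    static final class SimpleMailbox {
        private final Queue<Runnable> mails = new ArrayDeque<>();
        private State state = State.OPEN;

        void put(Runnable mail) {
            // Mirrors the check in the stack trace: puts are only allowed while OPEN.
            if (state != State.OPEN) {
                throw new MailboxClosedException("Mailbox is in state " + state
                        + ", but is required to be in state OPEN for put operations.");
            }
            mails.add(mail);
        }

        // Models preClose: from now on, put() is rejected.
        void quiesce() { state = State.QUIESCED; }

        // Runs queued mails until the queue is empty.
        void drain() {
            Runnable mail;
            while ((mail = mails.poll()) != null) {
                mail.run();
            }
        }
    }

    // Emits one completed result per mail and re-submits itself while results remain,
    // i.e. the recursive pattern described in the comment.
    static void emitNext(SimpleMailbox mailbox, Queue<String> completed, List<String> output) {
        String next = completed.poll();
        if (next != null) {
            output.add(next);
        }
        if (!completed.isEmpty()) {
            mailbox.put(() -> emitNext(mailbox, completed, output));
        }
    }

    // Returns the records actually emitted during shutdown.
    static List<String> stopWithPendingResults(int pending) {
        SimpleMailbox mailbox = new SimpleMailbox();
        Queue<String> completed = new ArrayDeque<>();
        for (int i = 0; i < pending; i++) {
            completed.add("record-" + i);
        }
        List<String> output = new ArrayList<>();

        mailbox.put(() -> emitNext(mailbox, completed, output)); // mail pending before stop
        mailbox.quiesce(); // preClose: forbid new mails
        try {
            mailbox.drain(); // the first mail tries to enqueue a follow-up and fails
        } catch (MailboxClosedException e) {
            System.out.println("Failed after " + output.size() + " record(s): " + e.getMessage());
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(stopWithPendingResults(3));
    }
}
```

With three completed results pending, the first mail emits one record and its attempt to enqueue the next mail is rejected, so the remaining two records are never emitted.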

Perhaps the issue would be solved once we finish reverting the stop-with-savepoint process so that it first calls endInput and emits all completed elements, and only then takes a savepoint and terminates?
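The proposed ordering can be sketched with the same kind of hypothetical model: drain while the mailbox is still open (standing in for endInput), take the savepoint, and only then quiesce. All names are illustrative and do not correspond to Flink classes. The sketch also assumes every async result has already completed; it does not model waiting for requests that are still in flight.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of the proposed ordering; names do not correspond to Flink classes.
final class EndInputFirstDemo {

    static final class SimpleMailbox {
        private final Queue<Runnable> mails = new ArrayDeque<>();
        private boolean open = true;

        void put(Runnable mail) {
            if (!open) {
                throw new IllegalStateException("Mailbox is QUIESCED, put is not allowed");
            }
            mails.add(mail);
        }

        void quiesce() { open = false; }

        // Runs mails until the queue is empty, including mails enqueued by other mails.
        void drain() {
            Runnable mail;
            while ((mail = mails.poll()) != null) {
                mail.run();
            }
        }
    }

    // Same one-record-per-mail pattern as in the failing case.
    static void emitNext(SimpleMailbox mailbox, Queue<String> completed, List<String> output) {
        String next = completed.poll();
        if (next != null) {
            output.add(next);
        }
        if (!completed.isEmpty()) {
            mailbox.put(() -> emitNext(mailbox, completed, output));
        }
    }

    static List<String> stopWithSavepoint(int pending, List<String> log) {
        SimpleMailbox mailbox = new SimpleMailbox();
        Queue<String> completed = new ArrayDeque<>();
        for (int i = 0; i < pending; i++) {
            completed.add("record-" + i);
        }
        List<String> output = new ArrayList<>();
        mailbox.put(() -> emitNext(mailbox, completed, output));

        // 1. "endInput": while the mailbox is still OPEN, run mails until every
        //    completed element has been emitted (re-submitted mails are allowed here).
        mailbox.drain();
        log.add("endInput done, emitted " + output.size());

        // 2. Take the savepoint; in this model no completed results remain unemitted.
        log.add("savepoint taken");

        // 3. Terminate: only now forbid new mails and drain whatever is left (nothing).
        mailbox.quiesce();
        mailbox.drain();
        log.add("terminated");
        return output;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        System.out.println(stopWithSavepoint(3, log));
        System.out.println(log);
    }
}
```

Because the follow-up mails are submitted while the mailbox is still open, all three records are emitted before the savepoint, and the final quiesce-and-drain finds nothing left to run.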

> AsyncWaitOperator fails during stop-with-savepoint
> --------------------------------------------------
>
>                 Key: FLINK-24846
>                 URL: https://issues.apache.org/jira/browse/FLINK-24846
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.14.0
>            Reporter: Piotr Nowojski
>            Priority: Critical
>         Attachments: log-jm.txt
>
>
> {noformat}
> Caused by: org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox$MailboxClosedException: Mailbox is in state QUIESCED, but is required to be in state OPEN for put operations.
>         at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.checkPutStateConditions(TaskMailboxImpl.java:269) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.put(TaskMailboxImpl.java:197) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.execute(MailboxExecutorImpl.java:74) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.api.common.operators.MailboxExecutor.execute(MailboxExecutor.java:103) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.outputCompletedElement(AsyncWaitOperator.java:304) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.access$100(AsyncWaitOperator.java:78) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.processResults(AsyncWaitOperator.java:370) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.lambda$processInMailbox$0(AsyncWaitOperator.java:351) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.drain(MailboxProcessor.java:177) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:854) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:767) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at java.lang.Thread.run(Thread.java:829) ~[?:?]
> {noformat}
> As reported by a user on [the mailing list:|https://mail-archives.apache.org/mod_mbox/flink-user/202111.mbox/%3CCAO6dnLwtLNxkr9qXG202ysrnse18Wgvph4hqHZe3ar8cuXAfDw%40mail.gmail.com%3E]
> {quote}
> I failed to stop a job with savepoint with the following message:
> Inconsistent execution state after stopping with savepoint. At least one 
> execution is still in one of the following states: FAILED, CANCELED. A global 
> fail-over is triggered to recover the job 452594f3ec5797f399e07f95c884a44b.
> The job manager said
>  A savepoint was created at 
> hdfs://mobdata-flink-hdfs/driving-habits/svpts/savepoint-452594-f60305755d0e 
> but the corresponding job 452594f3ec5797f399e07f95c884a44b didn't terminate 
> successfully.
> while complaining about
> Mailbox is in state QUIESCED, but is required to be in state OPEN for put 
> operations.
> Is it okay to ignore this kind of error?
> Please see the attached files for the detailed context.
> FYI, 
> - I used the latest 1.14.0
> - I started the job with "$FLINK_HOME"/bin/flink run --target yarn-per-job
> - I couldn't reproduce the exception using the same jar, so I might not be able to provide DEBUG messages
> {quote}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
