[ 
https://issues.apache.org/jira/browse/FLINK-18983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17252653#comment-17252653
 ] 

Piotr Nowojski commented on FLINK-18983:
----------------------------------------

Hey [~liuyufei], thanks for reporting this, and [~ym], thanks for investigating. First of
all, I don't think this is a bug. Note that [the task cancellation|https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#task-cancellation-timeout]
watchdog is meant only for that: cancellation, not for clean shutdown. Cancellation
always happens during/after some failure, so it's more acceptable to time out
the task/process. A clean shutdown is trickier because, for example, a sink might
still be flushing records.
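To make the distinction concrete, here is a simplified Java model of a cancellation watchdog: after the task thread is interrupted, a separate thread waits a bounded time for it to exit and escalates if it is still alive. This is a sketch, not Flink's actual {{TaskCancelerWatchDog}}; the class name, the {{FatalErrorHandler}} hook and the timeout values are assumptions for illustration (in a real TaskManager the escalation is what brings the process down).

```java
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Simplified model of a cancellation watchdog (NOT Flink's TaskCancelerWatchDog):
 * interrupt the task thread, then wait a bounded time and escalate if it is still alive.
 */
public class CancellationWatchdogSketch {

    /** Hypothetical escalation hook; in a real TaskManager this would terminate the process. */
    public interface FatalErrorHandler {
        void onFatalError(String message);
    }

    /** Interrupts taskThread and starts a watchdog that escalates after timeoutMillis. */
    public static Thread cancelWithWatchdog(Thread taskThread, long timeoutMillis,
                                            FatalErrorHandler handler) {
        taskThread.interrupt();
        Thread watchdog = new Thread(() -> {
            try {
                // Wait at most timeoutMillis for the task thread to terminate.
                taskThread.join(timeoutMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            if (taskThread.isAlive()) {
                handler.onFatalError("Task did not terminate within " + timeoutMillis + " ms");
            }
        }, "cancellation-watchdog");
        watchdog.setDaemon(true);
        watchdog.start();
        return watchdog;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicBoolean release = new AtomicBoolean(false);
        // A task that swallows interrupts, e.g. one stuck in a user close() method.
        Thread stuckTask = new Thread(() -> {
            while (!release.get()) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException ignored) {
                    // Interrupt swallowed, so interrupting alone cannot stop this thread.
                }
            }
        }, "stuck-task");
        stuckTask.setDaemon(true);
        stuckTask.start();

        Thread watchdog = cancelWithWatchdog(stuckTask, 100,
                msg -> System.out.println("fatal error: " + msg));
        watchdog.join(); // prints: fatal error: Task did not terminate within 100 ms
        release.set(true);
    }
}
```

A task that honours the interrupt and exits within the timeout never triggers the handler; only a thread that stays alive (like one blocked in a user {{close()}}) does.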

Nevertheless, after some offline discussion, it looks like it would be valuable to have a
clean shutdown watchdog as well (maybe disabled by default, and configured separately
from the cancellation timeout?). Also, I believe that a clean solution for
this ticket is closely related to FLINK-17012 and FLINK-4714. We could refactor
{{StreamTask#invoke}} and split it into three steps:
# construction/initialisation (factory?)
# invocation/run
# close

{{Task}} would invoke those steps/methods one by one, and thanks to that it
could spawn a watchdog around the {{close()}} call.
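The three-step split and the watchdog around {{close()}} could look roughly like the following Java sketch. {{SplitInvokable}}, its {{initialize()}}/{{run()}}/{{close()}} methods and the {{invoke(...)}} driver are hypothetical names, not existing Flink APIs. The close timeout is modeled as a {{Duration}} where zero or negative disables the watchdog, following the "maybe disabled by default" idea, and the timeout action is just a {{Runnable}}; what it should really do (fail the task, or kill the TaskManager as the cancellation watchdog does) is left open.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Sketch of a three-phase task lifecycle with a watchdog armed only around close(). */
public class SplitInvokableSketch {

    /** Hypothetical replacement for a single invoke(); not an existing Flink interface. */
    public interface SplitInvokable {
        void initialize() throws Exception;
        void run() throws Exception;
        void close() throws Exception;
    }

    /**
     * Runs initialize() and run(), then always runs close(). If closeTimeout is positive,
     * onCloseTimeout fires when close() has not returned in time; zero or negative disables it.
     */
    public static void invoke(SplitInvokable invokable, Duration closeTimeout,
                              ScheduledExecutorService timer, Runnable onCloseTimeout)
            throws Exception {
        Exception failure = null;
        try {
            invokable.initialize();
            invokable.run();
        } catch (Exception e) {
            failure = e; // remember the failure, but still close
        }

        ScheduledFuture<?> watchdog = null;
        if (!closeTimeout.isZero() && !closeTimeout.isNegative()) {
            watchdog = timer.schedule(onCloseTimeout, closeTimeout.toMillis(), TimeUnit.MILLISECONDS);
        }
        try {
            invokable.close();
        } catch (Exception e) {
            if (failure == null) {
                failure = e;
            } else {
                failure.addSuppressed(e); // keep the original failure as the primary cause
            }
        } finally {
            if (watchdog != null) {
                watchdog.cancel(false); // close() returned: disarm the watchdog
            }
        }

        if (failure != null) {
            throw failure;
        }
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        SplitInvokable slowClose = new SplitInvokable() {
            @Override public void initialize() { System.out.println("initialize"); }
            @Override public void run() { System.out.println("run"); }
            @Override public void close() throws Exception {
                System.out.println("close");
                Thread.sleep(500); // stands in for a user close() that blocks
            }
        };
        invoke(slowClose, Duration.ofMillis(100), timer,
                () -> System.out.println("close() exceeded its timeout"));
        timer.shutdownNow();
    }
}
```

In this sketch the watchdog only detects that {{close()}} is overdue; it cannot unblock it, so {{invoke(...)}} still returns only when {{close()}} does.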

I have already once [explained how it could be done|https://issues.apache.org/jira/browse/FLINK-17012?focusedCommentId=17091539&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17091539].
The only difference is that it was for FLINK-17012, so it didn't take the separate close
step into account.

Having said that, we don't have immediate plans to work on that (maybe
sometime later), but if someone wants to take a look at this issue, we would
be happy to review it :)

> Job doesn't changed to failed if close function has blocked
> -----------------------------------------------------------
>
>                 Key: FLINK-18983
>                 URL: https://issues.apache.org/jira/browse/FLINK-18983
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Task
>    Affects Versions: 1.11.0, 1.12.0
>            Reporter: YufeiLiu
>            Priority: Major
>
> If an operator throws an exception, it breaks the processing loop and disposes all
> operators. But the state never switches to FAILED if the task blocks in Function.close,
> so the JobMaster can't learn the final state and trigger a restart.
> The Task has {{TaskCancelerWatchDog}} to kill the process if cancellation times out,
> but it doesn't apply to a FAILED task. The task thread will always hang at:
> org.apache.flink.streaming.runtime.tasks.StreamTask#cleanUpInvoke
> Test case:
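> {code:java}
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.configuration.TaskManagerOptions;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.ProcessFunction;
> import org.apache.flink.util.Collector;
>
> Configuration configuration = new Configuration();
> // Cancellation watchdog timeout: 10 s
> configuration.set(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 10000L);
> StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.createLocalEnvironment(2, configuration);
> env.addSource(...) // source omitted
>         .process(new ProcessFunction<String, String>() {
>             @Override
>             public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
>                 // Subtask 0 fails on its first record...
>                 if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
>                     throw new RuntimeException();
>                 }
>             }
>
>             @Override
>             public void close() throws Exception {
>                 // ...and then blocks for about 2.8 hours (10,000,000 ms) in close() during cleanup.
>                 if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
>                     Thread.sleep(10000000);
>                 }
>             }
>         }).setParallelism(2)
>         .print();
> env.execute();
> {code}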
> In this case, the job blocks in the close action and never changes to FAILED.
> If instead the subtask with subtaskIndex == 1 is the one that sleeps, the TM exits after
> TASK_CANCELLATION_TIMEOUT.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
