[ https://issues.apache.org/jira/browse/FLINK-18983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
YufeiLiu updated FLINK-18983:
-----------------------------
Description:
If an operator throws an exception, it breaks the processing loop and disposes all
operators. But the task state never switches to FAILED if disposal blocks in
Function.close, so the JobMaster can't learn the final state and trigger a restart.
The Task has a {{TaskCancelerWatchDog}} that kills the process if cancellation times
out, but it doesn't cover FAILED tasks. The task thread always hangs at:
{{org.apache.flink.streaming.runtime.tasks.StreamTask#cleanUpInvoke}}
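To make the hang concrete, here is a heavily simplified, hypothetical model of the failure path. It is not Flink code: runTask, reported and userClose are made-up names. It only captures the ordering described above, i.e. cleanup runs before the terminal state is reported, so a close() that never returns means FAILED is never reported.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

/**
 * Simplified model of the failure path (hypothetical, not Flink code):
 * runTask stands in for the task's invoke/cleanup, userClose for Function.close.
 */
public class FailurePathModel {

    enum State { RUNNING, FAILED }

    // States in the order they become visible to the (simulated) JobMaster.
    static final List<State> reported = new CopyOnWriteArrayList<>();

    static void runTask(Runnable processLoop, Runnable userClose) {
        reported.add(State.RUNNING);
        try {
            processLoop.run();
        } catch (RuntimeException e) {
            // Cleanup (cf. StreamTask#cleanUpInvoke) runs BEFORE the terminal
            // state is reported, so a blocking close() delays FAILED forever.
            userClose.run();
            reported.add(State.FAILED);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch neverReleased = new CountDownLatch(1);
        Thread taskThread = new Thread(() -> runTask(
                () -> { throw new RuntimeException("subtask 0 failed"); },
                () -> {
                    try {
                        neverReleased.await(); // plays the role of Thread.sleep(10000000)
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }));
        taskThread.setDaemon(true); // let the JVM exit despite the hung "task"
        taskThread.start();
        taskThread.join(200);
        System.out.println(reported.contains(State.FAILED)); // false
        System.out.println(taskThread.isAlive());            // true
    }
}
```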
Test case:
{code:java}
Configuration configuration = new Configuration();
configuration.set(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 10000L);
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.createLocalEnvironment(2, configuration);

env.addSource(...)
    .process(new ProcessFunction<String, String>() {
        @Override
        public void processElement(String value, Context ctx,
                Collector<String> out) throws Exception {
            // Subtask 0 fails on its first record...
            if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
                throw new RuntimeException();
            }
        }

        @Override
        public void close() throws Exception {
            // ...and then blocks (10,000 s, about 2.8 hours) while being closed.
            if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
                Thread.sleep(10000000);
            }
        }
    }).setParallelism(2)
    .print();

env.execute();
{code}
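In this case, the job blocks in close() and never changes to FAILED.
If instead the subtask with subtaskIndex == 1 is the one that sleeps, the TM exits
after TASK_CANCELLATION_TIMEOUT: that subtask is then being cancelled rather than
failed, so the {{TaskCancelerWatchDog}} applies.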
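For comparison, a watchdog that also covered the failure path could bound the time spent in close() much like {{TaskCancelerWatchDog}} bounds cancellation. The sketch below is an assumption about how that might look, not an existing Flink API: closeWithTimeout and onTimeout are hypothetical names, and in a real TM onTimeout would presumably escalate to killing the process, as the cancellation watchdog does.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch of a watchdog around a user close(); not Flink code. */
public class CloseWatchdogSketch {

    /**
     * Runs userClose on a separate thread and waits at most timeoutMillis.
     * Returns true if close() finished in time, false if the watchdog fired.
     */
    static boolean closeWithTimeout(Runnable userClose, long timeoutMillis, Runnable onTimeout)
            throws InterruptedException {
        Thread closer = new Thread(userClose, "close-thread");
        closer.setDaemon(true); // a hung close() must not keep the JVM alive
        closer.start();
        closer.join(timeoutMillis);
        if (closer.isAlive()) {
            // Best effort: a close() that ignores interrupts keeps blocking anyway,
            // which is why an escalation step is still needed.
            closer.interrupt();
            onTimeout.run();
            return false;
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicBoolean escalated = new AtomicBoolean(false);
        boolean ok = closeWithTimeout(() -> {
            try {
                Thread.sleep(10_000_000L); // same blocking close() as the test case
            } catch (InterruptedException ignored) {
                // returning here lets the close-thread finish after the interrupt
            }
        }, 200, () -> escalated.set(true));
        System.out.println(ok + " " + escalated.get()); // false true
    }
}
```

The design choice here is to keep the timeout outside the user function, since the report shows that user code in close() can block arbitrarily long.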
was:
If an operator throws an exception, it breaks the processing loop and disposes all
operators. But the task state never switches to FAILED if disposal blocks in
Function.close, so the JobMaster can't learn the final state and trigger a restart.
The Task has a {{TaskCancelerWatchDog}} that kills the process if cancellation times
out, but it doesn't cover FAILED tasks.
Can we just report the final state and let the JM trigger the cleanup action?
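A minimal sketch of that proposal, assuming the terminal state can be reported before cleanup runs. failTask and jobMasterView are hypothetical names, and the JobMaster is simulated by a list; this is not how Flink currently orders these steps.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/** Hypothetical sketch of "report first, clean up later"; not Flink code. */
public class ReportFirstSketch {

    // What the (simulated) JobMaster has been told so far.
    static final List<String> jobMasterView =
            Collections.synchronizedList(new ArrayList<>());

    /** Reports FAILED immediately, then runs userClose on a separate thread. */
    static Thread failTask(Throwable cause, Runnable userClose) {
        // 1. Make the terminal state visible right away so a restart can be scheduled.
        jobMasterView.add("FAILED: " + cause.getMessage());
        // 2. Run the possibly blocking cleanup off the reporting path.
        Thread cleanup = new Thread(userClose, "cleanup");
        cleanup.setDaemon(true); // a hung close() must not keep the JVM alive
        cleanup.start();
        return cleanup;
    }

    public static void main(String[] args) {
        CountDownLatch neverReleased = new CountDownLatch(1);
        Thread cleanup = failTask(new RuntimeException("boom"), () -> {
            try {
                neverReleased.await(); // stands in for the blocking close()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        System.out.println(jobMasterView);     // [FAILED: boom]
        System.out.println(cleanup.isAlive()); // true
    }
}
```

The trade-off is that resources held by the blocked close() are not yet released when FAILED is reported, so this ordering would likely still need a bound on cleanup (or the JM-driven cleanup suggested above) to avoid leaking them.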
Test case:
{code:java}
Configuration configuration = new Configuration();
configuration.set(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 10000L);
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.createLocalEnvironment(2, configuration);

env.addSource(...)
    .process(new ProcessFunction<String, String>() {
        @Override
        public void processElement(String value, Context ctx,
                Collector<String> out) throws Exception {
            // Subtask 0 fails on its first record...
            if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
                throw new RuntimeException();
            }
        }

        @Override
        public void close() throws Exception {
            // ...and then blocks (10,000 s, about 2.8 hours) while being closed.
            if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
                Thread.sleep(10000000);
            }
        }
    }).setParallelism(2)
    .print();

env.execute();
{code}
In this case, the job blocks in close() and never changes to FAILED.
If instead the subtask with subtaskIndex == 1 is the one that sleeps, the TM exits
after TASK_CANCELLATION_TIMEOUT.
> Job doesn't changed to failed if close function has blocked
> -----------------------------------------------------------
>
> Key: FLINK-18983
> URL: https://issues.apache.org/jira/browse/FLINK-18983
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.11.0
> Reporter: YufeiLiu
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)