[ 
https://issues.apache.org/jira/browse/FLINK-20261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17236648#comment-17236648
 ] 

Jiangjie Qin commented on FLINK-20261:
--------------------------------------

It seems the root cause here is a little different. Because the 
SourceCoordinator is single-threaded, there should not actually be any race 
condition on its state. In the reported case, the calling sequence on the 
{{SplitEnumerator}} side was:
 # {{SplitEnumerator.subtaskFailed()}} is called, and the failed subtask is 
removed from the {{SplitEnumeratorContext}}.
 # {{FileSplitEnumerator.handleSplitRequest()}} is invoked to let the split 
enumerator handle the split request from the failed task. At this point the 
failed subtask should already have been unregistered.

The problem here is that at step 2, both {{ContinuousFileSplitEnumerator}} and 
{{StaticFileSplitEnumerator}} naively call 
{{SplitEnumeratorContext.assignSplits()}} to assign splits to the failed subtask 
without checking whether that subtask is still alive. This triggers an 
{{IllegalArgumentException}} in the {{SplitEnumeratorContext}}.

So I think the fix here should simply be to ignore the split request when the 
subtask is no longer registered in the {{SplitEnumeratorContext}}.
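
A minimal sketch of that guard, written in plain Java so it runs without Flink. {{SimpleEnumeratorContext}}, {{FakeContext}} and {{GuardedSplitEnumerator}} are hypothetical stand-ins loosely modeled on {{SplitEnumeratorContext#registeredReaders()}} and {{assignSplit()}} (the real interface is generic and has more methods); only the liveness check at the top of {{handleSplitRequest()}} is the point here.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for Flink's SplitEnumeratorContext:
// only the two methods this sketch needs, with String splits.
interface SimpleEnumeratorContext {
    Map<Integer, String> registeredReaders();

    void assignSplit(String split, int subtask);
}

// Test double mimicking the coordinator-side check from the report:
// assigning to an unregistered subtask throws IllegalArgumentException.
class FakeContext implements SimpleEnumeratorContext {
    private final Map<Integer, String> readers = new HashMap<>();
    final Map<Integer, String> assigned = new HashMap<>();

    void registerReader(int subtask, String host) { readers.put(subtask, host); }

    void unregisterReader(int subtask) { readers.remove(subtask); }

    @Override
    public Map<Integer, String> registeredReaders() { return readers; }

    @Override
    public void assignSplit(String split, int subtask) {
        if (!readers.containsKey(subtask)) {
            throw new IllegalArgumentException("Subtask " + subtask + " is not registered");
        }
        assigned.put(subtask, split);
    }
}

// Hypothetical enumerator; the liveness check is the proposed fix.
class GuardedSplitEnumerator {
    private final SimpleEnumeratorContext context;
    private final Deque<String> pendingSplits;

    GuardedSplitEnumerator(SimpleEnumeratorContext context, Deque<String> pendingSplits) {
        this.context = context;
        this.pendingSplits = pendingSplits;
    }

    /** Returns true if a split was assigned, false if the request was dropped. */
    boolean handleSplitRequest(int subtask) {
        if (!context.registeredReaders().containsKey(subtask)) {
            // The reader failed after sending its request: ignore the request
            // instead of letting assignSplit() throw in the coordinator thread.
            return false;
        }
        String split = pendingSplits.poll();
        if (split == null) {
            return false; // nothing left to assign
        }
        context.assignSplit(split, subtask);
        return true;
    }

    int pendingCount() { return pendingSplits.size(); }
}

class SplitRequestDemo {
    public static void main(String[] args) {
        FakeContext ctx = new FakeContext();
        ctx.registerReader(0, "host-a");
        ctx.registerReader(1, "host-b");
        Deque<String> splits = new ArrayDeque<>();
        splits.add("split-1");
        splits.add("split-2");
        GuardedSplitEnumerator enumerator = new GuardedSplitEnumerator(ctx, splits);

        ctx.unregisterReader(1);                              // step 1: subtask 1 fails
        System.out.println(enumerator.handleSplitRequest(1)); // step 2: prints false
        System.out.println(enumerator.handleSplitRequest(0)); // live reader: prints true
        System.out.println(enumerator.pendingCount());        // prints 1
    }
}
```

In this sketch the dropped request does not consume a split, so the split stays pending for a live reader.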

That being said, the exception handling in the {{ExecutorNotifier}} still needs 
to be improved, but that seems orthogonal to the issue reported here.
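
One possible direction for the {{ExecutorNotifier}} side, as a hypothetical sketch rather than the actual Flink code: catch exceptions inside each task before they reach the executor's worker thread, so the single coordinator thread is never torn down and replaced. {{guarded()}} and {{runDemo()}} are made-up names, and the error handler here only collects the exceptions, where a real coordinator would more likely fail the job.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class GuardedExecution {
    // Hypothetical helper: runs the task and hands any exception to
    // errorHandler, so nothing escapes to the executor's worker thread.
    static Runnable guarded(Runnable task, Consumer<Throwable> errorHandler) {
        return () -> {
            try {
                task.run();
            } catch (Throwable t) {
                errorHandler.accept(t);
            }
        };
    }

    /** Returns {threads created, errors handled}. */
    static int[] runDemo() {
        AtomicInteger threadsCreated = new AtomicInteger();
        ConcurrentLinkedQueue<Throwable> errors = new ConcurrentLinkedQueue<>();
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            threadsCreated.incrementAndGet();
            return new Thread(r, "coordinator-demo");
        });

        // Fails like the split assignment to the unregistered subtask.
        executor.execute(guarded(() -> {
            throw new IllegalArgumentException("subtask 1 is not registered");
        }, errors::add));
        // Runs afterwards on the same, still-alive worker thread.
        executor.execute(guarded(() -> { }, errors::add));

        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {threadsCreated.get(), errors.size()};
    }

    public static void main(String[] args) {
        int[] result = runDemo();
        // prints "threads created: 1, errors handled: 1"
        System.out.println("threads created: " + result[0] + ", errors handled: " + result[1]);
    }
}
```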

CC: [~sewen] Can you also help check if this is the case?

> Uncaught exception in ExecutorNotifier due to split assignment broken by 
> failed task
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-20261
>                 URL: https://issues.apache.org/jira/browse/FLINK-20261
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common, Connectors / FileSystem
>    Affects Versions: 1.12.0
>            Reporter: Andrey Zagrebin
>            Priority: Blocker
>             Fix For: 1.12.0
>
>
> While trying to extend 
> {{FileSourceTextLinesITCase::testContinuousTextFileSourceWithTaskManagerFailover}}
>  with a recovery test after a TM failure 
> ({{TestingMiniCluster::terminateTaskExecutor}}, 
> [branch|https://github.com/azagrebin/flink/tree/FLINK-20118-it]) in 
> FLINK-20118, I encountered the following case:
> * {{SourceCoordinatorContext::assignSplits}} schedules an asynchronous 
> assignment (all reader tasks are alive)
> * the test calls {{TestingMiniCluster::terminateTaskExecutor}} while calling 
> {{writeFile}} in the loop of {{testContinuousTextFileSource}}
> * this causes a graceful {{TaskExecutor::onStop}} shutdown
> * which causes a TM/RM disconnect and the RM failing the slot allocations in 
> the JM
> * which eventually triggers {{SourceCoordinatorContext::unregisterSourceReader}}
> * the actual assignment starts ({{SourceCoordinatorContext::assignSplits: 
> callInCoordinatorThread}})
> * the {{registeredReaders.containsKey(subtaskId)}} check fails (because the 
> task has failed) with an {{IllegalArgumentException}}, which is uncaught in the 
> single-thread executor
> * this forces the thread pool to recreate its single thread
> * which calls {{CoordinatorExecutorThreadFactory::newThread}}
> * the factory's expectation that only a single thread is ever created is 
> violated, raising an {{IllegalStateException}}, which is also uncaught
> * {{FatalExitExceptionHandler}} is called and exits the JVM abruptly
> {code:java}
> [SourceCoordinator-Source: file-source] ERROR 
> org.apache.flink.runtime.util.FatalExitExceptionHandler - FATAL: Thread 
> 'SourceCoordinator-Source: file-source' produced an uncaught exception. 
> Stopping the process...
> java.lang.IllegalStateException: Should never happen. This factory should 
> only be used by a SingleThreadExecutor.
>       at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider$CoordinatorExecutorThreadFactory.newThread(SourceCoordinatorProvider.java:94)
>  ~[classes/:?]
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.<init>(ThreadPoolExecutor.java:619)
>  ~[?:1.8.0_172]
>       at 
> java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:932)
>  ~[?:1.8.0_172]
>       at 
> java.util.concurrent.ThreadPoolExecutor.processWorkerExit(ThreadPoolExecutor.java:1025)
>  ~[?:1.8.0_172]
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
>  ~[?:1.8.0_172]
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  ~[?:1.8.0_172]
>       at java.lang.Thread.run(Thread.java:748) [?:1.8.0_172]
> Process finished with exit code 239
> {code}
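
The failure chain described above can be reproduced with plain {{java.util.concurrent}}, without Flink. This sketch assumes a thread factory that, like {{CoordinatorExecutorThreadFactory}} in the trace, refuses to create a second thread; its uncaught-exception handler records the error instead of exiting the JVM. The class and method names are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class SingleThreadFactoryFailure {
    /** Returns the exception that escaped the pool's worker thread. */
    static Throwable runDemo() {
        AtomicInteger created = new AtomicInteger();
        AtomicReference<Throwable> uncaught = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        // Like the factory in the trace: a second newThread() call means the
        // pool is replacing a dead worker, which this factory refuses to do.
        ThreadFactory oneThreadOnly = r -> {
            if (created.incrementAndGet() > 1) {
                throw new IllegalStateException(
                        "Should never happen. This factory should only be used by a SingleThreadExecutor.");
            }
            Thread t = new Thread(r, "coordinator-demo");
            // Record the error instead of exiting the JVM like FatalExitExceptionHandler.
            t.setUncaughtExceptionHandler((thread, e) -> {
                uncaught.set(e);
                done.countDown();
            });
            return t;
        };

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), oneThreadOnly);
        // execute(), not submit(), so the task's exception kills the worker.
        executor.execute(() -> {
            throw new IllegalArgumentException("subtask 1 is not registered");
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        executor.shutdownNow();
        return uncaught.get();
    }

    public static void main(String[] args) {
        // prints "IllegalStateException"
        System.out.println(runDemo().getClass().getSimpleName());
    }
}
```

The exception that escapes is the factory's {{IllegalStateException}}, not the task's {{IllegalArgumentException}}: the factory throws while {{processWorkerExit()}} tries to replace the dead worker, and that exception supersedes the original one as it propagates out of the worker thread. With {{submit()}} instead of {{execute()}}, the task's exception would be captured in the returned {{Future}} and the worker would survive.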



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
