Hi all, I noticed that when the JVM for an executor fails in Standalone mode, we have two duplicate code paths that handle the failure, one via Akka and the second via the Worker/ExecutorRunner:
via Akka:
(1) CoarseGrainedSchedulerBackend is notified that the remote Akka endpoint is disconnected (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L189) and calls CoarseGrainedSchedulerBackend.removeExecutor
(2) removeExecutor() tells the task scheduler to reschedule all of the tasks that were running on that executor

via the Worker/ExecutorRunner:
(1) The ExecutorRunner notices that the Executor process has failed and notifies the Spark Master
(2) The Master notifies the AppClient (for the particular application), which then notifies the SparkDeploySchedulerBackend (which subclasses CoarseGrainedSchedulerBackend)
(3) SparkDeploySchedulerBackend calls CoarseGrainedSchedulerBackend.removeExecutor, which eventually tells the task scheduler that the executor was lost and that all tasks running on it should be re-scheduled (as above)

(A simplified sketch of how these two paths converge on removeExecutor is at the end of this message.)

For YARN, my understanding is that there is a third code path where the YarnAllocator's processCompletedContainers() gets information about the process's exit from the master, translates it into an "ExecutorExited" message, and passes it to the scheduler, similar to the Worker/ExecutorRunner case (YARN folks, is this correct?).

It's confusing and error-prone to have these multiple different ways of handling failures (I ran into this because I was fixing a bug where one of the code paths can lead to a hang, but the other one doesn't). Can we eliminate all but one of these code paths? Is there a reason for the duplicate error handling? Do all of the cluster managers (Standalone, YARN, Mesos) communicate in some way when an Executor has failed, so that we can ignore the Akka code path? The Akka code path is the most tempting one to eliminate, because it has less information about the failure (the other code path typically has at least an exit code for the process).

I'm also curious whether others have seen this issue. For example, Marcelo, I'm wondering if this came up in your attempts to treat YARN pre-emption differently: did you run into issues where, when YARN pre-empts an executor, Spark gets the "Rpc disassociated" failure from Akka before the more useful error from YARN saying that the executor was pre-empted?

-Kay

----------------------------------
To reproduce this issue, you can run one of these jobs:

sc.parallelize(1 to 10, 2).foreach { x => if (x == 1) throw new OutOfMemoryError("test OOM") }

or

sc.parallelize(1 to 10, 2).foreach { x => if (x == 1) System.exit(42) }
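
To make the duplication concrete, here is a rough, self-contained sketch of the shape of the problem. This is not the actual Spark code: the stub classes (SchedulerBackendStub, TaskSchedulerStub) and their methods are hypothetical stand-ins, meant only to show how two independent notification paths converge on the same removeExecutor bookkeeping, and why whichever notification arrives first determines the failure reason the scheduler sees.

// Simplified illustration only; class and method names here are hypothetical
// stand-ins for the real Spark components described above.
object DuplicateFailurePathsSketch {

  // Stand-in for the task scheduler that must re-schedule tasks from a lost executor.
  class TaskSchedulerStub {
    def executorLost(executorId: String, reason: String): Unit =
      println(s"Re-scheduling tasks from executor $executorId (reason: $reason)")
  }

  // Stand-in for CoarseGrainedSchedulerBackend: both paths end up calling removeExecutor.
  class SchedulerBackendStub(scheduler: TaskSchedulerStub) {
    private var liveExecutors = Set.empty[String]

    def registerExecutor(executorId: String): Unit = liveExecutors += executorId

    def removeExecutor(executorId: String, reason: String): Unit = {
      if (liveExecutors.contains(executorId)) {
        liveExecutors -= executorId
        scheduler.executorLost(executorId, reason)
      } else {
        // Whichever path fires second sees the executor already gone.
        println(s"Executor $executorId already removed; ignoring duplicate notification ($reason)")
      }
    }

    // Path 1: the Akka layer only knows the remote endpoint dropped, so the
    // reason carries no exit code.
    def onDisconnected(executorId: String): Unit =
      removeExecutor(executorId, "remote Akka endpoint disassociated")

    // Path 2: the Worker/ExecutorRunner (via Master -> AppClient ->
    // SparkDeploySchedulerBackend) knows the process's exit code, so its
    // reason is more informative.
    def executorExited(executorId: String, exitCode: Int): Unit =
      removeExecutor(executorId, s"executor process exited with code $exitCode")
  }

  def main(args: Array[String]): Unit = {
    val backend = new SchedulerBackendStub(new TaskSchedulerStub)
    backend.registerExecutor("exec-1")

    // When the executor JVM dies, both notifications race; only the first one
    // has any effect, so the quality of the error the scheduler sees depends
    // on which path wins.
    backend.onDisconnected("exec-1")
    backend.executorExited("exec-1", 42)
  }
}

The point of the sketch is just that the second notification to arrive is effectively ignored, which is why the Akka path (with no exit code) racing ahead of the Worker/ExecutorRunner or YARN path can hide the more useful failure information.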