Repository: spark Updated Branches: refs/heads/branch-1.6 d7e3bfd7d -> fbf16da2e
[SPARK-12281][CORE] Fix a race condition when reporting ExecutorState in the shutdown hook 1. Make sure workers and masters exit so that no worker or master will still be running when triggering the shutdown hook. 2. Set ExecutorState to FAILED if it's still RUNNING when executing the shutdown hook. This should fix the potential exceptions when exiting a local cluster ``` java.lang.AssertionError: assertion failed: executor 4 state transfer from RUNNING to RUNNING is illegal at scala.Predef$.assert(Predef.scala:179) at org.apache.spark.deploy.master.Master$$anonfun$receive$1.applyOrElse(Master.scala:260) at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:116) at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204) at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100) at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) java.lang.IllegalStateException: Shutdown hooks cannot be modified during shutdown. at org.apache.spark.util.SparkShutdownHookManager.add(ShutdownHookManager.scala:246) at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:191) at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:180) at org.apache.spark.deploy.worker.ExecutorRunner.start(ExecutorRunner.scala:73) at org.apache.spark.deploy.worker.Worker$$anonfun$receive$1.applyOrElse(Worker.scala:474) at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:116) at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204) at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100) at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) ``` Author: Shixiong Zhu <shixi...@databricks.com> Closes #10269 from zsxwing/executor-state. (cherry picked from commit 2aecda284e22ec608992b6221e2f5ffbd51fcd24) Signed-off-by: Shixiong Zhu <shixi...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fbf16da2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fbf16da2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fbf16da2 Branch: refs/heads/branch-1.6 Commit: fbf16da2e53acc8678bd1454b0749d1923d4eddf Parents: d7e3bfd Author: Shixiong Zhu <shixi...@databricks.com> Authored: Sun Dec 13 22:06:39 2015 -0800 Committer: Shixiong Zhu <shixi...@databricks.com> Committed: Sun Dec 13 22:06:56 2015 -0800 ---------------------------------------------------------------------- .../main/scala/org/apache/spark/deploy/LocalSparkCluster.scala | 2 ++ core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 5 ++--- .../scala/org/apache/spark/deploy/worker/ExecutorRunner.scala | 5 +++++ 3 files changed, 9 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/fbf16da2/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala index 83ccaad..5bb62d3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala +++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala @@ -75,6 +75,8 @@ class LocalSparkCluster( // Stop the workers before the master so they don't get upset that it disconnected workerRpcEnvs.foreach(_.shutdown()) masterRpcEnvs.foreach(_.shutdown()) + workerRpcEnvs.foreach(_.awaitTermination()) + masterRpcEnvs.foreach(_.awaitTermination()) masterRpcEnvs.clear() workerRpcEnvs.clear() } http://git-wip-us.apache.org/repos/asf/spark/blob/fbf16da2/core/src/main/scala/org/apache/spark/deploy/master/Master.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 04b20e0..1355e1a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -257,9 +257,8 @@ private[deploy] class Master( exec.state = state if (state == ExecutorState.RUNNING) { - if (oldState != ExecutorState.LAUNCHING) { - logWarning(s"Executor $execId state transfer from $oldState to RUNNING is unexpected") - } + assert(oldState == ExecutorState.LAUNCHING, + s"executor $execId state transfer from $oldState to RUNNING is illegal") appInfo.resetRetryCount() } http://git-wip-us.apache.org/repos/asf/spark/blob/fbf16da2/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 25a1747..9a42487 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -71,6 +71,11 @@ private[deploy] class ExecutorRunner( workerThread.start() // Shutdown hook that kills actors on shutdown. shutdownHook = ShutdownHookManager.addShutdownHook { () => + // It's possible that we arrive here before calling `fetchAndRunExecutor`, then `state` will + // be `ExecutorState.RUNNING`. In this case, we should set `state` to `FAILED`. + if (state == ExecutorState.RUNNING) { + state = ExecutorState.FAILED + } killProcess(Some("Worker shutting down")) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org