[ https://issues.apache.org/jira/browse/SPARK-14537?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean Owen resolved SPARK-14537.
-------------------------------
Resolution: Fixed
Fix Version/s: 2.0.0
Issue resolved by pull request 12301
[https://github.com/apache/spark/pull/12301]
> [CORE] SparkContext init hangs if master removes application before backend is ready
> -------------------------------------------------------------------------------------
>
> Key: SPARK-14537
> URL: https://issues.apache.org/jira/browse/SPARK-14537
> Project: Spark
> Issue Type: Bug
> Components: Scheduler
> Affects Versions: 1.5.2
> Reporter: Charles Allen
> Fix For: 2.0.0
>
>
> During initialization of the SparkContext, the following code is executed:
> {code}
>     setupAndStartListenerBus()
>     postEnvironmentUpdate()
>     postApplicationStart()
>     // Post init
>     _taskScheduler.postStartHook()
>     _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
>     _executorAllocationManager.foreach { e =>
>       _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
>     }
> {code}
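> For context, in the standalone-mode TaskSchedulerImpl the post-start hook
> does nothing but block on the backend (a sketch based on the 1.5.x sources;
> exact details may differ):
> {code:title=TaskSchedulerImpl.scala}
>   // Called from SparkContext init; blocks until the backend reports ready.
>   override def postStartHook() {
>     waitBackendReady()
>   }
> {code}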
> If _taskScheduler.postStartHook() is waiting for the backend to signal that
> it is ready, AND the driver is disconnected from the master due to an error
> like the one below:
> {code}
> ERROR [sparkDriver-akka.actor.default-dispatcher-20] org.apache.spark.rpc.akka.AkkaRpcEnv - Ignore error: Exiting due to error from cluster scheduler: Master removed our application: FAILED
> org.apache.spark.SparkException: Exiting due to error from cluster scheduler: Master removed our application: FAILED
> 	at org.apache.spark.scheduler.TaskSchedulerImpl.error(TaskSchedulerImpl.scala:431) ~[spark-core_2.10-1.5.2-mmx1.jar:1.5.2-mmx1]
> 	at org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend.dead(SparkDeploySchedulerBackend.scala:122) ~[spark-core_2.10-1.5.2-mmx1.jar:1.5.2-mmx1]
> 	at org.apache.spark.deploy.client.AppClient$ClientEndpoint.markDead(AppClient.scala:243) ~[spark-core_2.10-1.5.2-mmx1.jar:1.5.2-mmx1]
> 	at org.apache.spark.deploy.client.AppClient$ClientEndpoint$$anonfun$receive$1.applyOrElse(AppClient.scala:167) ~[spark-core_2.10-1.5.2-mmx1.jar:1.5.2-mmx1]
> 	at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:177) [spark-core_2.10-1.5.2-mmx1.jar:1.5.2-mmx1]
> 	at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:126) ~[spark-core_2.10-1.5.2-mmx1.jar:1.5.2-mmx1]
> 	at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:197) [spark-core_2.10-1.5.2-mmx1.jar:1.5.2-mmx1]
> 	at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:125) [spark-core_2.10-1.5.2-mmx1.jar:1.5.2-mmx1]
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) [scala-library-2.10.5.jar:?]
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) [scala-library-2.10.5.jar:?]
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) [scala-library-2.10.5.jar:?]
> 	at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59) [spark-core_2.10-1.5.2-mmx1.jar:1.5.2-mmx1]
> 	at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42) [spark-core_2.10-1.5.2-mmx1.jar:1.5.2-mmx1]
> 	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) [scala-library-2.10.5.jar:?]
> 	at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42) [spark-core_2.10-1.5.2-mmx1.jar:1.5.2-mmx1]
> 	at akka.actor.Actor$class.aroundReceive(Actor.scala:467) [akka-actor_2.10-2.3.11.jar:?]
> 	at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:92) [spark-core_2.10-1.5.2-mmx1.jar:1.5.2-mmx1]
> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) [akka-actor_2.10-2.3.11.jar:?]
> 	at akka.actor.ActorCell.invoke(ActorCell.scala:487) [akka-actor_2.10-2.3.11.jar:?]
> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) [akka-actor_2.10-2.3.11.jar:?]
> 	at akka.dispatch.Mailbox.run(Mailbox.scala:220) [akka-actor_2.10-2.3.11.jar:?]
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) [akka-actor_2.10-2.3.11.jar:?]
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.10.5.jar:?]
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.10.5.jar:?]
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.10.5.jar:?]
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.10.5.jar:?]
> {code}
> Then the SparkContext hangs during init, because the wait for the backend to
> become ready never checks whether the context is still running:
> {code:title=TaskSchedulerImpl.scala}
>   private def waitBackendReady(): Unit = {
>     if (backend.isReady) {
>       return
>     }
>     while (!backend.isReady) {
>       synchronized {
>         this.wait(100)
>       }
>     }
>   }
> {code}
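> One way to break the hang (a sketch of the general approach, not necessarily
> the exact patch; it assumes TaskSchedulerImpl can reach the owning
> SparkContext as sc and read its stop flag) is to have the loop bail out once
> the context has been stopped:
> {code:title=TaskSchedulerImpl.scala}
>   private def waitBackendReady(): Unit = {
>     if (backend.isReady) {
>       return
>     }
>     while (!backend.isReady) {
>       // Assumption: sc.stopped is the SparkContext's AtomicBoolean stop flag.
>       // When the master removes the application, the error path stops the
>       // context, so we fail fast here instead of waiting forever.
>       if (sc.stopped.get()) {
>         throw new IllegalStateException("Spark context stopped while waiting for backend")
>       }
>       synchronized {
>         this.wait(100)
>       }
>     }
>   }
> {code}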
> A proposed patch is at
> https://github.com/metamx/spark/pull/17/commits/bbb11ba088d30c30d083972ccc556fef0131c3bf
> I'll get a PR going shortly.