[ https://issues.apache.org/jira/browse/SPARK-7563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14544300#comment-14544300 ]
Josh Rosen commented on SPARK-7563:
-----------------------------------
Yep, looks like a pretty clear bug. I think this messiness was introduced because
we decided to use a confusing pattern where the OutputCommitCoordinator component
is used both on the master and on the executors. My original version of this patch
separated the driver and executor components more explicitly, but if I recall
correctly this was changed in response to some comments during offline code review.
I probably should have fought harder to keep the separation, since classes with
sets of methods that are unsafe to call from certain locations have been a source
of multiple bugs like this in the past.
Want me to put together a pull request to apply this patch?
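For reference, the separation I have in mind is roughly the following sketch (the
names are illustrative only, not the actual Spark classes or the actual patch): the
driver-side coordinator owns all state and is the only place a stop() exists, while
the executor-side handle can only ask the driver for commit permission.
{noformat}
// Illustrative sketch only; class and method names here are hypothetical.
class DriverOutputCommitCoordinator {
  // (stageId, partitionId) -> attempt number that was authorized to commit
  private val authorizedCommitters = collection.mutable.Map.empty[(Int, Int), Long]

  def canCommit(stage: Int, partition: Int, attempt: Long): Boolean = synchronized {
    authorizedCommitters.get((stage, partition)) match {
      case Some(existing) => existing == attempt   // only the first authorized attempt may commit
      case None => authorizedCommitters((stage, partition)) = attempt; true
    }
  }

  def stop(): Unit = synchronized { authorizedCommitters.clear() }  // driver-only teardown
}

class ExecutorOutputCommitCoordinatorClient(askDriver: (Int, Int, Long) => Boolean) {
  // Deliberately has no stop(): an exiting executor cannot tear down driver state.
  def canCommit(stage: Int, partition: Int, attempt: Long): Boolean =
    askDriver(stage, partition, attempt)
}
{noformat}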
> OutputCommitCoordinator.stop() should only be executed in driver
> ----------------------------------------------------------------
>
> Key: SPARK-7563
> URL: https://issues.apache.org/jira/browse/SPARK-7563
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.3.1
> Environment: Red Hat Enterprise Linux Server release 7.0 (Maipo)
> Spark 1.3.1 Release
> Reporter: Hailong Wen
>
> I am from the IBM Platform Symphony team and we are integrating Spark 1.3.1 with
> EGO (a resource management product).
> In EGO we use a fine-grained dynamic allocation policy, where each Executor exits
> after all of its tasks are done. When testing *spark-shell*, we find that when the
> executor of the first job exits, it stops the OutputCommitCoordinator, which causes
> all subsequent jobs to fail. Details are as follows:
> We got the following error in the executor when submitting a job in *spark-shell*
> for the second time (the first job submission is successful):
> {noformat}
> 15/05/11 04:02:31 INFO spark.util.AkkaUtils: Connecting to OutputCommitCoordinator: akka.tcp://sparkDriver@whlspark01:50452/user/OutputCommitCoordinator
> Exception in thread "main" akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://sparkDriver@whlspark01:50452/), Path(/user/OutputCommitCoordinator)]
>         at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
>         at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
>         at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>         at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
>         at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
>         at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
>         at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
>         at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>         at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
>         at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
>         at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
>         at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
>         at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>         at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
>         at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
>         at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:89)
>         at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:937)
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>         at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {noformat}
> And on the driver side, we see a log message showing that the
> OutputCommitCoordinator was stopped after the first submission:
> {noformat}
> 15/05/11 04:01:23 INFO spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorActor: OutputCommitCoordinator stopped!
> {noformat}
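> The failure can be reproduced with any two output jobs in *spark-shell* once an
> executor has exited in between (a minimal sketch; the output paths are only
> placeholders):
> {noformat}
> // First job: succeeds; its executor then exits because all of its tasks are done.
> sc.parallelize(1 to 100).saveAsTextFile("/tmp/commit-test-1")
> // The exiting executor tears down its SparkEnv, which (as shown below) stops the
> // driver's OutputCommitCoordinatorActor.
> // Second job: its new executor cannot resolve that actor and fails with ActorNotFound.
> sc.parallelize(1 to 100).saveAsTextFile("/tmp/commit-test-2")
> {noformat}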
> We examined the code of OutputCommitCoordinator and found that an executor reuses
> the actor ref of the driver's OutputCommitCoordinatorActor. So when an executor
> exits, it will eventually call SparkEnv.stop():
> {noformat}
>   private[spark] def stop() {
>     isStopped = true
>     pythonWorkers.foreach { case (key, worker) => worker.stop() }
>     Option(httpFileServer).foreach(_.stop())
>     mapOutputTracker.stop()
>     shuffleManager.stop()
>     broadcastManager.stop()
>     blockManager.stop()
>     blockManager.master.stop()
>     metricsSystem.stop()
>     outputCommitCoordinator.stop()     <---------------
>     actorSystem.shutdown()
>     ......
> {noformat}
> and in OutputCommitCoordinator.stop():
> {noformat}
>   def stop(): Unit = synchronized {
>     coordinatorActor.foreach(_ ! StopCoordinator)
>     coordinatorActor = None
>     authorizedCommittersByStage.clear()
>   }
> {noformat}
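> The reason the executor-side stop() reaches the driver is the way the coordinator
> is wired up in SparkEnv.create: on the driver the actor is created locally, while
> on an executor coordinatorActor is a remote reference to the driver's actor.
> Roughly (paraphrased, not the exact 1.3.1 source):
> {noformat}
> // Paraphrase of the wiring in SparkEnv.create (Spark 1.3.x); details may differ slightly.
> val outputCommitCoordinator = new OutputCommitCoordinator(conf)
> val outputCommitCoordinatorActor = registerOrLookup("OutputCommitCoordinator",
>   new OutputCommitCoordinatorActor(outputCommitCoordinator))
> outputCommitCoordinator.coordinatorActor = Some(outputCommitCoordinatorActor)
> // registerOrLookup creates the actor on the driver but only looks it up remotely on
> // an executor, so an executor's stop() sends StopCoordinator straight to the driver.
> {noformat}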
> We currently work around this problem by adding an "isDriver" attribute to
> OutputCommitCoordinator and checking whether the "stop" call comes from the
> driver or from an executor:
> {noformat}
> diff SparkEnv.scala
> 360c360
> < new OutputCommitCoordinator(conf, isDriver)
> ---
> > new OutputCommitCoordinator(conf)
> diff OutputCommitCoordinator.scala
> 43c43
> < private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean = false) extends Logging {
> ---
> > private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
> 137,141c137,139
> < if (isDriver) {
> < coordinatorActor.foreach(_ ! StopCoordinator)
> < coordinatorActor = None
> < authorizedCommittersByStage.clear()
> < }
> ---
> > coordinatorActor.foreach(_ ! StopCoordinator)
> > coordinatorActor = None
> > authorizedCommittersByStage.clear()
> {noformat}
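> Put back together, the patched class is roughly the following (a sketch assembled
> from the diff above; only the constructor and stop() are shown):
> {noformat}
> private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean = false)
>   extends Logging {
>
>   def stop(): Unit = synchronized {
>     if (isDriver) {
>       // Only the driver tears the coordinator down; an executor's stop() is a no-op.
>       coordinatorActor.foreach(_ ! StopCoordinator)
>       coordinatorActor = None
>       authorizedCommittersByStage.clear()
>     }
>   }
> }
> {noformat}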
> We propose applying this fix in a future release, since the problem affects all
> *spark-shell* usage under the dynamic allocation model.