Repository: spark
Updated Branches:
  refs/heads/master 72396522b -> 6c726a3fb
[SPARK-5069][Core] Fix the race condition of TaskSchedulerImpl.dagScheduler

It's not necessary to set `TaskSchedulerImpl.dagScheduler` in preStart. It's safe to set it after `initializeEventProcessActor()`.

Author: zsxwing <[email protected]>

Closes #3887 from zsxwing/SPARK-5069 and squashes the following commits:

d95894f [zsxwing] Fix the race condition of TaskSchedulerImpl.dagScheduler

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6c726a3f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6c726a3f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6c726a3f

Branch: refs/heads/master
Commit: 6c726a3fbd9cd3aa5f3a1992b2132b25eabb76a0
Parents: 7239652
Author: zsxwing <[email protected]>
Authored: Sun Jan 4 21:06:04 2015 -0800
Committer: Reynold Xin <[email protected]>
Committed: Sun Jan 4 21:06:04 2015 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 7 +------
 .../org/apache/spark/scheduler/TaskSchedulerImplSuite.scala | 1 -
 2 files changed, 1 insertion(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/6c726a3f/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index cb8ccfb..259621d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -138,6 +138,7 @@ class DAGScheduler(
   }
 
   initializeEventProcessActor()
+  taskScheduler.setDAGScheduler(this)
 
   // Called by TaskScheduler to report task's starting.
   def taskStarted(task: Task[_], taskInfo: TaskInfo) {
@@ -1375,12 +1376,6 @@ private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
 private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler)
   extends Actor with Logging {
 
-  override def preStart() {
-    // set DAGScheduler for taskScheduler to ensure eventProcessActor is always
-    // valid when the messages arrive
-    dagScheduler.taskScheduler.setDAGScheduler(dagScheduler)
-  }
-
   /**
    * The main event loop of the DAG scheduler.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/6c726a3f/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 40aaf9d..00812e6 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -305,7 +305,6 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin
       override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
       override def executorAdded(execId: String, host: String) {}
     }
-    taskScheduler.setDAGScheduler(dagScheduler)
     // Give zero core offers. Should not generate any tasks
     val zeroCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", 0),
       new WorkerOffer("executor1", "host1", 0))

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
