Repository: spark
Updated Branches:
  refs/heads/master 303f00a4b -> fd6c3a0b1


[SPARK-19263] Fix race in SchedulerIntegrationSuite.

## What changes were proposed in this pull request?

The whole process of offering resources and generating `TaskDescription`s should be guarded by `taskScheduler.synchronized` in `reviveOffers`; otherwise there is a race condition.

## How was this patch tested?

Existing unit tests.

Author: jinxing <[email protected]>

Closes #16831 from jinxing64/SPARK-19263-FixRaceInTest.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fd6c3a0b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fd6c3a0b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fd6c3a0b

Branch: refs/heads/master
Commit: fd6c3a0b10ce43a56df845ba66d160b77f02e576
Parents: 303f00a
Author: jinxing <[email protected]>
Authored: Thu Feb 9 16:05:44 2017 -0800
Committer: Kay Ousterhout <[email protected]>
Committed: Thu Feb 9 16:05:44 2017 -0800

----------------------------------------------------------------------
 .../spark/scheduler/SchedulerIntegrationSuite.scala | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fd6c3a0b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index 2ba63da..398ac3d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -391,17 +391,17 @@ private[spark] abstract class MockBackend(
    * scheduling.
    */
   override def reviveOffers(): Unit = {
-    val newTaskDescriptions = taskScheduler.resourceOffers(generateOffers()).flatten
-    // get the task now, since that requires a lock on TaskSchedulerImpl, to prevent individual
-    // tests from introducing a race if they need it
-    val newTasks = taskScheduler.synchronized {
-      newTaskDescriptions.map { taskDescription =>
+    // Need a lock on the entire scheduler to protect freeCores -- otherwise, multiple threads
+    // may make offers at the same time, though they are using the same set of freeCores.
+    taskScheduler.synchronized {
+      val newTaskDescriptions = taskScheduler.resourceOffers(generateOffers()).flatten
+      // get the task now, since that requires a lock on TaskSchedulerImpl, to prevent individual
+      // tests from introducing a race if they need it.
+      val newTasks = newTaskDescriptions.map { taskDescription =>
         val taskSet = taskScheduler.taskIdToTaskSetManager(taskDescription.taskId).taskSet
         val task = taskSet.tasks(taskDescription.index)
         (taskDescription, task)
       }
-    }
-    synchronized {
       newTasks.foreach { case (taskDescription, _) =>
         executorIdToExecutor(taskDescription.executorId).freeCores -= taskScheduler.CPUS_PER_TASK
       }
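
Editor's note: the race described in the commit message can be illustrated with a small standalone sketch. This is not Spark code. `ToyScheduler`, `ToyBackend`, and `RaceSketch` are hypothetical names invented for illustration, and the scheduling logic is deliberately simplified to one counter. The sketch only shows the shape of the fix: reading `freeCores`, making the offer, and decrementing `freeCores` all happen under a single lock on the scheduler, so two threads should never offer the same cores.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the scheduler: launches one task per offered core,
// limited by the number of tasks still pending.
class ToyScheduler(private var pendingTasks: Int) {
  def resourceOffers(freeCores: Int): Int = synchronized {
    val launched = math.min(freeCores, pendingTasks)
    pendingTasks -= launched
    launched
  }
}

// Hypothetical stand-in for the mock backend: owns the freeCores counter.
class ToyBackend(scheduler: ToyScheduler, totalCores: Int) {
  private var freeCores: Int = totalCores
  val launchedTotal = new AtomicInteger(0)

  // One lock on the scheduler covers the read of freeCores, the offer, and the
  // decrement, mirroring the structure of the patched reviveOffers.
  def reviveOffers(): Unit = scheduler.synchronized {
    val launched = scheduler.resourceOffers(freeCores)
    freeCores -= launched
    launchedTotal.addAndGet(launched)
  }

  def currentFreeCores: Int = scheduler.synchronized { freeCores }
}

object RaceSketch {
  // Runs 8 threads that each call reviveOffers 1000 times, then returns
  // (remaining free cores, total tasks launched).
  def simulate(): (Int, Int) = {
    val backend = new ToyBackend(new ToyScheduler(100), 4)
    val pool = Executors.newFixedThreadPool(8)
    (1 to 8).foreach { _ =>
      pool.execute(new Runnable {
        override def run(): Unit = (1 to 1000).foreach(_ => backend.reviveOffers())
      })
    }
    pool.shutdown()
    pool.awaitTermination(30, TimeUnit.SECONDS)
    (backend.currentFreeCores, backend.launchedTotal.get())
  }

  def main(args: Array[String]): Unit = {
    val (free, launched) = simulate()
    // With the single lock, exactly 4 tasks are launched on the 4 cores.
    println(s"freeCores=$free launched=$launched")
  }
}
```

If the offer and the decrement were instead taken under separate locks, as in the pre-patch code, a second thread could read the stale `freeCores` value between the two steps and launch more tasks than there are cores.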
