Repository: spark Updated Branches: refs/heads/master affc8a887 -> 73431d8af
[SPARK-10124] [MESOS] Fix removing queued driver in mesos cluster mode. Currently the spark applications can be queued to the Mesos cluster dispatcher, but when multiple jobs are in queue we don't handle removing jobs from the buffer correctly while iterating and causes null pointer exception. This patch copies the buffer before iterating them, so exceptions aren't thrown when the jobs are removed. Author: Timothy Chen <[email protected]> Closes #8322 from tnachen/fix_cluster_mode. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/73431d8a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/73431d8a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/73431d8a Branch: refs/heads/master Commit: 73431d8afb41b93888d2642a1ce2d011f03fb740 Parents: affc8a8 Author: Timothy Chen <[email protected]> Authored: Wed Aug 19 19:43:26 2015 -0700 Committer: Andrew Or <[email protected]> Committed: Wed Aug 19 19:43:26 2015 -0700 ---------------------------------------------------------------------- .../cluster/mesos/MesosClusterScheduler.scala | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/73431d8a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala index 64ec2b8..1206f18 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala @@ -507,14 +507,16 @@ private[spark] class MesosClusterScheduler( val driversToRetry = pendingRetryDrivers.filter { d => d.retryState.get.nextRetry.before(currentTime) } + scheduleTasks( - driversToRetry, + copyBuffer(driversToRetry), removeFromPendingRetryDrivers, currentOffers, tasks) + // Then we walk through the queued drivers and try to schedule them. scheduleTasks( - queuedDrivers, + copyBuffer(queuedDrivers), removeFromQueuedDrivers, currentOffers, tasks) @@ -527,13 +529,14 @@ private[spark] class MesosClusterScheduler( .foreach(o => driver.declineOffer(o.getId)) } + private def copyBuffer( + buffer: ArrayBuffer[MesosDriverDescription]): ArrayBuffer[MesosDriverDescription] = { + val newBuffer = new ArrayBuffer[MesosDriverDescription](buffer.size) + buffer.copyToBuffer(newBuffer) + newBuffer + } + def getSchedulerState(): MesosClusterSchedulerState = { - def copyBuffer( - buffer: ArrayBuffer[MesosDriverDescription]): ArrayBuffer[MesosDriverDescription] = { - val newBuffer = new ArrayBuffer[MesosDriverDescription](buffer.size) - buffer.copyToBuffer(newBuffer) - newBuffer - } stateLock.synchronized { new MesosClusterSchedulerState( frameworkId, --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
