Repository: spark Updated Branches: refs/heads/master 8b497046c -> 4e9e6aee4
[SPARK-22864][CORE] Disable allocation schedule in ExecutorAllocationManagerSuite. The scheduled task was racing with the test code and could influence the values returned to the test, triggering assertion failures. The change adds a new config that is only used during testing, and overrides it in the affected test suite. The issue in the bug can be reliably reproduced by reducing the interval in the test (e.g. to 10ms). While there, this change also fixes an exception that shows up in the logs while these tests run, and simplifies some code that was also causing misleading messages in the tests' log output. Author: Marcelo Vanzin <van...@cloudera.com> Closes #20050 from vanzin/SPARK-22864. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4e9e6aee Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4e9e6aee Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4e9e6aee Branch: refs/heads/master Commit: 4e9e6aee44bb2ddb41b567d659358b22fd824222 Parents: 8b49704 Author: Marcelo Vanzin <van...@cloudera.com> Authored: Fri Dec 29 10:51:37 2017 -0600 Committer: Imran Rashid <iras...@cloudera.com> Committed: Fri Dec 29 10:51:37 2017 -0600 ---------------------------------------------------------------------- .../spark/ExecutorAllocationManager.scala | 7 +++++- .../scala/org/apache/spark/SparkContext.scala | 20 ++++++++-------- .../spark/scheduler/AsyncEventQueue.scala | 2 +- .../spark/ExecutorAllocationManagerSuite.scala | 24 +++++++++----------- 4 files changed, 28 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/4e9e6aee/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala 
index 5bc2d9e..2e00dc8 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -141,7 +141,11 @@ private[spark] class ExecutorAllocationManager( private val removeTimes = new mutable.HashMap[String, Long] // Polling loop interval (ms) - private val intervalMillis: Long = 100 + private val intervalMillis: Long = if (Utils.isTesting) { + conf.getLong(TESTING_SCHEDULE_INTERVAL_KEY, 100) + } else { + 100 + } // Clock used to schedule when executors should be added and removed private var clock: Clock = new SystemClock() @@ -856,4 +860,5 @@ private[spark] class ExecutorAllocationManager( private object ExecutorAllocationManager { val NOT_SET = Long.MaxValue + val TESTING_SCHEDULE_INTERVAL_KEY = "spark.testing.dynamicAllocation.scheduleInterval" } http://git-wip-us.apache.org/repos/asf/spark/blob/4e9e6aee/core/src/main/scala/org/apache/spark/SparkContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index fcbeddd..1dbb378 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1575,10 +1575,10 @@ class SparkContext(config: SparkConf) extends Logging { private[spark] def getExecutorIds(): Seq[String] = { schedulerBackend match { - case b: CoarseGrainedSchedulerBackend => + case b: ExecutorAllocationClient => b.getExecutorIds() case _ => - logWarning("Requesting executors is only supported in coarse-grained mode") + logWarning("Requesting executors is not supported by current scheduler.") Nil } } @@ -1604,10 +1604,10 @@ class SparkContext(config: SparkConf) extends Logging { hostToLocalTaskCount: scala.collection.immutable.Map[String, Int] ): Boolean = { schedulerBackend match { - case b: CoarseGrainedSchedulerBackend => + case b: 
ExecutorAllocationClient => b.requestTotalExecutors(numExecutors, localityAwareTasks, hostToLocalTaskCount) case _ => - logWarning("Requesting executors is only supported in coarse-grained mode") + logWarning("Requesting executors is not supported by current scheduler.") false } } @@ -1620,10 +1620,10 @@ class SparkContext(config: SparkConf) extends Logging { @DeveloperApi def requestExecutors(numAdditionalExecutors: Int): Boolean = { schedulerBackend match { - case b: CoarseGrainedSchedulerBackend => + case b: ExecutorAllocationClient => b.requestExecutors(numAdditionalExecutors) case _ => - logWarning("Requesting executors is only supported in coarse-grained mode") + logWarning("Requesting executors is not supported by current scheduler.") false } } @@ -1642,10 +1642,10 @@ class SparkContext(config: SparkConf) extends Logging { @DeveloperApi def killExecutors(executorIds: Seq[String]): Boolean = { schedulerBackend match { - case b: CoarseGrainedSchedulerBackend => + case b: ExecutorAllocationClient => b.killExecutors(executorIds, replace = false, force = true).nonEmpty case _ => - logWarning("Killing executors is only supported in coarse-grained mode") + logWarning("Killing executors is not supported by current scheduler.") false } } @@ -1680,10 +1680,10 @@ class SparkContext(config: SparkConf) extends Logging { */ private[spark] def killAndReplaceExecutor(executorId: String): Boolean = { schedulerBackend match { - case b: CoarseGrainedSchedulerBackend => + case b: ExecutorAllocationClient => b.killExecutors(Seq(executorId), replace = true, force = true).nonEmpty case _ => - logWarning("Killing executors is only supported in coarse-grained mode") + logWarning("Killing executors is not supported by current scheduler.") false } } http://git-wip-us.apache.org/repos/asf/spark/blob/4e9e6aee/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala ---------------------------------------------------------------------- diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala index 8605e1d..7e14938 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala @@ -127,8 +127,8 @@ private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveLi throw new IllegalStateException(s"Attempted to stop $name that has not yet started!") } if (stopped.compareAndSet(false, true)) { - eventQueue.put(POISON_PILL) eventCount.incrementAndGet() + eventQueue.put(POISON_PILL) } dispatchThread.join() } http://git-wip-us.apache.org/repos/asf/spark/blob/4e9e6aee/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 90b7ec4..a0cae5a 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -265,7 +265,11 @@ class ExecutorAllocationManagerSuite val task2Info = createTaskInfo(1, 0, "executor-1") post(sc.listenerBus, SparkListenerTaskStart(2, 0, task2Info)) + + task1Info.markFinished(TaskState.FINISHED, System.currentTimeMillis()) post(sc.listenerBus, SparkListenerTaskEnd(2, 0, null, Success, task1Info, null)) + + task2Info.markFinished(TaskState.FINISHED, System.currentTimeMillis()) post(sc.listenerBus, SparkListenerTaskEnd(2, 0, null, Success, task2Info, null)) assert(adjustRequestedExecutors(manager) === -1) @@ -1063,6 +1067,9 @@ class ExecutorAllocationManagerSuite s"${sustainedSchedulerBacklogTimeout.toString}s") .set("spark.dynamicAllocation.executorIdleTimeout", s"${executorIdleTimeout.toString}s") .set("spark.dynamicAllocation.testing", 
"true") + // SPARK-22864: effectively disable the allocation schedule by setting the period to a + // really long value. + .set(TESTING_SCHEDULE_INTERVAL_KEY, "10000") val sc = new SparkContext(conf) contexts += sc sc @@ -1250,28 +1257,19 @@ private class DummyLocalExternalClusterManager extends ExternalClusterManager { private class DummyLocalSchedulerBackend (sc: SparkContext, sb: SchedulerBackend) extends SchedulerBackend with ExecutorAllocationClient { - override private[spark] def getExecutorIds(): Seq[String] = sc.getExecutorIds() + override private[spark] def getExecutorIds(): Seq[String] = Nil override private[spark] def requestTotalExecutors( numExecutors: Int, localityAwareTasks: Int, - hostToLocalTaskCount: Map[String, Int]): Boolean = - sc.requestTotalExecutors(numExecutors, localityAwareTasks, hostToLocalTaskCount) + hostToLocalTaskCount: Map[String, Int]): Boolean = true - override def requestExecutors(numAdditionalExecutors: Int): Boolean = - sc.requestExecutors(numAdditionalExecutors) + override def requestExecutors(numAdditionalExecutors: Int): Boolean = true override def killExecutors( executorIds: Seq[String], replace: Boolean, - force: Boolean): Seq[String] = { - val response = sc.killExecutors(executorIds) - if (response) { - executorIds - } else { - Seq.empty[String] - } - } + force: Boolean): Seq[String] = executorIds override def start(): Unit = sb.start() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org