Repository: spark
Updated Branches:
  refs/heads/master 8b497046c -> 4e9e6aee4


[SPARK-22864][CORE] Disable allocation schedule in ExecutorAllocationManagerSuite.

The scheduled task was racing with the test code and could influence
the values returned to the test, triggering assertion failures. The
change adds a new config that is honored only during testing, and
overrides it in the affected test suite.

The issue can be reliably reproduced by reducing the polling interval
in the test (e.g. to 10ms).

While there, fixed an exception that showed up in the logs while these
tests run, and simplified some code that was producing misleading
warnings in the test's log output.
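
For illustration, a minimal sketch of the race pattern (all names here
are hypothetical stand-ins, not Spark's actual internals): a timer task
keeps mutating state that the test reads and asserts on, so a
fixed-interval schedule can fire between the read and the assertion.

  import java.util.concurrent.{Executors, TimeUnit}

  object ScheduleRaceSketch extends App {
    // Stand-in for manager state such as the executor target.
    @volatile var numExecutorsTarget = 1

    val timer = Executors.newSingleThreadScheduledExecutor()
    // Stand-in for the polling loop; at a 10ms period the race is easy to hit.
    timer.scheduleWithFixedDelay(
      new Runnable { def run(): Unit = numExecutorsTarget += 1 },
      0, 10, TimeUnit.MILLISECONDS)

    val observed = numExecutorsTarget
    Thread.sleep(50)
    // An assertion like `assert(numExecutorsTarget == observed)` here would
    // fail intermittently; that is the flakiness this change removes.
    println(s"observed=$observed, now=$numExecutorsTarget")
    timer.shutdownNow()
  }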

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #20050 from vanzin/SPARK-22864.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4e9e6aee
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4e9e6aee
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4e9e6aee

Branch: refs/heads/master
Commit: 4e9e6aee44bb2ddb41b567d659358b22fd824222
Parents: 8b49704
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Fri Dec 29 10:51:37 2017 -0600
Committer: Imran Rashid <iras...@cloudera.com>
Committed: Fri Dec 29 10:51:37 2017 -0600

----------------------------------------------------------------------
 .../spark/ExecutorAllocationManager.scala       |  7 +++++-
 .../scala/org/apache/spark/SparkContext.scala   | 20 ++++++++--------
 .../spark/scheduler/AsyncEventQueue.scala       |  2 +-
 .../spark/ExecutorAllocationManagerSuite.scala  | 24 +++++++++-----------
 4 files changed, 28 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4e9e6aee/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 5bc2d9e..2e00dc8 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -141,7 +141,11 @@ private[spark] class ExecutorAllocationManager(
   private val removeTimes = new mutable.HashMap[String, Long]
 
   // Polling loop interval (ms)
-  private val intervalMillis: Long = 100
+  private val intervalMillis: Long = if (Utils.isTesting) {
+      conf.getLong(TESTING_SCHEDULE_INTERVAL_KEY, 100)
+    } else {
+      100
+    }
 
   // Clock used to schedule when executors should be added and removed
   private var clock: Clock = new SystemClock()
@@ -856,4 +860,5 @@ private[spark] class ExecutorAllocationManager(
 
 private object ExecutorAllocationManager {
   val NOT_SET = Long.MaxValue
+  val TESTING_SCHEDULE_INTERVAL_KEY = "spark.testing.dynamicAllocation.scheduleInterval"
 }
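
Note that the new key is consulted only when Utils.isTesting is true,
which in Spark of this vintage means the SPARK_TESTING environment
variable or the spark.testing system property is set; production
deployments therefore keep the fixed 100ms polling interval regardless
of what the config says.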

http://git-wip-us.apache.org/repos/asf/spark/blob/4e9e6aee/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index fcbeddd..1dbb378 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1575,10 +1575,10 @@ class SparkContext(config: SparkConf) extends Logging {
 
   private[spark] def getExecutorIds(): Seq[String] = {
     schedulerBackend match {
-      case b: CoarseGrainedSchedulerBackend =>
+      case b: ExecutorAllocationClient =>
         b.getExecutorIds()
       case _ =>
-        logWarning("Requesting executors is only supported in coarse-grained 
mode")
+        logWarning("Requesting executors is not supported by current 
scheduler.")
         Nil
     }
   }
@@ -1604,10 +1604,10 @@ class SparkContext(config: SparkConf) extends Logging {
       hostToLocalTaskCount: scala.collection.immutable.Map[String, Int]
     ): Boolean = {
     schedulerBackend match {
-      case b: CoarseGrainedSchedulerBackend =>
+      case b: ExecutorAllocationClient =>
+        b.requestTotalExecutors(numExecutors, localityAwareTasks, hostToLocalTaskCount)
       case _ =>
-        logWarning("Requesting executors is only supported in coarse-grained 
mode")
+        logWarning("Requesting executors is not supported by current 
scheduler.")
         false
     }
   }
@@ -1620,10 +1620,10 @@ class SparkContext(config: SparkConf) extends Logging {
   @DeveloperApi
   def requestExecutors(numAdditionalExecutors: Int): Boolean = {
     schedulerBackend match {
-      case b: CoarseGrainedSchedulerBackend =>
+      case b: ExecutorAllocationClient =>
         b.requestExecutors(numAdditionalExecutors)
       case _ =>
-        logWarning("Requesting executors is only supported in coarse-grained 
mode")
+        logWarning("Requesting executors is not supported by current 
scheduler.")
         false
     }
   }
@@ -1642,10 +1642,10 @@ class SparkContext(config: SparkConf) extends Logging {
   @DeveloperApi
   def killExecutors(executorIds: Seq[String]): Boolean = {
     schedulerBackend match {
-      case b: CoarseGrainedSchedulerBackend =>
+      case b: ExecutorAllocationClient =>
         b.killExecutors(executorIds, replace = false, force = true).nonEmpty
       case _ =>
-        logWarning("Killing executors is only supported in coarse-grained 
mode")
+        logWarning("Killing executors is not supported by current scheduler.")
         false
     }
   }
@@ -1680,10 +1680,10 @@ class SparkContext(config: SparkConf) extends Logging {
    */
   private[spark] def killAndReplaceExecutor(executorId: String): Boolean = {
     schedulerBackend match {
-      case b: CoarseGrainedSchedulerBackend =>
+      case b: ExecutorAllocationClient =>
         b.killExecutors(Seq(executorId), replace = true, force = true).nonEmpty
       case _ =>
-        logWarning("Killing executors is only supported in coarse-grained 
mode")
+        logWarning("Killing executors is not supported by current scheduler.")
         false
     }
   }
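
All five hunks above replace a match on the concrete
CoarseGrainedSchedulerBackend with a match on the
ExecutorAllocationClient trait it implements, so any backend that mixes
in the trait (including the suite's DummyLocalSchedulerBackend further
down) takes the first branch instead of hitting the warning. A
self-contained sketch of the pattern, with hypothetical names:

  object MatchOnTraitSketch extends App {
    trait AllocClient { def requestExecutors(n: Int): Boolean }

    // A "real" backend and a test double both expose the capability.
    class CoarseBackend extends AllocClient { def requestExecutors(n: Int): Boolean = true }
    class DummyBackend extends AllocClient { def requestExecutors(n: Int): Boolean = true }

    def request(backend: AnyRef, n: Int): Boolean = backend match {
      case b: AllocClient => b.requestExecutors(n) // both backends match now
      case _ =>
        println("Requesting executors is not supported by current scheduler.")
        false
    }

    assert(request(new CoarseBackend, 2)) // matched before and after the change
    assert(request(new DummyBackend, 2))  // previously fell through to the warning
  }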

http://git-wip-us.apache.org/repos/asf/spark/blob/4e9e6aee/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
index 8605e1d..7e14938 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
@@ -127,8 +127,8 @@ private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveLi
      throw new IllegalStateException(s"Attempted to stop $name that has not yet started!")
     }
     if (stopped.compareAndSet(false, true)) {
-      eventQueue.put(POISON_PILL)
       eventCount.incrementAndGet()
+      eventQueue.put(POISON_PILL)
     }
     dispatchThread.join()
   }
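
The two-line swap above matters because the dispatch thread decrements
eventCount after handling each element; if the poison pill were
enqueued before the counter is incremented, the dispatcher could take
it and decrement first, leaving the count transiently negative. A
simplified sketch of the producer/dispatcher pattern (hypothetical,
not Spark's actual class):

  import java.util.concurrent.LinkedBlockingQueue
  import java.util.concurrent.atomic.AtomicLong

  object PoisonPillSketch {
    private val POISON_PILL = "STOP"
    private val queue = new LinkedBlockingQueue[String]()
    private val eventCount = new AtomicLong(0)

    // Producer: increment before enqueueing, mirroring the fix above.
    def post(event: String): Unit = {
      eventCount.incrementAndGet()
      queue.put(event)
    }

    // Dispatcher: decrements only after it has handled an element, so the
    // count can never drop below the producer's view of it.
    def dispatchLoop(): Unit = {
      var next = queue.take()
      while (next != POISON_PILL) {
        // (handle the event here)
        eventCount.decrementAndGet()
        next = queue.take()
      }
      eventCount.decrementAndGet() // account for the pill itself
    }
  }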

http://git-wip-us.apache.org/repos/asf/spark/blob/4e9e6aee/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 90b7ec4..a0cae5a 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -265,7 +265,11 @@ class ExecutorAllocationManagerSuite
 
     val task2Info = createTaskInfo(1, 0, "executor-1")
     post(sc.listenerBus, SparkListenerTaskStart(2, 0, task2Info))
+
+    task1Info.markFinished(TaskState.FINISHED, System.currentTimeMillis())
    post(sc.listenerBus, SparkListenerTaskEnd(2, 0, null, Success, task1Info, null))
+
+    task2Info.markFinished(TaskState.FINISHED, System.currentTimeMillis())
    post(sc.listenerBus, SparkListenerTaskEnd(2, 0, null, Success, task2Info, null))
 
     assert(adjustRequestedExecutors(manager) === -1)
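
The markFinished calls added in the hunk above matter because TaskInfo
accessors such as duration throw for tasks that have not been marked
finished, and listeners handling SparkListenerTaskEnd may call them;
this is presumably one source of the logged exceptions the commit
message mentions.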
@@ -1063,6 +1067,9 @@ class ExecutorAllocationManagerSuite
         s"${sustainedSchedulerBacklogTimeout.toString}s")
       .set("spark.dynamicAllocation.executorIdleTimeout", 
s"${executorIdleTimeout.toString}s")
       .set("spark.dynamicAllocation.testing", "true")
+      // SPARK-22864: effectively disable the allocation schedule by setting the period to a
+      // really long value.
+      .set(TESTING_SCHEDULE_INTERVAL_KEY, "10000")
     val sc = new SparkContext(conf)
     contexts += sc
     sc
@@ -1250,28 +1257,19 @@ private class DummyLocalExternalClusterManager extends ExternalClusterManager {
private class DummyLocalSchedulerBackend (sc: SparkContext, sb: SchedulerBackend)
   extends SchedulerBackend with ExecutorAllocationClient {
 
-  override private[spark] def getExecutorIds(): Seq[String] = sc.getExecutorIds()
+  override private[spark] def getExecutorIds(): Seq[String] = Nil
 
   override private[spark] def requestTotalExecutors(
       numExecutors: Int,
       localityAwareTasks: Int,
-      hostToLocalTaskCount: Map[String, Int]): Boolean =
-    sc.requestTotalExecutors(numExecutors, localityAwareTasks, hostToLocalTaskCount)
+      hostToLocalTaskCount: Map[String, Int]): Boolean = true
 
-  override def requestExecutors(numAdditionalExecutors: Int): Boolean =
-    sc.requestExecutors(numAdditionalExecutors)
+  override def requestExecutors(numAdditionalExecutors: Int): Boolean = true
 
   override def killExecutors(
       executorIds: Seq[String],
       replace: Boolean,
-      force: Boolean): Seq[String] = {
-    val response = sc.killExecutors(executorIds)
-    if (response) {
-      executorIds
-    } else {
-      Seq.empty[String]
-    }
-  }
+      force: Boolean): Seq[String] = executorIds
 
   override def start(): Unit = sb.start()
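
The simplification above is also the source of the misleading log
messages noted in the commit message: the dummy backend used to
delegate back into SparkContext, whose schedulerBackend match (before
this change) recognized only CoarseGrainedSchedulerBackend, so every
delegated call fell through to the "only supported in coarse-grained
mode" warning. Returning canned values keeps the test double
self-contained and the logs quiet.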
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
