Repository: spark
Updated Branches:
  refs/heads/master 9b9fe5f7b -> dba95ea03


[SPARK-10825] [CORE] [TESTS] Fix race conditions in 
StandaloneDynamicAllocationSuite

Fix the following issues in StandaloneDynamicAllocationSuite:

1. It should not assume master and workers start in order
2. It should not assume master and workers get ready at once
3. It should not assume the application is already registered with master after 
creating SparkContext
4. It should not access Master.app and idToApp which are not thread safe

The changes includes:
* Use `eventually` to wait until master and workers are ready to fix 1 and 2
* Use `eventually`  to wait until the application is registered with master to 
fix 3
* Use `askWithRetry[MasterStateResponse](RequestMasterState)` to get the 
application info to fix 4

Author: zsxwing <zsxw...@gmail.com>

Closes #8914 from zsxwing/fix-StandaloneDynamicAllocationSuite.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dba95ea0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dba95ea0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dba95ea0

Branch: refs/heads/master
Commit: dba95ea03216e6b8e623db4a36e1018c6ed95538
Parents: 9b9fe5f
Author: zsxwing <zsxw...@gmail.com>
Authored: Tue Sep 29 11:53:28 2015 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Tue Sep 29 11:53:28 2015 -0700

----------------------------------------------------------------------
 .../StandaloneDynamicAllocationSuite.scala      | 305 ++++++++++++-------
 1 file changed, 192 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dba95ea0/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 1f2a0f0..2e2fa22 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -17,10 +17,15 @@
 
 package org.apache.spark.deploy
 
+import scala.concurrent.duration._
+
 import org.mockito.Mockito.{mock, when}
 import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark._
+import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, 
RequestMasterState}
+import org.apache.spark.deploy.master.ApplicationInfo
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.deploy.worker.Worker
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
@@ -56,6 +61,10 @@ class StandaloneDynamicAllocationSuite
     }
     master = makeMaster()
     workers = makeWorkers(10, 2048)
+    // Wait until all workers register with master successfully
+    eventually(timeout(60.seconds), interval(10.millis)) {
+      assert(getMasterState.workers.size === numWorkers)
+    }
   }
 
   override def afterAll(): Unit = {
@@ -73,167 +82,208 @@ class StandaloneDynamicAllocationSuite
   test("dynamic allocation default behavior") {
     sc = new SparkContext(appConf)
     val appId = sc.applicationId
-    assert(master.apps.size === 1)
-    assert(master.apps.head.id === appId)
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.size === 1)
+      assert(apps.head.id === appId)
+      assert(apps.head.executors.size === 2)
+      assert(apps.head.getExecutorLimit === Int.MaxValue)
+    }
     // kill all executors
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    var apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request 1
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.getExecutorLimit === 1)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.getExecutorLimit === 1)
     // request 1 more
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === 2)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.getExecutorLimit === 2)
     // request 1 more; this one won't go through
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === 3)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.getExecutorLimit === 3)
     // kill all existing executors; we should end up with 3 - 2 = 1 executor
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.getExecutorLimit === 1)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.getExecutorLimit === 1)
     // kill all executors again; this time we'll have 1 - 1 = 0 executors left
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request many more; this increases the limit well beyond the cluster 
capacity
     assert(sc.requestExecutors(1000))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === 1000)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.getExecutorLimit === 1000)
   }
 
   test("dynamic allocation with max cores <= cores per worker") {
     sc = new SparkContext(appConf.set("spark.cores.max", "8"))
     val appId = sc.applicationId
-    assert(master.apps.size === 1)
-    assert(master.apps.head.id === appId)
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.executors.values.map(_.cores).toArray === Array(4, 
4))
-    assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.size === 1)
+      assert(apps.head.id === appId)
+      assert(apps.head.executors.size === 2)
+      assert(apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
+      assert(apps.head.getExecutorLimit === Int.MaxValue)
+    }
     // kill all executors
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    var apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request 1
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.executors.values.head.cores === 8)
-    assert(master.apps.head.getExecutorLimit === 1)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.executors.values.head.cores === 8)
+    assert(apps.head.getExecutorLimit === 1)
     // request 1 more; this one won't go through because we're already at max 
cores.
     // This highlights a limitation of using dynamic allocation with max cores 
WITHOUT
     // setting cores per executor: once an application scales down and then 
scales back
     // up, its executors may not be spread out anymore!
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.getExecutorLimit === 2)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.getExecutorLimit === 2)
     // request 1 more; this one also won't go through for the same reason
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.getExecutorLimit === 3)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.getExecutorLimit === 3)
     // kill all existing executors; we should end up with 3 - 1 = 2 executor
     // Note: we scheduled these executors together, so their cores should be 
evenly distributed
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.executors.values.map(_.cores).toArray === Array(4, 
4))
-    assert(master.apps.head.getExecutorLimit === 2)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
+    assert(apps.head.getExecutorLimit === 2)
     // kill all executors again; this time we'll have 1 - 1 = 0 executors left
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request many more; this increases the limit well beyond the cluster 
capacity
     assert(sc.requestExecutors(1000))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.executors.values.map(_.cores).toArray === Array(4, 
4))
-    assert(master.apps.head.getExecutorLimit === 1000)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
+    assert(apps.head.getExecutorLimit === 1000)
   }
 
   test("dynamic allocation with max cores > cores per worker") {
     sc = new SparkContext(appConf.set("spark.cores.max", "16"))
     val appId = sc.applicationId
-    assert(master.apps.size === 1)
-    assert(master.apps.head.id === appId)
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.executors.values.map(_.cores).toArray === Array(8, 
8))
-    assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.size === 1)
+      assert(apps.head.id === appId)
+      assert(apps.head.executors.size === 2)
+      assert(apps.head.executors.values.map(_.cores).toArray === Array(8, 8))
+      assert(apps.head.getExecutorLimit === Int.MaxValue)
+    }
     // kill all executors
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    var apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request 1
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.executors.values.head.cores === 10)
-    assert(master.apps.head.getExecutorLimit === 1)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.executors.values.head.cores === 10)
+    assert(apps.head.getExecutorLimit === 1)
     // request 1 more
     // Note: the cores are not evenly distributed because we scheduled these 
executors 1 by 1
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.executors.values.map(_.cores).toSet === Set(10, 6))
-    assert(master.apps.head.getExecutorLimit === 2)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.executors.values.map(_.cores).toSet === Set(10, 6))
+    assert(apps.head.getExecutorLimit === 2)
     // request 1 more; this one won't go through
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === 3)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.getExecutorLimit === 3)
     // kill all existing executors; we should end up with 3 - 2 = 1 executor
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.executors.values.head.cores === 10)
-    assert(master.apps.head.getExecutorLimit === 1)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.executors.values.head.cores === 10)
+    assert(apps.head.getExecutorLimit === 1)
     // kill all executors again; this time we'll have 1 - 1 = 0 executors left
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request many more; this increases the limit well beyond the cluster 
capacity
     assert(sc.requestExecutors(1000))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.executors.values.map(_.cores).toArray === Array(8, 
8))
-    assert(master.apps.head.getExecutorLimit === 1000)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.executors.values.map(_.cores).toArray === Array(8, 8))
+    assert(apps.head.getExecutorLimit === 1000)
   }
 
   test("dynamic allocation with cores per executor") {
     sc = new SparkContext(appConf.set("spark.executor.cores", "2"))
     val appId = sc.applicationId
-    assert(master.apps.size === 1)
-    assert(master.apps.head.id === appId)
-    assert(master.apps.head.executors.size === 10) // 20 cores total
-    assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.size === 1)
+      assert(apps.head.id === appId)
+      assert(apps.head.executors.size === 10) // 20 cores total
+      assert(apps.head.getExecutorLimit === Int.MaxValue)
+    }
     // kill all executors
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    var apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request 1
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.getExecutorLimit === 1)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.getExecutorLimit === 1)
     // request 3 more
     assert(sc.requestExecutors(3))
-    assert(master.apps.head.executors.size === 4)
-    assert(master.apps.head.getExecutorLimit === 4)
+    apps = getApplications()
+    assert(apps.head.executors.size === 4)
+    assert(apps.head.getExecutorLimit === 4)
     // request 10 more; only 6 will go through
     assert(sc.requestExecutors(10))
-    assert(master.apps.head.executors.size === 10)
-    assert(master.apps.head.getExecutorLimit === 14)
+    apps = getApplications()
+    assert(apps.head.executors.size === 10)
+    assert(apps.head.getExecutorLimit === 14)
     // kill 2 executors; we should get 2 back immediately
     assert(killNExecutors(sc, 2))
-    assert(master.apps.head.executors.size === 10)
-    assert(master.apps.head.getExecutorLimit === 12)
+    apps = getApplications()
+    assert(apps.head.executors.size === 10)
+    assert(apps.head.getExecutorLimit === 12)
     // kill 4 executors; we should end up with 12 - 4 = 8 executors
     assert(killNExecutors(sc, 4))
-    assert(master.apps.head.executors.size === 8)
-    assert(master.apps.head.getExecutorLimit === 8)
+    apps = getApplications()
+    assert(apps.head.executors.size === 8)
+    assert(apps.head.getExecutorLimit === 8)
     // kill all executors; this time we'll have 8 - 8 = 0 executors left
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request many more; this increases the limit well beyond the cluster 
capacity
     assert(sc.requestExecutors(1000))
-    assert(master.apps.head.executors.size === 10)
-    assert(master.apps.head.getExecutorLimit === 1000)
+    apps = getApplications()
+    assert(apps.head.executors.size === 10)
+    assert(apps.head.getExecutorLimit === 1000)
   }
 
   test("dynamic allocation with cores per executor AND max cores") {
@@ -241,55 +291,70 @@ class StandaloneDynamicAllocationSuite
       .set("spark.executor.cores", "2")
       .set("spark.cores.max", "8"))
     val appId = sc.applicationId
-    assert(master.apps.size === 1)
-    assert(master.apps.head.id === appId)
-    assert(master.apps.head.executors.size === 4) // 8 cores total
-    assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.size === 1)
+      assert(apps.head.id === appId)
+      assert(apps.head.executors.size === 4) // 8 cores total
+      assert(apps.head.getExecutorLimit === Int.MaxValue)
+    }
     // kill all executors
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    var apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request 1
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.getExecutorLimit === 1)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.getExecutorLimit === 1)
     // request 3 more
     assert(sc.requestExecutors(3))
-    assert(master.apps.head.executors.size === 4)
-    assert(master.apps.head.getExecutorLimit === 4)
+    apps = getApplications()
+    assert(apps.head.executors.size === 4)
+    assert(apps.head.getExecutorLimit === 4)
     // request 10 more; none will go through
     assert(sc.requestExecutors(10))
-    assert(master.apps.head.executors.size === 4)
-    assert(master.apps.head.getExecutorLimit === 14)
+    apps = getApplications()
+    assert(apps.head.executors.size === 4)
+    assert(apps.head.getExecutorLimit === 14)
     // kill all executors; 4 executors will be launched immediately
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 4)
-    assert(master.apps.head.getExecutorLimit === 10)
+    apps = getApplications()
+    assert(apps.head.executors.size === 4)
+    assert(apps.head.getExecutorLimit === 10)
     // ... and again
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 4)
-    assert(master.apps.head.getExecutorLimit === 6)
+    apps = getApplications()
+    assert(apps.head.executors.size === 4)
+    assert(apps.head.getExecutorLimit === 6)
     // ... and again; now we end up with 6 - 4 = 2 executors left
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === 2)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.getExecutorLimit === 2)
     // ... and again; this time we have 2 - 2 = 0 executors left
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request many more; this increases the limit well beyond the cluster 
capacity
     assert(sc.requestExecutors(1000))
-    assert(master.apps.head.executors.size === 4)
-    assert(master.apps.head.getExecutorLimit === 1000)
+    apps = getApplications()
+    assert(apps.head.executors.size === 4)
+    assert(apps.head.getExecutorLimit === 1000)
   }
 
   test("kill the same executor twice (SPARK-9795)") {
     sc = new SparkContext(appConf)
     val appId = sc.applicationId
-    assert(master.apps.size === 1)
-    assert(master.apps.head.id === appId)
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.size === 1)
+      assert(apps.head.id === appId)
+      assert(apps.head.executors.size === 2)
+      assert(apps.head.getExecutorLimit === Int.MaxValue)
+    }
     // sync executors between the Master and the driver, needed because
     // the driver refuses to kill executors it does not know about
     syncExecutors(sc)
@@ -298,9 +363,10 @@ class StandaloneDynamicAllocationSuite
     assert(executors.size === 2)
     assert(sc.killExecutor(executors.head))
     assert(sc.killExecutor(executors.head))
-    assert(master.apps.head.executors.size === 1)
+    val apps = getApplications()
+    assert(apps.head.executors.size === 1)
     // The limit should not be lowered twice
-    assert(master.apps.head.getExecutorLimit === 1)
+    assert(apps.head.getExecutorLimit === 1)
   }
 
   // ===============================
@@ -333,6 +399,16 @@ class StandaloneDynamicAllocationSuite
     }
   }
 
+  /** Get the Master state */
+  private def getMasterState: MasterStateResponse = {
+    master.self.askWithRetry[MasterStateResponse](RequestMasterState)
+  }
+
+  /** Get the applictions that are active from Master */
+  private def getApplications(): Seq[ApplicationInfo] = {
+    getMasterState.activeApps
+  }
+
   /** Kill all executors belonging to this application. */
   private def killAllExecutors(sc: SparkContext): Boolean = {
     killNExecutors(sc, Int.MaxValue)
@@ -352,8 +428,11 @@ class StandaloneDynamicAllocationSuite
    * don't wait for executors to register. Otherwise the tests will take much 
longer to run.
    */
   private def getExecutorIds(sc: SparkContext): Seq[String] = {
-    assert(master.idToApp.contains(sc.applicationId))
-    master.idToApp(sc.applicationId).executors.keys.map(_.toString).toSeq
+    val app = getApplications().find(_.id == sc.applicationId)
+    assert(app.isDefined)
+    // Although executors is transient, master is in the same process so the 
message won't be
+    // serialized and it's safe here.
+    app.get.executors.keys.map(_.toString).toSeq
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to