Repository: spark
Updated Branches:
  refs/heads/master 74ba95228 -> a930e624e


[SPARK-9817][YARN] Improve the locality calculation of containers by taking 
pending container requests into consideration

This is a follow-up PR to further improve the locality calculation by 
considering pending container requests. Since the locality preferences of 
tasks may shift over time, the localities of pending container requests may 
no longer fully match the new preferences. This PR improves this by removing 
outdated, unmatched container requests and replacing them with new requests.

sryza please help to review, thanks a lot.

Author: jerryshao <[email protected]>

Closes #8100 from jerryshao/SPARK-9817.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a930e624
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a930e624
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a930e624

Branch: refs/heads/master
Commit: a930e624eb9feb0f7d37d99dcb8178feb9c0f177
Parents: 74ba952
Author: jerryshao <[email protected]>
Authored: Mon Nov 2 10:23:30 2015 -0800
Committer: Marcelo Vanzin <[email protected]>
Committed: Mon Nov 2 10:23:30 2015 -0800

----------------------------------------------------------------------
 .../spark/deploy/yarn/ApplicationMaster.scala   |  2 +-
 ...ityPreferredContainerPlacementStrategy.scala | 60 +++++++++++++---
 .../spark/deploy/yarn/YarnAllocator.scala       | 73 ++++++++++++++++----
 .../yarn/ContainerPlacementStrategySuite.scala  | 38 ++++++++--
 .../spark/deploy/yarn/YarnAllocatorSuite.scala  | 26 +++----
 5 files changed, 159 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a930e624/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 4b4d999..c6a6d7a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -375,7 +375,7 @@ private[spark] class ApplicationMaster(
             }
           }
           try {
-            val numPendingAllocate = allocator.getNumPendingAllocate
+            val numPendingAllocate = allocator.getPendingAllocate.size
             val sleepInterval =
               if (numPendingAllocate > 0) {
                 val currentAllocationInterval =

http://git-wip-us.apache.org/repos/asf/spark/blob/a930e624/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
index 0817802..2ec189d 100644
--- 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
+++ 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
@@ -18,9 +18,11 @@
 package org.apache.spark.deploy.yarn
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, Set}
+import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.yarn.api.records.{ContainerId, Resource}
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.hadoop.yarn.util.RackResolver
 
 import org.apache.spark.SparkConf
@@ -30,8 +32,8 @@ private[yarn] case class ContainerLocalityPreferences(nodes: 
Array[String], rack
 /**
  * This strategy is calculating the optimal locality preferences of YARN 
containers by considering
  * the node ratio of pending tasks, number of required cores/containers and 
and locality of current
- * existing containers. The target of this algorithm is to maximize the number 
of tasks that
- * would run locally.
+ * existing and pending allocated containers. The target of this algorithm is 
to maximize the number
+ * of tasks that would run locally.
  *
  * Consider a situation in which we have 20 tasks that require (host1, host2, 
host3)
  * and 10 tasks that require (host1, host2, host4), besides each container has 
2 cores
@@ -91,6 +93,11 @@ private[yarn] class 
LocalityPreferredContainerPlacementStrategy(
    * @param numLocalityAwareTasks number of locality required tasks
    * @param hostToLocalTaskCount a map to store the preferred hostname and 
possible task
    *                             numbers running on it, used as hints for 
container allocation
+   * @param allocatedHostToContainersMap host to allocated containers map, 
used to calculate the
+   *                                     expected locality preference by 
considering the existing
+   *                                     containers
+   * @param localityMatchedPendingAllocations A sequence of pending container 
request which
+   *                                          matches the localities of 
current required tasks.
    * @return node localities and rack localities, each locality is an array of 
string,
    *         the length of localities is the same as number of containers
    */
@@ -98,10 +105,12 @@ private[yarn] class 
LocalityPreferredContainerPlacementStrategy(
       numContainer: Int,
       numLocalityAwareTasks: Int,
       hostToLocalTaskCount: Map[String, Int],
-      allocatedHostToContainersMap: HashMap[String, Set[ContainerId]]
+      allocatedHostToContainersMap: HashMap[String, Set[ContainerId]],
+      localityMatchedPendingAllocations: Seq[ContainerRequest]
     ): Array[ContainerLocalityPreferences] = {
     val updatedHostToContainerCount = expectedHostToContainerCount(
-      numLocalityAwareTasks, hostToLocalTaskCount, 
allocatedHostToContainersMap)
+      numLocalityAwareTasks, hostToLocalTaskCount, 
allocatedHostToContainersMap,
+        localityMatchedPendingAllocations)
     val updatedLocalityAwareContainerNum = 
updatedHostToContainerCount.values.sum
 
     // The number of containers to allocate, divided into two groups, one with 
preferred locality,
@@ -158,20 +167,28 @@ private[yarn] class 
LocalityPreferredContainerPlacementStrategy(
    * @param localityAwareTasks number of locality aware tasks
    * @param hostToLocalTaskCount a map to store the preferred hostname and 
possible task
    *                             numbers running on it, used as hints for 
container allocation
+   * @param allocatedHostToContainersMap host to allocated containers map, 
used to calculate the
+   *                                     expected locality preference by 
considering the existing
+   *                                     containers
+   * @param localityMatchedPendingAllocations A sequence of pending container 
request which
+   *                                          matches the localities of 
current required tasks.
    * @return a map with hostname as key and required number of containers on 
this host as value
    */
   private def expectedHostToContainerCount(
       localityAwareTasks: Int,
       hostToLocalTaskCount: Map[String, Int],
-      allocatedHostToContainersMap: HashMap[String, Set[ContainerId]]
+      allocatedHostToContainersMap: HashMap[String, Set[ContainerId]],
+      localityMatchedPendingAllocations: Seq[ContainerRequest]
     ): Map[String, Int] = {
     val totalLocalTaskNum = hostToLocalTaskCount.values.sum
+    val pendingHostToContainersMap = 
pendingHostToContainerCount(localityMatchedPendingAllocations)
+
     hostToLocalTaskCount.map { case (host, count) =>
       val expectedCount =
         count.toDouble * numExecutorsPending(localityAwareTasks) / 
totalLocalTaskNum
-      val existedCount = allocatedHostToContainersMap.get(host)
-        .map(_.size)
-        .getOrElse(0)
+      // Take the locality of pending containers into consideration
+      val existedCount = 
allocatedHostToContainersMap.get(host).map(_.size).getOrElse(0) +
+        pendingHostToContainersMap.getOrElse(host, 0.0)
 
       // If existing container can not fully satisfy the expected number of 
container,
       // the required container number is expected count minus existed count. 
Otherwise the
@@ -179,4 +196,31 @@ private[yarn] class 
LocalityPreferredContainerPlacementStrategy(
       (host, math.max(0, (expectedCount - existedCount).ceil.toInt))
     }
   }
+
+  /**
+   * According to the locality ratio and number of container requests, 
calculate the host to
+   * possible number of containers for pending allocated containers.
+   *
+   * If current locality ratio of hosts is: Host1 : Host2 : Host3 = 20 : 20 : 
10,
+   * and pending container requests is 3, so the possible number of containers 
on
+   * Host1 : Host2 : Host3 will be 1.2 : 1.2 : 0.6.
+   * @param localityMatchedPendingAllocations A sequence of pending container 
request which
+   *                                          matches the localities of 
current required tasks.
+   * @return a Map with hostname as key and possible number of containers on 
this host as value
+   */
+  private def pendingHostToContainerCount(
+      localityMatchedPendingAllocations: Seq[ContainerRequest]): Map[String, 
Double] = {
+    val pendingHostToContainerCount = new HashMap[String, Int]()
+    localityMatchedPendingAllocations.foreach { cr =>
+      cr.getNodes.asScala.foreach { n =>
+        val count = pendingHostToContainerCount.getOrElse(n, 0) + 1
+        pendingHostToContainerCount(n) = count
+      }
+    }
+
+    val possibleTotalContainerNum = pendingHostToContainerCount.values.sum
+    val localityMatchedPendingNum = 
localityMatchedPendingAllocations.size.toDouble
+    pendingHostToContainerCount.mapValues(_ * localityMatchedPendingNum / 
possibleTotalContainerNum)
+      .toMap
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a930e624/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 875bbd4..a0cf1b4 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -157,15 +157,19 @@ private[yarn] class YarnAllocator(
   def getNumExecutorsFailed: Int = numExecutorsFailed
 
   /**
-   * Number of container requests that have not yet been fulfilled.
+   * A sequence of pending container requests that have not yet been fulfilled.
    */
-  def getNumPendingAllocate: Int = getNumPendingAtLocation(ANY_HOST)
+  def getPendingAllocate: Seq[ContainerRequest] = 
getPendingAtLocation(ANY_HOST)
 
   /**
-   * Number of container requests at the given location that have not yet been 
fulfilled.
+   * A sequence of pending container requests at the given location that have 
not yet been
+   * fulfilled.
    */
-  private def getNumPendingAtLocation(location: String): Int =
-    amClient.getMatchingRequests(RM_REQUEST_PRIORITY, location, 
resource).asScala.map(_.size).sum
+  private def getPendingAtLocation(location: String): Seq[ContainerRequest] = {
+    amClient.getMatchingRequests(RM_REQUEST_PRIORITY, location, 
resource).asScala
+      .flatMap(_.asScala)
+      .toSeq
+  }
 
   /**
    * Request as many executors from the ResourceManager as needed to reach the 
desired total. If
@@ -251,20 +255,31 @@ private[yarn] class YarnAllocator(
    * Visible for testing.
    */
   def updateResourceRequests(): Unit = {
-    val numPendingAllocate = getNumPendingAllocate
+    val pendingAllocate = getPendingAllocate
+    val numPendingAllocate = pendingAllocate.size
     val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
 
-    // TODO. Consider locality preferences of pending container requests.
-    // Since the last time we made container requests, stages have completed 
and been submitted,
-    // and that the localities at which we requested our pending executors
-    // no longer apply to our current needs. We should consider to remove all 
outstanding
-    // container requests and add requests anew each time to avoid this.
     if (missing > 0) {
       logInfo(s"Will request $missing executor containers, each with 
${resource.getVirtualCores} " +
         s"cores and ${resource.getMemory} MB memory including $memoryOverhead 
MB overhead")
 
+      // Split the pending container request into three groups: locality 
matched list, locality
+      // unmatched list and non-locality list. Take the locality matched 
container request into
+      // consideration of container placement, treat as allocated containers.
+      // For locality unmatched and locality free container requests, cancel 
these container
+      // requests, since required locality preference has been changed, 
recalculating using
+      // container placement strategy.
+      val (localityMatched, localityUnMatched, localityFree) = 
splitPendingAllocationsByLocality(
+        hostToLocalTaskCounts, pendingAllocate)
+
+      // Remove the outdated container request and recalculate the requested 
container number
+      localityUnMatched.foreach(amClient.removeContainerRequest)
+      localityFree.foreach(amClient.removeContainerRequest)
+      val updatedNumContainer = missing + localityUnMatched.size + 
localityFree.size
+
       val containerLocalityPreferences = 
containerPlacementStrategy.localityOfRequestedContainers(
-        missing, numLocalityAwareTasks, hostToLocalTaskCounts, 
allocatedHostToContainersMap)
+        updatedNumContainer, numLocalityAwareTasks, hostToLocalTaskCounts,
+          allocatedHostToContainersMap, localityMatched)
 
       for (locality <- containerLocalityPreferences) {
         val request = createContainerRequest(resource, locality.nodes, 
locality.racks)
@@ -291,7 +306,7 @@ private[yarn] class YarnAllocator(
    * Creates a container request, handling the reflection required to use YARN 
features that were
    * added in recent versions.
    */
-  protected def createContainerRequest(
+  private def createContainerRequest(
       resource: Resource,
       nodes: Array[String],
       racks: Array[String]): ContainerRequest = {
@@ -535,6 +550,38 @@ private[yarn] class YarnAllocator(
 
   private[yarn] def getNumUnexpectedContainerRelease = 
numUnexpectedContainerRelease
 
+  /**
+   * Split the pending container requests into 3 groups based on current 
localities of pending
+   * tasks.
+   * @param hostToLocalTaskCount a map of preferred hostname to possible task 
counts to be used as
+   *                             container placement hint.
+   * @param pendingAllocations A sequence of pending allocation container 
request.
+   * @return A tuple of 3 sequences, first is a sequence of locality matched 
container
+   *         requests, second is a sequence of locality unmatched container 
requests, and third is a
+   *         sequence of locality free container requests.
+   */
+  private def splitPendingAllocationsByLocality(
+      hostToLocalTaskCount: Map[String, Int],
+      pendingAllocations: Seq[ContainerRequest]
+    ): (Seq[ContainerRequest], Seq[ContainerRequest], Seq[ContainerRequest]) = 
{
+    val localityMatched = ArrayBuffer[ContainerRequest]()
+    val localityUnMatched = ArrayBuffer[ContainerRequest]()
+    val localityFree = ArrayBuffer[ContainerRequest]()
+
+    val preferredHosts = hostToLocalTaskCount.keySet
+    pendingAllocations.foreach { cr =>
+      val nodes = cr.getNodes
+      if (nodes == null) {
+        localityFree += cr
+      } else if (nodes.asScala.toSet.intersect(preferredHosts).nonEmpty) {
+        localityMatched += cr
+      } else {
+        localityUnMatched += cr
+      }
+    }
+
+    (localityMatched.toSeq, localityUnMatched.toSeq, localityFree.toSeq)
+  }
 }
 
 private object YarnAllocator {

http://git-wip-us.apache.org/repos/asf/spark/blob/a930e624/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
 
b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
index b7fe4cc..afb4b69 100644
--- 
a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
+++ 
b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.deploy.yarn
 
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.scalatest.{BeforeAndAfterEach, Matchers}
 
 import org.apache.spark.SparkFunSuite
@@ -26,6 +27,9 @@ class ContainerPlacementStrategySuite extends SparkFunSuite 
with Matchers with B
   private val yarnAllocatorSuite = new YarnAllocatorSuite
   import yarnAllocatorSuite._
 
+  def createContainerRequest(nodes: Array[String]): ContainerRequest =
+    new ContainerRequest(containerResource, nodes, null, 
YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
+
   override def beforeEach() {
     yarnAllocatorSuite.beforeEach()
   }
@@ -44,7 +48,8 @@ class ContainerPlacementStrategySuite extends SparkFunSuite 
with Matchers with B
     handler.handleAllocatedContainers(Array(createContainer("host1"), 
createContainer("host2")))
 
     val localities = 
handler.containerPlacementStrategy.localityOfRequestedContainers(
-      3, 15, Map("host3" -> 15, "host4" -> 15, "host5" -> 10), 
handler.allocatedHostToContainersMap)
+      3, 15, Map("host3" -> 15, "host4" -> 15, "host5" -> 10),
+        handler.allocatedHostToContainersMap, Seq.empty)
 
     assert(localities.map(_.nodes) === Array(
       Array("host3", "host4", "host5"),
@@ -66,7 +71,8 @@ class ContainerPlacementStrategySuite extends SparkFunSuite 
with Matchers with B
     ))
 
     val localities = 
handler.containerPlacementStrategy.localityOfRequestedContainers(
-      3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), 
handler.allocatedHostToContainersMap)
+      3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10),
+        handler.allocatedHostToContainersMap, Seq.empty)
 
     assert(localities.map(_.nodes) ===
       Array(null, Array("host2", "host3"), Array("host2", "host3")))
@@ -86,7 +92,8 @@ class ContainerPlacementStrategySuite extends SparkFunSuite 
with Matchers with B
     ))
 
     val localities = 
handler.containerPlacementStrategy.localityOfRequestedContainers(
-      1, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), 
handler.allocatedHostToContainersMap)
+      1, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10),
+        handler.allocatedHostToContainersMap, Seq.empty)
 
     assert(localities.map(_.nodes) === Array(Array("host2", "host3")))
   }
@@ -105,7 +112,8 @@ class ContainerPlacementStrategySuite extends SparkFunSuite 
with Matchers with B
     ))
 
     val localities = 
handler.containerPlacementStrategy.localityOfRequestedContainers(
-      3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), 
handler.allocatedHostToContainersMap)
+      3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10),
+        handler.allocatedHostToContainersMap, Seq.empty)
 
     assert(localities.map(_.nodes) === Array(null, null, null))
   }
@@ -118,8 +126,28 @@ class ContainerPlacementStrategySuite extends 
SparkFunSuite with Matchers with B
     handler.handleAllocatedContainers(Array(createContainer("host1"), 
createContainer("host2")))
 
     val localities = 
handler.containerPlacementStrategy.localityOfRequestedContainers(
-      1, 0, Map.empty, handler.allocatedHostToContainersMap)
+      1, 0, Map.empty, handler.allocatedHostToContainersMap, Seq.empty)
 
     assert(localities.map(_.nodes) === Array(null))
   }
+
+  test("allocate locality preferred containers by considering the localities 
of pending requests") {
+    val handler = createAllocator(3)
+    handler.updateResourceRequests()
+    handler.handleAllocatedContainers(Array(
+      createContainer("host1"),
+      createContainer("host1"),
+      createContainer("host2")
+    ))
+
+    val pendingAllocationRequests = Seq(
+      createContainerRequest(Array("host2", "host3")),
+      createContainerRequest(Array("host1", "host4")))
+
+    val localities = 
handler.containerPlacementStrategy.localityOfRequestedContainers(
+      1, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10),
+        handler.allocatedHostToContainersMap, pendingAllocationRequests)
+
+    assert(localities.map(_.nodes) === Array(Array("host3")))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a930e624/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala 
b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 5d05f51..bd80036 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -116,7 +116,7 @@ class YarnAllocatorSuite extends SparkFunSuite with 
Matchers with BeforeAndAfter
     val handler = createAllocator(1)
     handler.updateResourceRequests()
     handler.getNumExecutorsRunning should be (0)
-    handler.getNumPendingAllocate should be (1)
+    handler.getPendingAllocate.size should be (1)
 
     val container = createContainer("host1")
     handler.handleAllocatedContainers(Array(container))
@@ -134,7 +134,7 @@ class YarnAllocatorSuite extends SparkFunSuite with 
Matchers with BeforeAndAfter
     val handler = createAllocator(4)
     handler.updateResourceRequests()
     handler.getNumExecutorsRunning should be (0)
-    handler.getNumPendingAllocate should be (4)
+    handler.getPendingAllocate.size should be (4)
 
     val container1 = createContainer("host1")
     val container2 = createContainer("host1")
@@ -154,7 +154,7 @@ class YarnAllocatorSuite extends SparkFunSuite with 
Matchers with BeforeAndAfter
     val handler = createAllocator(2)
     handler.updateResourceRequests()
     handler.getNumExecutorsRunning should be (0)
-    handler.getNumPendingAllocate should be (2)
+    handler.getPendingAllocate.size should be (2)
 
     val container1 = createContainer("host1")
     val container2 = createContainer("host2")
@@ -174,11 +174,11 @@ class YarnAllocatorSuite extends SparkFunSuite with 
Matchers with BeforeAndAfter
     val handler = createAllocator(4)
     handler.updateResourceRequests()
     handler.getNumExecutorsRunning should be (0)
-    handler.getNumPendingAllocate should be (4)
+    handler.getPendingAllocate.size should be (4)
 
     handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty)
     handler.updateResourceRequests()
-    handler.getNumPendingAllocate should be (3)
+    handler.getPendingAllocate.size should be (3)
 
     val container = createContainer("host1")
     handler.handleAllocatedContainers(Array(container))
@@ -189,18 +189,18 @@ class YarnAllocatorSuite extends SparkFunSuite with 
Matchers with BeforeAndAfter
 
     handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map.empty)
     handler.updateResourceRequests()
-    handler.getNumPendingAllocate should be (1)
+    handler.getPendingAllocate.size should be (1)
   }
 
   test("decrease total requested executors to less than currently running") {
     val handler = createAllocator(4)
     handler.updateResourceRequests()
     handler.getNumExecutorsRunning should be (0)
-    handler.getNumPendingAllocate should be (4)
+    handler.getPendingAllocate.size should be (4)
 
     handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty)
     handler.updateResourceRequests()
-    handler.getNumPendingAllocate should be (3)
+    handler.getPendingAllocate.size should be (3)
 
     val container1 = createContainer("host1")
     val container2 = createContainer("host2")
@@ -210,7 +210,7 @@ class YarnAllocatorSuite extends SparkFunSuite with 
Matchers with BeforeAndAfter
 
     handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map.empty)
     handler.updateResourceRequests()
-    handler.getNumPendingAllocate should be (0)
+    handler.getPendingAllocate.size should be (0)
     handler.getNumExecutorsRunning should be (2)
   }
 
@@ -218,7 +218,7 @@ class YarnAllocatorSuite extends SparkFunSuite with 
Matchers with BeforeAndAfter
     val handler = createAllocator(4)
     handler.updateResourceRequests()
     handler.getNumExecutorsRunning should be (0)
-    handler.getNumPendingAllocate should be (4)
+    handler.getPendingAllocate.size should be (4)
 
     val container1 = createContainer("host1")
     val container2 = createContainer("host2")
@@ -233,14 +233,14 @@ class YarnAllocatorSuite extends SparkFunSuite with 
Matchers with BeforeAndAfter
     handler.updateResourceRequests()
     handler.processCompletedContainers(statuses.toSeq)
     handler.getNumExecutorsRunning should be (0)
-    handler.getNumPendingAllocate should be (1)
+    handler.getPendingAllocate.size should be (1)
   }
 
   test("lost executor removed from backend") {
     val handler = createAllocator(4)
     handler.updateResourceRequests()
     handler.getNumExecutorsRunning should be (0)
-    handler.getNumPendingAllocate should be (4)
+    handler.getPendingAllocate.size should be (4)
 
     val container1 = createContainer("host1")
     val container2 = createContainer("host2")
@@ -255,7 +255,7 @@ class YarnAllocatorSuite extends SparkFunSuite with 
Matchers with BeforeAndAfter
     handler.processCompletedContainers(statuses.toSeq)
     handler.updateResourceRequests()
     handler.getNumExecutorsRunning should be (0)
-    handler.getNumPendingAllocate should be (2)
+    handler.getPendingAllocate.size should be (2)
     handler.getNumExecutorsFailed should be (2)
     handler.getNumUnexpectedContainerRelease should be (2)
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to