This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new daeb081  [SPARK-26392][YARN] Cancel pending allocate requests by 
taking locality preference into account
daeb081 is described below

commit daeb0811058c76e2d6cecb6de5ebe287c3be3a94
Author: Ngone51 <ngone_5...@163.com>
AuthorDate: Thu Dec 20 10:25:52 2018 -0800

    [SPARK-26392][YARN] Cancel pending allocate requests by taking locality 
preference into account
    
    ## What changes were proposed in this pull request?
    
    Right now, we cancel pending allocate requests in the order they were sent. I 
think we can take
    
    locality preference into account when doing this, to have the least impact on 
task locality preference.
    
    ## How was this patch tested?
    
    N.A.
    
    Closes #23344 from 
Ngone51/dev-cancel-pending-allocate-requests-by-taking-locality-preference-into-account.
    
    Authored-by: Ngone51 <ngone_5...@163.com>
    Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
    (cherry picked from commit 3d6b44d9ea92dc1eabb8f211176861e51240bf93)
    Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
---
 .../apache/spark/deploy/yarn/YarnAllocator.scala   | 29 +++++++++-------------
 1 file changed, 12 insertions(+), 17 deletions(-)

diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 8a7551d..f4dc80a 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -287,20 +287,20 @@ private[yarn] class YarnAllocator(
       s"pending: $numPendingAllocate, running: ${runningExecutors.size}, " +
       s"executorsStarting: ${numExecutorsStarting.get}")
 
+    // Split the pending container request into three groups: locality matched 
list, locality
+    // unmatched list and non-locality list. Take the locality matched 
container request into
+    // consideration of container placement, treat as allocated containers.
+    // For locality unmatched and locality free container requests, cancel 
these container
+    // requests, since required locality preference has been changed, 
recalculating using
+    // container placement strategy.
+    val (localRequests, staleRequests, anyHostRequests) = 
splitPendingAllocationsByLocality(
+      hostToLocalTaskCounts, pendingAllocate)
+
     if (missing > 0) {
       logInfo(s"Will request $missing executor container(s), each with " +
         s"${resource.getVirtualCores} core(s) and " +
         s"${resource.getMemory} MB memory (including $memoryOverhead MB of 
overhead)")
 
-      // Split the pending container request into three groups: locality 
matched list, locality
-      // unmatched list and non-locality list. Take the locality matched 
container request into
-      // consideration of container placement, treat as allocated containers.
-      // For locality unmatched and locality free container requests, cancel 
these container
-      // requests, since required locality preference has been changed, 
recalculating using
-      // container placement strategy.
-      val (localRequests, staleRequests, anyHostRequests) = 
splitPendingAllocationsByLocality(
-        hostToLocalTaskCounts, pendingAllocate)
-
       // cancel "stale" requests for locations that are no longer needed
       staleRequests.foreach { stale =>
         amClient.removeContainerRequest(stale)
@@ -360,14 +360,9 @@ private[yarn] class YarnAllocator(
       val numToCancel = math.min(numPendingAllocate, -missing)
       logInfo(s"Canceling requests for $numToCancel executor container(s) to 
have a new desired " +
         s"total $targetNumExecutors executors.")
-
-      val matchingRequests = amClient.getMatchingRequests(RM_REQUEST_PRIORITY, 
ANY_HOST, resource)
-      if (!matchingRequests.isEmpty) {
-        matchingRequests.iterator().next().asScala
-          .take(numToCancel).foreach(amClient.removeContainerRequest)
-      } else {
-        logWarning("Expected to find pending requests, but found none.")
-      }
+      // cancel pending allocate requests by taking locality preference into 
account
+      val cancelRequests = (staleRequests ++ anyHostRequests ++ 
localRequests).take(numToCancel)
+      cancelRequests.foreach(amClient.removeContainerRequest)
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to