This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d6b44d  [SPARK-26392][YARN] Cancel pending allocate requests by 
taking locality preference into account
3d6b44d is described below

commit 3d6b44d9ea92dc1eabb8f211176861e51240bf93
Author: Ngone51 <ngone_5...@163.com>
AuthorDate: Thu Dec 20 10:25:52 2018 -0800

    [SPARK-26392][YARN] Cancel pending allocate requests by taking locality 
preference into account
    
    ## What changes were proposed in this pull request?
    
    Right now, we cancel pending allocate requests by its sending order. I 
thing we can take
    
    locality preference into account when do this to perfom least impact on 
task locality preference.
    
    ## How was this patch tested?
    
    N.A.
    
    Closes #23344 from 
Ngone51/dev-cancel-pending-allocate-requests-by-taking-locality-preference-into-account.
    
    Authored-by: Ngone51 <ngone_5...@163.com>
    Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
---
 .../apache/spark/deploy/yarn/YarnAllocator.scala   | 29 +++++++++-------------
 1 file changed, 12 insertions(+), 17 deletions(-)

diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index d37d0d6..54b1ec2 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -294,6 +294,15 @@ private[yarn] class YarnAllocator(
       s"pending: $numPendingAllocate, running: ${runningExecutors.size}, " +
       s"executorsStarting: ${numExecutorsStarting.get}")
 
+    // Split the pending container request into three groups: locality matched 
list, locality
+    // unmatched list and non-locality list. Take the locality matched 
container request into
+    // consideration of container placement, treat as allocated containers.
+    // For locality unmatched and locality free container requests, cancel 
these container
+    // requests, since required locality preference has been changed, 
recalculating using
+    // container placement strategy.
+    val (localRequests, staleRequests, anyHostRequests) = 
splitPendingAllocationsByLocality(
+      hostToLocalTaskCounts, pendingAllocate)
+
     if (missing > 0) {
       if (log.isInfoEnabled()) {
         var requestContainerMessage = s"Will request $missing executor 
container(s), each with " +
@@ -306,15 +315,6 @@ private[yarn] class YarnAllocator(
         logInfo(requestContainerMessage)
       }
 
-      // Split the pending container request into three groups: locality 
matched list, locality
-      // unmatched list and non-locality list. Take the locality matched 
container request into
-      // consideration of container placement, treat as allocated containers.
-      // For locality unmatched and locality free container requests, cancel 
these container
-      // requests, since required locality preference has been changed, 
recalculating using
-      // container placement strategy.
-      val (localRequests, staleRequests, anyHostRequests) = 
splitPendingAllocationsByLocality(
-        hostToLocalTaskCounts, pendingAllocate)
-
       // cancel "stale" requests for locations that are no longer needed
       staleRequests.foreach { stale =>
         amClient.removeContainerRequest(stale)
@@ -374,14 +374,9 @@ private[yarn] class YarnAllocator(
       val numToCancel = math.min(numPendingAllocate, -missing)
       logInfo(s"Canceling requests for $numToCancel executor container(s) to 
have a new desired " +
         s"total $targetNumExecutors executors.")
-
-      val matchingRequests = amClient.getMatchingRequests(RM_REQUEST_PRIORITY, 
ANY_HOST, resource)
-      if (!matchingRequests.isEmpty) {
-        matchingRequests.iterator().next().asScala
-          .take(numToCancel).foreach(amClient.removeContainerRequest)
-      } else {
-        logWarning("Expected to find pending requests, but found none.")
-      }
+      // cancel pending allocate requests by taking locality preference into 
account
+      val cancelRequests = (staleRequests ++ anyHostRequests ++ 
localRequests).take(numToCancel)
+      cancelRequests.foreach(amClient.removeContainerRequest)
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to