[ https://issues.apache.org/jira/browse/SPARK-26269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16726995#comment-16726995 ]

ASF GitHub Bot commented on SPARK-26269:
----------------------------------------

asfgit closed pull request #23223: [SPARK-26269][YARN] YarnAllocator should have same blacklist behaviour with YARN to maximize use of cluster resource
URL: https://github.com/apache/spark/pull/23223
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 9497530805c1a..e158d96149622 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -612,13 +612,23 @@ private[yarn] class YarnAllocator(
             val message = "Container killed by YARN for exceeding physical 
memory limits. " +
               s"$diag Consider boosting ${EXECUTOR_MEMORY_OVERHEAD.key}."
             (true, message)
-          case _ =>
-            // all the failures which not covered above, like:
-            // disk failure, kill by app master or resource manager, ...
-            allocatorBlacklistTracker.handleResourceAllocationFailure(hostOpt)
-            (true, "Container marked as failed: " + containerId + onHostStr +
-              ". Exit status: " + completedContainer.getExitStatus +
-              ". Diagnostics: " + completedContainer.getDiagnostics)
+          case other_exit_status =>
+            // SPARK-26269: follow YARN's blacklisting behaviour(see https://github
+            // .com/apache/hadoop/blob/228156cfd1b474988bc4fedfbf7edddc87db41e3/had
+            // oop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/ap
+            // ache/hadoop/yarn/util/Apps.java#L273 for details)
+            if (NOT_APP_AND_SYSTEM_FAULT_EXIT_STATUS.contains(other_exit_status)) {
+              (false, s"Container marked as failed: $containerId$onHostStr" +
+                s". Exit status: ${completedContainer.getExitStatus}" +
+                s". Diagnostics: ${completedContainer.getDiagnostics}.")
+            } else {
+              // completed container from a bad node
+              allocatorBlacklistTracker.handleResourceAllocationFailure(hostOpt)
+              (true, s"Container from a bad node: $containerId$onHostStr" +
+                s". Exit status: ${completedContainer.getExitStatus}" +
+                s". Diagnostics: ${completedContainer.getDiagnostics}.")
+            }
+
 
         }
         if (exitCausedByApp) {
@@ -744,4 +754,12 @@ private object YarnAllocator {
   val MEM_REGEX = "[0-9.]+ [KMG]B"
   val VMEM_EXCEEDED_EXIT_CODE = -103
   val PMEM_EXCEEDED_EXIT_CODE = -104
+
+  val NOT_APP_AND_SYSTEM_FAULT_EXIT_STATUS = Set(
+    ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
+    ContainerExitStatus.KILLED_BY_APPMASTER,
+    ContainerExitStatus.KILLED_AFTER_APP_COMPLETION,
+    ContainerExitStatus.ABORTED,
+    ContainerExitStatus.DISKS_FAILED
+  )
 }
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
index ceac7cda5f8be..268976b629507 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
@@ -120,7 +120,9 @@ private[spark] class YarnAllocatorBlacklistTracker(
     if (removals.nonEmpty) {
       logInfo(s"removing nodes from YARN application master's blacklist: 
$removals")
     }
-    amClient.updateBlacklist(additions.asJava, removals.asJava)
+    if (additions.nonEmpty || removals.nonEmpty) {
+      amClient.updateBlacklist(additions.asJava, removals.asJava)
+    }
     currentBlacklistedYarnNodes = nodesToBlacklist
   }
 
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala
index aeac68e6ed330..201910731e934 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala
@@ -87,7 +87,7 @@ class YarnAllocatorBlacklistTrackerSuite extends SparkFunSuite with Matchers
     // expired blacklisted nodes (simulating a resource request)
     yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set("host1", "host2"))
     // no change is communicated to YARN regarding the blacklisting
-    verify(amClientMock).updateBlacklist(Collections.emptyList(), Collections.emptyList())
+    verify(amClientMock, times(0)).updateBlacklist(Collections.emptyList(), Collections.emptyList())
   }
 
   test("combining scheduler and allocation blacklist") {
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index b61e7df4420ef..53a538dc1de29 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.deploy.yarn
 
+import java.util.Collections
+
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
@@ -114,13 +116,29 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       clock)
   }
 
-  def createContainer(host: String, resource: Resource = containerResource): Container = {
-    val containerId = ContainerId.newContainerId(appAttemptId, containerNum)
+  def createContainer(
+      host: String,
+      containerNumber: Int = containerNum,
+      resource: Resource = containerResource): Container = {
+    val  containerId: ContainerId = ContainerId.newContainerId(appAttemptId, containerNum)
     containerNum += 1
     val nodeId = NodeId.newInstance(host, 1000)
     Container.newInstance(containerId, nodeId, "", resource, RM_REQUEST_PRIORITY, null)
   }
 
+  def createContainers(hosts: Seq[String], containerIds: Seq[Int]): Seq[Container] = {
+    hosts.zip(containerIds).map{case (host, id) => createContainer(host, id)}
+  }
+
+  def createContainerStatus(
+      containerId: ContainerId,
+      exitStatus: Int,
+      containerState: ContainerState = ContainerState.COMPLETE,
+      diagnostics: String = "diagnostics"): ContainerStatus = {
+    ContainerStatus.newInstance(containerId, containerState, diagnostics, exitStatus)
+  }
+
+
   test("single container allocated") {
     // request a single container and receive it
     val handler = createAllocator(1)
@@ -148,7 +166,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       Map(YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "gpu" -> "2G"))
 
     handler.updateResourceRequests()
-    val container = createContainer("host1", handler.resource)
+    val container = createContainer("host1", resource = handler.resource)
     handler.handleAllocatedContainers(Array(container))
 
     // get amount of memory and vcores from resource, so effectively skipping their validation
@@ -417,4 +435,55 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     clock.advance(50 * 1000L)
     handler.getNumExecutorsFailed should be (0)
   }
+
+  test("SPARK-26269: YarnAllocator should have same blacklist behaviour with 
YARN") {
+    val rmClientSpy = spy(rmClient)
+    val maxExecutors = 11
+
+    val handler = createAllocator(
+      maxExecutors,
+      rmClientSpy,
+      Map(
+        "spark.yarn.blacklist.executor.launch.blacklisting.enabled" -> "true",
+        "spark.blacklist.application.maxFailedExecutorsPerNode" -> "0"))
+    handler.updateResourceRequests()
+
+    val hosts = (0 until maxExecutors).map(i => s"host$i")
+    val ids = 0 to maxExecutors
+    val containers = createContainers(hosts, ids)
+
+    val nonBlacklistedStatuses = Seq(
+      ContainerExitStatus.SUCCESS,
+      ContainerExitStatus.PREEMPTED,
+      ContainerExitStatus.KILLED_EXCEEDED_VMEM,
+      ContainerExitStatus.KILLED_EXCEEDED_PMEM,
+      ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
+      ContainerExitStatus.KILLED_BY_APPMASTER,
+      ContainerExitStatus.KILLED_AFTER_APP_COMPLETION,
+      ContainerExitStatus.ABORTED,
+      ContainerExitStatus.DISKS_FAILED)
+
+    val nonBlacklistedContainerStatuses = nonBlacklistedStatuses.zipWithIndex.map {
+      case (exitStatus, idx) => createContainerStatus(containers(idx).getId, exitStatus)
+    }
+
+    val BLACKLISTED_EXIT_CODE = 1
+    val blacklistedStatuses = Seq(ContainerExitStatus.INVALID, BLACKLISTED_EXIT_CODE)
+
+    val blacklistedContainerStatuses = blacklistedStatuses.zip(9 until maxExecutors).map {
+      case (exitStatus, idx) => createContainerStatus(containers(idx).getId, exitStatus)
+    }
+
+    handler.handleAllocatedContainers(containers.slice(0, 9))
+    handler.processCompletedContainers(nonBlacklistedContainerStatuses)
+    verify(rmClientSpy, never())
+      .updateBlacklist(hosts.slice(0, 9).asJava, Collections.emptyList())
+
+    handler.handleAllocatedContainers(containers.slice(9, 11))
+    handler.processCompletedContainers(blacklistedContainerStatuses)
+    verify(rmClientSpy)
+      .updateBlacklist(hosts.slice(9, 10).asJava, Collections.emptyList())
+    verify(rmClientSpy)
+      .updateBlacklist(hosts.slice(10, 11).asJava, Collections.emptyList())
+  }
 }
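
For readers who only want the gist of the diff above: exit statuses listed in the new
NOT_APP_AND_SYSTEM_FAULT_EXIT_STATUS set no longer count as app-caused failures and no
longer feed the allocator's blacklist tracker, while every other unclassified status
still does both. Below is a minimal, standalone Scala sketch of that decision; the
object and method names are illustrative only and are not part of the actual
YarnAllocator code.

import org.apache.hadoop.yarn.api.records.ContainerExitStatus

// Illustrative sketch only; mirrors the new "case other_exit_status" branch above.
object ExitStatusClassificationSketch {

  // Same membership as NOT_APP_AND_SYSTEM_FAULT_EXIT_STATUS in the diff above.
  private val notAppAndSystemFault: Set[Int] = Set(
    ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
    ContainerExitStatus.KILLED_BY_APPMASTER,
    ContainerExitStatus.KILLED_AFTER_APP_COMPLETION,
    ContainerExitStatus.ABORTED,
    ContainerExitStatus.DISKS_FAILED)

  // Returns (exitCausedByApp, reportToBlacklistTracker): statuses in the set above
  // count neither as app-caused failures nor against the node; all other
  // unclassified statuses do both.
  def classify(otherExitStatus: Int): (Boolean, Boolean) =
    if (notAppAndSystemFault.contains(otherExitStatus)) (false, false)
    else (true, true)
}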


 



> YarnAllocator should have same blacklist behaviour with YARN to maximize use
> of cluster resource
> -----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-26269
>                 URL: https://issues.apache.org/jira/browse/SPARK-26269
>             Project: Spark
>          Issue Type: Improvement
>          Components: YARN
>    Affects Versions: 2.3.1, 2.3.2, 2.4.0
>            Reporter: wuyi
>            Priority: Minor
>             Fix For: 2.4.0
>
>
> Currently, YarnAllocator may put a node into the blacklist whenever a container
> on it completes with an exit status other than SUCCESS, PREEMPTED,
> KILLED_EXCEEDED_VMEM, or KILLED_EXCEEDED_PMEM. However, for several other exit
> statuses, e.g. KILLED_BY_RESOURCEMANAGER, YARN does not consider the related
> node a candidate for blacklisting (see YARN's explanation for details:
> https://github.com/apache/hadoop/blob/228156cfd1b474988bc4fedfbf7edddc87db41e3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/Apps.java#L273).
> Relaxing the current blacklist rule to match YARN's behaviour would maximize
> use of cluster resources.
>  
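
As a usage note, the allocator-level blacklisting exercised by this change is gated
behind configuration, and the new test in the PR enables it explicitly. A minimal
sketch of the equivalent application-side configuration follows, using the exact keys
from that test; the values are illustrative (the second key is set to 0 in the test so
that a single executor failure on a node is enough to blacklist it), not recommended
production settings.

import org.apache.spark.SparkConf

// Sketch only: enable YARN allocator blacklisting, as the new test above does.
object BlacklistConfSketch {
  val conf: SparkConf = new SparkConf()
    .set("spark.yarn.blacklist.executor.launch.blacklisting.enabled", "true")
    .set("spark.blacklist.application.maxFailedExecutorsPerNode", "0")
}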


