Repository: spark
Updated Branches:
  refs/heads/master 5282bae04 -> 9e50a1d37


[SPARK-13669][SPARK-20898][CORE] Improve the blacklist mechanism to handle external shuffle service unavailable situation

## What changes were proposed in this pull request?

Currently we are running into an issue when YARN work-preserving recovery is enabled together 
with the external shuffle service.
With work preserving enabled, the failure of a NodeManager (NM) does not cause its executors to 
exit, so those executors can still accept and run tasks. The problem is that when the NM is down, 
the external shuffle service is actually inaccessible, so reduce tasks keep failing with "Fetch 
failure", and the failure of the reduce stage makes the parent (map) stage rerun. The tricky 
thing here is that the Spark scheduler is not aware that the external shuffle service is 
unavailable, so it reschedules the map tasks on the executors whose NM has failed; the reduce 
stage then fails again with "Fetch failure", and after 4 retries the job fails. The same issue 
can also occur with other cluster managers that use an external shuffle service.

The main problem, then, is that we should avoid assigning tasks to those bad executors (where 
the shuffle service is unavailable). Spark's current blacklist mechanism can blacklist 
executors/nodes based on failed tasks, but it does not handle this specific fetch failure 
scenario. This patch therefore improves the application-level blacklist mechanism to handle 
fetch failures (especially those caused by an unavailable external shuffle service) by 
blacklisting the executors/nodes where shuffle fetches fail.
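
As a rough usage sketch (not part of the patch itself): an application opts in by enabling the 
blacklist tracker together with the new flag added here; when the external shuffle service is 
also enabled, a fetch failure blacklists the whole node instead of a single executor. The keys 
spark.blacklist.enabled and spark.shuffle.service.enabled are existing Spark settings; the 
fetchFailure key is the one introduced by this PR, and the values shown are illustrative.

    import org.apache.spark.SparkConf

    // Minimal sketch: opt in to fetch-failure blacklisting (illustrative values, not defaults).
    val conf = new SparkConf()
      .set("spark.blacklist.enabled", "true")                           // blacklist tracker must be enabled
      .set("spark.blacklist.application.fetchFailure.enabled", "true")  // new flag added by this patch (default: false)
      .set("spark.shuffle.service.enabled", "true")                     // with this on, the whole node is blacklisted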

## How was this patch tested?

Unit test and small cluster verification.

Author: jerryshao <ss...@hortonworks.com>

Closes #17113 from jerryshao/SPARK-13669.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9e50a1d3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9e50a1d3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9e50a1d3

Branch: refs/heads/master
Commit: 9e50a1d37a4cf0c34e20a7c1a910ceaff41535a2
Parents: 5282bae
Author: jerryshao <ss...@hortonworks.com>
Authored: Mon Jun 26 11:14:03 2017 -0500
Committer: Tom Graves <tgra...@yahoo-inc.com>
Committed: Mon Jun 26 11:14:03 2017 -0500

----------------------------------------------------------------------
 .../apache/spark/internal/config/package.scala  |  5 ++
 .../spark/scheduler/BlacklistTracker.scala      | 95 +++++++++++++++-----
 .../spark/scheduler/TaskSchedulerImpl.scala     | 18 ++--
 .../apache/spark/scheduler/TaskSetManager.scala |  6 ++
 .../spark/scheduler/BlacklistTrackerSuite.scala | 55 ++++++++++++
 .../scheduler/TaskSchedulerImplSuite.scala      |  4 +-
 .../spark/scheduler/TaskSetManagerSuite.scala   | 32 +++++++
 docs/configuration.md                           |  9 ++
 8 files changed, 186 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9e50a1d3/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 462c189..be63c63 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -149,6 +149,11 @@ package object config {
       .internal()
       .timeConf(TimeUnit.MILLISECONDS)
       .createOptional
+
+  private[spark] val BLACKLIST_FETCH_FAILURE_ENABLED =
+    ConfigBuilder("spark.blacklist.application.fetchFailure.enabled")
+      .booleanConf
+      .createWithDefault(false)
   // End blacklist confs
 
   private[spark] val UNREGISTER_OUTPUT_ON_HOST_ON_FETCH_FAILURE =

http://git-wip-us.apache.org/repos/asf/spark/blob/9e50a1d3/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
index e130e60..cd8e61d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
@@ -61,6 +61,7 @@ private[scheduler] class BlacklistTracker (
   private val MAX_FAILURES_PER_EXEC = conf.get(config.MAX_FAILURES_PER_EXEC)
   private val MAX_FAILED_EXEC_PER_NODE = conf.get(config.MAX_FAILED_EXEC_PER_NODE)
   val BLACKLIST_TIMEOUT_MILLIS = BlacklistTracker.getBlacklistTimeout(conf)
+  private val BLACKLIST_FETCH_FAILURE_ENABLED = conf.get(config.BLACKLIST_FETCH_FAILURE_ENABLED)
 
   /**
    * A map from executorId to information on task failures.  Tracks the time of each task failure,
@@ -145,6 +146,74 @@ private[scheduler] class BlacklistTracker (
     nextExpiryTime = math.min(execMinExpiry, nodeMinExpiry)
   }
 
+  private def killBlacklistedExecutor(exec: String): Unit = {
+    if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
+      allocationClient match {
+        case Some(a) =>
+          logInfo(s"Killing blacklisted executor id $exec " +
+            s"since ${config.BLACKLIST_KILL_ENABLED.key} is set.")
+          a.killExecutors(Seq(exec), true, true)
+        case None =>
+          logWarning(s"Not attempting to kill blacklisted executor id $exec " +
+            s"since allocation client is not defined.")
+      }
+    }
+  }
+
+  private def killExecutorsOnBlacklistedNode(node: String): Unit = {
+    if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
+      allocationClient match {
+        case Some(a) =>
+          logInfo(s"Killing all executors on blacklisted host $node " +
+            s"since ${config.BLACKLIST_KILL_ENABLED.key} is set.")
+          if (a.killExecutorsOnHost(node) == false) {
+            logError(s"Killing executors on node $node failed.")
+          }
+        case None =>
+          logWarning(s"Not attempting to kill executors on blacklisted host 
$node " +
+            s"since allocation client is not defined.")
+      }
+    }
+  }
+
+  def updateBlacklistForFetchFailure(host: String, exec: String): Unit = {
+    if (BLACKLIST_FETCH_FAILURE_ENABLED) {
+      // If we blacklist on fetch failures, we are implicitly saying that we believe the failure is
+      // non-transient, and can't be recovered from (even if this is the first fetch failure,
+      // stage is retried after just one failure, so we don't always get a chance to collect
+      // multiple fetch failures).
+      // If the external shuffle-service is on, then every other executor on this node would
+      // be suffering from the same issue, so we should blacklist (and potentially kill) all
+      // of them immediately.
+
+      val now = clock.getTimeMillis()
+      val expiryTimeForNewBlacklists = now + BLACKLIST_TIMEOUT_MILLIS
+
+      if (conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
+        if (!nodeIdToBlacklistExpiryTime.contains(host)) {
+          logInfo(s"blacklisting node $host due to fetch failure of external 
shuffle service")
+
+          nodeIdToBlacklistExpiryTime.put(host, expiryTimeForNewBlacklists)
+          listenerBus.post(SparkListenerNodeBlacklisted(now, host, 1))
+          _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
+          killExecutorsOnBlacklistedNode(host)
+          updateNextExpiryTime()
+        }
+      } else if (!executorIdToBlacklistStatus.contains(exec)) {
+        logInfo(s"Blacklisting executor $exec due to fetch failure")
+
+        executorIdToBlacklistStatus.put(exec, BlacklistedExecutor(host, expiryTimeForNewBlacklists))
+        // We hardcoded number of failure tasks to 1 for fetch failure, because there's no
+        // reattempt for such failure.
+        listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, 1))
+        updateNextExpiryTime()
+        killBlacklistedExecutor(exec)
+
+        val blacklistedExecsOnNode = nodeToBlacklistedExecs.getOrElseUpdate(host, HashSet[String]())
+        blacklistedExecsOnNode += exec
+      }
+    }
+  }
 
   def updateBlacklistForSuccessfulTaskSet(
       stageId: Int,
@@ -174,17 +243,7 @@ private[scheduler] class BlacklistTracker (
         listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, newTotal))
         executorIdToFailureList.remove(exec)
         updateNextExpiryTime()
-        if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
-          allocationClient match {
-            case Some(allocationClient) =>
-              logInfo(s"Killing blacklisted executor id $exec " +
-                s"since spark.blacklist.killBlacklistedExecutors is set.")
-              allocationClient.killExecutors(Seq(exec), true, true)
-            case None =>
-              logWarning(s"Not attempting to kill blacklisted executor id 
$exec " +
-                s"since allocation client is not defined.")
-          }
-        }
+        killBlacklistedExecutor(exec)
 
         // In addition to blacklisting the executor, we also update the data for failures on the
         // node, and potentially put the entire node into a blacklist as well.
@@ -199,19 +258,7 @@ private[scheduler] class BlacklistTracker (
           nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
           listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size))
           _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
-          if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
-            allocationClient match {
-              case Some(allocationClient) =>
-                logInfo(s"Killing all executors on blacklisted host $node " +
-                  s"since spark.blacklist.killBlacklistedExecutors is set.")
-                if (allocationClient.killExecutorsOnHost(node) == false) {
-                  logError(s"Killing executors on node $node failed.")
-                }
-              case None =>
-                logWarning(s"Not attempting to kill executors on blacklisted 
host $node " +
-                  s"since allocation client is not defined.")
-            }
-          }
+          killExecutorsOnBlacklistedNode(node)
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/9e50a1d3/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index bba0b29..91ec172 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -51,29 +51,21 @@ import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils}
  * acquire a lock on us, so we need to make sure that we don't try to lock the backend while
  * we are holding a lock on ourselves.
  */
-private[spark] class TaskSchedulerImpl private[scheduler](
+private[spark] class TaskSchedulerImpl(
     val sc: SparkContext,
     val maxTaskFailures: Int,
-    private[scheduler] val blacklistTrackerOpt: Option[BlacklistTracker],
     isLocal: Boolean = false)
   extends TaskScheduler with Logging {
 
   import TaskSchedulerImpl._
 
   def this(sc: SparkContext) = {
-    this(
-      sc,
-      sc.conf.get(config.MAX_TASK_FAILURES),
-      TaskSchedulerImpl.maybeCreateBlacklistTracker(sc))
+    this(sc, sc.conf.get(config.MAX_TASK_FAILURES))
   }
 
-  def this(sc: SparkContext, maxTaskFailures: Int, isLocal: Boolean) = {
-    this(
-      sc,
-      maxTaskFailures,
-      TaskSchedulerImpl.maybeCreateBlacklistTracker(sc),
-      isLocal = isLocal)
-  }
+  // Lazily initializing blacklistTrackerOpt to avoid getting empty ExecutorAllocationClient,
+  // because ExecutorAllocationClient is created after this TaskSchedulerImpl.
+  private[scheduler] lazy val blacklistTrackerOpt = maybeCreateBlacklistTracker(sc)
 
   val conf = sc.conf
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9e50a1d3/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index a41b059..02d374d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -774,6 +774,12 @@ private[spark] class TaskSetManager(
           tasksSuccessful += 1
         }
         isZombie = true
+
+        if (fetchFailed.bmAddress != null) {
+          blacklistTracker.foreach(_.updateBlacklistForFetchFailure(
+            fetchFailed.bmAddress.host, fetchFailed.bmAddress.executorId))
+        }
+
         None
 
       case ef: ExceptionFailure =>
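
For context on the types used above (a sketch, not part of the patch): the FetchFailed task end 
reason carries the BlockManagerId of the block server whose shuffle blocks could not be fetched, 
which supplies the host and executorId passed to updateBlacklistForFetchFailure; bmAddress may be 
null when the source of the failure is unknown, hence the null check above. Mirroring how the new 
TaskSetManagerSuite test below constructs it (argument values are illustrative):

    import org.apache.spark.FetchFailed
    import org.apache.spark.storage.BlockManagerId

    // BlockManagerId(executorId, host, port) identifies the failed shuffle server.
    val bmAddress = BlockManagerId("exec1", "host1", 12345)
    // FetchFailed(bmAddress, shuffleId, mapId, reduceId, message)
    val reason = FetchFailed(bmAddress, 0, 0, 0, "ignored")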

http://git-wip-us.apache.org/repos/asf/spark/blob/9e50a1d3/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
index 571c6bb..7ff03c4 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
@@ -530,4 +530,59 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     verify(allocationClientMock).killExecutors(Seq("2"), true, true)
     verify(allocationClientMock).killExecutorsOnHost("hostA")
   }
+
+  test("fetch failure blacklisting kills executors, configured by 
BLACKLIST_KILL_ENABLED") {
+    val allocationClientMock = mock[ExecutorAllocationClient]
+    when(allocationClientMock.killExecutors(any(), any(), 
any())).thenReturn(Seq("called"))
+    when(allocationClientMock.killExecutorsOnHost("hostA")).thenAnswer(new 
Answer[Boolean] {
+      // To avoid a race between blacklisting and killing, it is important 
that the nodeBlacklist
+      // is updated before we ask the executor allocation client to kill all 
the executors
+      // on a particular host.
+      override def answer(invocation: InvocationOnMock): Boolean = {
+        if (blacklist.nodeBlacklist.contains("hostA") == false) {
+          throw new IllegalStateException("hostA should be on the blacklist")
+        }
+        true
+      }
+    })
+
+    conf.set(config.BLACKLIST_FETCH_FAILURE_ENABLED, true)
+    blacklist = new BlacklistTracker(listenerBusMock, conf, Some(allocationClientMock), clock)
+
+    // Disable auto-kill. Blacklist an executor and make sure killExecutors is not called.
+    conf.set(config.BLACKLIST_KILL_ENABLED, false)
+    blacklist.updateBlacklistForFetchFailure("hostA", exec = "1")
+
+    verify(allocationClientMock, never).killExecutors(any(), any(), any())
+    verify(allocationClientMock, never).killExecutorsOnHost(any())
+
+    // Enable auto-kill. Blacklist an executor and make sure killExecutors is called.
+    conf.set(config.BLACKLIST_KILL_ENABLED, true)
+    blacklist = new BlacklistTracker(listenerBusMock, conf, Some(allocationClientMock), clock)
+    clock.advance(1000)
+    blacklist.updateBlacklistForFetchFailure("hostA", exec = "1")
+
+    verify(allocationClientMock).killExecutors(Seq("1"), true, true)
+    verify(allocationClientMock, never).killExecutorsOnHost(any())
+
+    assert(blacklist.executorIdToBlacklistStatus.contains("1"))
+    assert(blacklist.executorIdToBlacklistStatus("1").node === "hostA")
+    assert(blacklist.executorIdToBlacklistStatus("1").expiryTime ===
+      1000 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
+    assert(blacklist.nextExpiryTime === 1000 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
+    assert(blacklist.nodeIdToBlacklistExpiryTime.isEmpty)
+
+    // Enable external shuffle service to see if all the executors on this node will be killed.
+    conf.set(config.SHUFFLE_SERVICE_ENABLED, true)
+    clock.advance(1000)
+    blacklist.updateBlacklistForFetchFailure("hostA", exec = "2")
+
+    verify(allocationClientMock, never).killExecutors(Seq("2"), true, true)
+    verify(allocationClientMock).killExecutorsOnHost("hostA")
+
+    assert(blacklist.nodeIdToBlacklistExpiryTime.contains("hostA"))
+    assert(blacklist.nodeIdToBlacklistExpiryTime("hostA") ===
+      2000 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
+    assert(blacklist.nextExpiryTime === 1000 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9e50a1d3/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 8b9d45f..a003377 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -87,7 +87,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     conf.set(config.BLACKLIST_ENABLED, true)
     sc = new SparkContext(conf)
     taskScheduler =
-      new TaskSchedulerImpl(sc, sc.conf.getInt("spark.task.maxFailures", 4), Some(blacklist)) {
+      new TaskSchedulerImpl(sc, sc.conf.getInt("spark.task.maxFailures", 4)) {
         override def createTaskSetManager(taskSet: TaskSet, maxFailures: Int): TaskSetManager = {
           val tsm = super.createTaskSetManager(taskSet, maxFailures)
           // we need to create a spied tsm just so we can set the TaskSetBlacklist
@@ -98,6 +98,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
           stageToMockTaskSetBlacklist(taskSet.stageId) = taskSetBlacklist
           tsmSpy
         }
+
+        override private[scheduler] lazy val blacklistTrackerOpt = Some(blacklist)
       }
     setupHelper()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/9e50a1d3/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index db14c9a..80fb674 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -1140,6 +1140,38 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
       .updateBlacklistForFailedTask(anyString(), anyString(), anyInt())
   }
 
+  test("update application blacklist for shuffle-fetch") {
+    // Setup a taskset, and fail some one task for fetch failure.
+    val conf = new SparkConf()
+      .set(config.BLACKLIST_ENABLED, true)
+      .set(config.SHUFFLE_SERVICE_ENABLED, true)
+      .set(config.BLACKLIST_FETCH_FAILURE_ENABLED, true)
+    sc = new SparkContext("local", "test", conf)
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    val taskSet = FakeTask.createTaskSet(4)
+    val blacklistTracker = new BlacklistTracker(sc, None)
+    val tsm = new TaskSetManager(sched, taskSet, 4, Some(blacklistTracker))
+
+    // make some offers to our taskset, to get tasks we will fail
+    val taskDescs = Seq(
+      "exec1" -> "host1",
+      "exec2" -> "host2"
+    ).flatMap { case (exec, host) =>
+      // offer each executor twice (simulating 2 cores per executor)
+      (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)}
+    }
+    assert(taskDescs.size === 4)
+
+    assert(!blacklistTracker.isExecutorBlacklisted(taskDescs(0).executorId))
+    assert(!blacklistTracker.isNodeBlacklisted("host1"))
+
+    // Fail the task with fetch failure
+    tsm.handleFailedTask(taskDescs(0).taskId, TaskState.FAILED,
+      FetchFailed(BlockManagerId(taskDescs(0).executorId, "host1", 12345), 0, 0, 0, "ignored"))
+
+    assert(blacklistTracker.isNodeBlacklisted("host1"))
+  }
+
   private def createTaskResult(
       id: Int,
       accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/9e50a1d3/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index f4bec58..c8e6153 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1480,6 +1480,15 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
+  <td><code>spark.blacklist.application.fetchFailure.enabled</code></td>
+  <td>false</td>
+  <td>
+    (Experimental) If set to "true", Spark will blacklist the executor immediately when a fetch
+    failure happens. If external shuffle service is enabled, then the whole node will be
+    blacklisted.
+  </td>
+</tr>
+<tr>
   <td><code>spark.speculation</code></td>
   <td>false</td>
   <td>

