Repository: spark
Updated Branches:
  refs/heads/master 7bf4da8a3 -> 3b117d631


[SPARK-22123][CORE] Add latest failure reason for task set blacklist

## What changes were proposed in this pull request?
This patch adds the latest failure reason to the task set blacklist, so it can be
shown on the Spark UI and users can see the failure reason directly.
Until now, every job aborted because the blacklist was complete only showed a log
message like the one below, which carries no further information:
`Aborting $taskSet because task $indexInTaskSet (partition $partition) cannot
run anywhere due to node and executor blacklist.  Blacklisting behavior can be
configured via spark.blacklist.*.`
**After this change:**
```
Aborting TaskSet 0.0 because task 0 (partition 0)
cannot run anywhere due to node and executor blacklist.
Most recent failure:
Some(Lost task 0.1 in stage 0.0 (TID 3,xxx, executor 1): java.lang.Exception: 
Fake error!
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:73)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:305)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
).

Blacklisting behavior can be configured via spark.blacklist.*.

```

## How was this patch tested?

Unit test and manually test.

Author: zhoukang <[email protected]>

Closes #19338 from caneGuy/zhoukang/improve-blacklist.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3b117d63
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3b117d63
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3b117d63

Branch: refs/heads/master
Commit: 3b117d631e1ff387b70ed8efba229594f4594db5
Parents: 7bf4da8
Author: zhoukang <[email protected]>
Authored: Thu Sep 28 09:25:21 2017 +0800
Committer: jerryshao <[email protected]>
Committed: Thu Sep 28 09:25:21 2017 +0800

----------------------------------------------------------------------
 .../spark/scheduler/TaskSetBlacklist.scala      | 14 ++++-
 .../apache/spark/scheduler/TaskSetManager.scala | 15 +++--
 .../scheduler/BlacklistIntegrationSuite.scala   |  5 +-
 .../spark/scheduler/BlacklistTrackerSuite.scala | 60 +++++++++++++-------
 .../scheduler/TaskSchedulerImplSuite.scala      | 11 +++-
 .../spark/scheduler/TaskSetBlacklistSuite.scala | 45 ++++++++++-----
 .../spark/scheduler/TaskSetManagerSuite.scala   |  2 +-
 7 files changed, 104 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3b117d63/core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala
index e815b7e..233781f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala
@@ -61,6 +61,16 @@ private[scheduler] class TaskSetBlacklist(val conf: 
SparkConf, val stageId: Int,
   private val blacklistedExecs = new HashSet[String]()
   private val blacklistedNodes = new HashSet[String]()
 
+  private var latestFailureReason: String = null
+
+  /**
+   * Get the most recent failure reason of this TaskSet.
+   * @return
+   */
+  def getLatestFailureReason: String = {
+    latestFailureReason
+  }
+
   /**
    * Return true if this executor is blacklisted for the given task.  This 
does *not*
    * need to return true if the executor is blacklisted for the entire stage, 
or blacklisted
@@ -94,7 +104,9 @@ private[scheduler] class TaskSetBlacklist(val conf: 
SparkConf, val stageId: Int,
   private[scheduler] def updateBlacklistForFailedTask(
       host: String,
       exec: String,
-      index: Int): Unit = {
+      index: Int,
+      failureReason: String): Unit = {
+    latestFailureReason = failureReason
     val execFailures = execToFailures.getOrElseUpdate(exec, new 
ExecutorFailuresInTaskSet(host))
     execFailures.updateWithFailure(index, clock.getTimeMillis())
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3b117d63/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 3804ea8..bb86741 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -670,9 +670,14 @@ private[spark] class TaskSetManager(
           }
           if (blacklistedEverywhere) {
             val partition = tasks(indexInTaskSet).partitionId
-            abort(s"Aborting $taskSet because task $indexInTaskSet (partition 
$partition) " +
-              s"cannot run anywhere due to node and executor blacklist.  
Blacklisting behavior " +
-              s"can be configured via spark.blacklist.*.")
+            abort(s"""
+              |Aborting $taskSet because task $indexInTaskSet (partition 
$partition)
+              |cannot run anywhere due to node and executor blacklist.
+              |Most recent failure:
+              |${taskSetBlacklist.getLatestFailureReason}
+              |
+              |Blacklisting behavior can be configured via spark.blacklist.*.
+              |""".stripMargin)
           }
         }
       }
@@ -837,9 +842,9 @@ private[spark] class TaskSetManager(
     sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, 
info)
 
     if (!isZombie && reason.countTowardsTaskFailures) {
-      taskSetBlacklistHelperOpt.foreach(_.updateBlacklistForFailedTask(
-        info.host, info.executorId, index))
       assert (null != failureReason)
+      taskSetBlacklistHelperOpt.foreach(_.updateBlacklistForFailedTask(
+        info.host, info.executorId, index, failureReason))
       numFailures(index) += 1
       if (numFailures(index) >= maxTaskFailures) {
         logError("Task %d in stage %s failed %d times; aborting job".format(

http://git-wip-us.apache.org/repos/asf/spark/blob/3b117d63/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
index f6015cd..d3bbfd1 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
@@ -115,8 +115,9 @@ class BlacklistIntegrationSuite extends 
SchedulerIntegrationSuite[MultiExecutorM
     withBackend(runBackend _) {
       val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
       awaitJobTermination(jobFuture, duration)
-      val pattern = ("Aborting TaskSet 0.0 because task .* " +
-        "cannot run anywhere due to node and executor blacklist").r
+      val pattern = (
+        s"""|Aborting TaskSet 0.0 because task .*
+            |cannot run anywhere due to node and executor 
blacklist""".stripMargin).r
       assert(pattern.findFirstIn(failure.getMessage).isDefined,
         s"Couldn't find $pattern in ${failure.getMessage()}")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/3b117d63/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
index a136d69..cd1b7a9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
@@ -110,7 +110,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with 
BeforeAndAfterEach with M
       val taskSetBlacklist = createTaskSetBlacklist(stageId)
       if (stageId % 2 == 0) {
         // fail one task in every other taskset
-        taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", 
index = 0)
+        taskSetBlacklist.updateBlacklistForFailedTask(
+          "hostA", exec = "1", index = 0, failureReason = "testing")
         failuresSoFar += 1
       }
       blacklist.updateBlacklistForSuccessfulTaskSet(stageId, 0, 
taskSetBlacklist.execToFailures)
@@ -132,7 +133,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with 
BeforeAndAfterEach with M
     // for many different stages, executor 1 fails a task, and then the 
taskSet fails.
     (0 until failuresUntilBlacklisted * 10).foreach { stage =>
       val taskSetBlacklist = createTaskSetBlacklist(stage)
-      taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index 
= 0)
+      taskSetBlacklist.updateBlacklistForFailedTask(
+        "hostA", exec = "1", index = 0, failureReason = "testing")
     }
     assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
   }
@@ -147,7 +149,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with 
BeforeAndAfterEach with M
       val numFailures = math.max(conf.get(config.MAX_FAILURES_PER_EXEC),
         conf.get(config.MAX_FAILURES_PER_EXEC_STAGE))
       (0 until numFailures).foreach { index =>
-        taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", 
index = index)
+        taskSetBlacklist.updateBlacklistForFailedTask(
+          "hostA", exec = "1", index = index, failureReason = "testing")
       }
       assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("1"))
       assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
@@ -170,7 +173,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with 
BeforeAndAfterEach with M
     // Fail 4 tasks in one task set on executor 1, so that executor gets 
blacklisted for the whole
     // application.
     (0 until 4).foreach { partition =>
-      taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", 
index = partition)
+      taskSetBlacklist0.updateBlacklistForFailedTask(
+        "hostA", exec = "1", index = partition, failureReason = "testing")
     }
     blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, 
taskSetBlacklist0.execToFailures)
     assert(blacklist.nodeBlacklist() === Set())
@@ -183,7 +187,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with 
BeforeAndAfterEach with M
     // application.  Since that's the second executor that is blacklisted on 
the same node, we also
     // blacklist that node.
     (0 until 4).foreach { partition =>
-      taskSetBlacklist1.updateBlacklistForFailedTask("hostA", exec = "2", 
index = partition)
+      taskSetBlacklist1.updateBlacklistForFailedTask(
+        "hostA", exec = "2", index = partition, failureReason = "testing")
     }
     blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, 
taskSetBlacklist1.execToFailures)
     assert(blacklist.nodeBlacklist() === Set("hostA"))
@@ -207,7 +212,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with 
BeforeAndAfterEach with M
     // Fail one more task, but executor isn't put back into blacklist since 
the count of failures
     // on that executor should have been reset to 0.
     val taskSetBlacklist2 = createTaskSetBlacklist(stageId = 2)
-    taskSetBlacklist2.updateBlacklistForFailedTask("hostA", exec = "1", index 
= 0)
+    taskSetBlacklist2.updateBlacklistForFailedTask(
+      "hostA", exec = "1", index = 0, failureReason = "testing")
     blacklist.updateBlacklistForSuccessfulTaskSet(2, 0, 
taskSetBlacklist2.execToFailures)
     assert(blacklist.nodeBlacklist() === Set())
     assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set())
@@ -221,7 +227,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with 
BeforeAndAfterEach with M
     // Lets say that executor 1 dies completely.  We get some task failures, 
but
     // the taskset then finishes successfully (elsewhere).
     (0 until 4).foreach { partition =>
-      taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", 
index = partition)
+      taskSetBlacklist0.updateBlacklistForFailedTask(
+        "hostA", exec = "1", index = partition, failureReason = "testing")
     }
     blacklist.handleRemovedExecutor("1")
     blacklist.updateBlacklistForSuccessfulTaskSet(
@@ -236,7 +243,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with 
BeforeAndAfterEach with M
     // Now another executor gets spun up on that host, but it also dies.
     val taskSetBlacklist1 = createTaskSetBlacklist(stageId = 1)
     (0 until 4).foreach { partition =>
-      taskSetBlacklist1.updateBlacklistForFailedTask("hostA", exec = "2", 
index = partition)
+      taskSetBlacklist1.updateBlacklistForFailedTask(
+        "hostA", exec = "2", index = partition, failureReason = "testing")
     }
     blacklist.handleRemovedExecutor("2")
     blacklist.updateBlacklistForSuccessfulTaskSet(
@@ -279,7 +287,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with 
BeforeAndAfterEach with M
 
     def failOneTaskInTaskSet(exec: String): Unit = {
       val taskSetBlacklist = createTaskSetBlacklist(stageId = stageId)
-      taskSetBlacklist.updateBlacklistForFailedTask("host-" + exec, exec, 0)
+      taskSetBlacklist.updateBlacklistForFailedTask("host-" + exec, exec, 0, 
"testing")
       blacklist.updateBlacklistForSuccessfulTaskSet(stageId, 0, 
taskSetBlacklist.execToFailures)
       stageId += 1
     }
@@ -354,12 +362,12 @@ class BlacklistTrackerSuite extends SparkFunSuite with 
BeforeAndAfterEach with M
     val taskSetBlacklist1 = createTaskSetBlacklist(stageId = 1)
     val taskSetBlacklist2 = createTaskSetBlacklist(stageId = 2)
     // Taskset1 has one failure immediately
-    taskSetBlacklist1.updateBlacklistForFailedTask("host-1", "1", 0)
+    taskSetBlacklist1.updateBlacklistForFailedTask("host-1", "1", 0, "testing")
     // Then we have a *long* delay, much longer than the timeout, before any 
other failures or
     // taskset completion
     clock.advance(blacklist.BLACKLIST_TIMEOUT_MILLIS * 5)
     // After the long delay, we have one failure on taskset 2, on the same 
executor
-    taskSetBlacklist2.updateBlacklistForFailedTask("host-1", "1", 0)
+    taskSetBlacklist2.updateBlacklistForFailedTask("host-1", "1", 0, "testing")
     // Finally, we complete both tasksets.  Its important here to complete 
taskset2 *first*.  We
     // want to make sure that when taskset 1 finishes, even though we've now 
got two task failures,
     // we realize that the task failure we just added was well before the 
timeout.
@@ -377,16 +385,20 @@ class BlacklistTrackerSuite extends SparkFunSuite with 
BeforeAndAfterEach with M
     // we blacklist executors on two different hosts -- make sure that doesn't 
lead to any
     // node blacklisting
     val taskSetBlacklist0 = createTaskSetBlacklist(stageId = 0)
-    taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index 
= 0)
-    taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index 
= 1)
+    taskSetBlacklist0.updateBlacklistForFailedTask(
+      "hostA", exec = "1", index = 0, failureReason = "testing")
+    taskSetBlacklist0.updateBlacklistForFailedTask(
+      "hostA", exec = "1", index = 1, failureReason = "testing")
     blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, 
taskSetBlacklist0.execToFailures)
     assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1"))
     verify(listenerBusMock).post(SparkListenerExecutorBlacklisted(0, "1", 2))
     assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set())
 
     val taskSetBlacklist1 = createTaskSetBlacklist(stageId = 1)
-    taskSetBlacklist1.updateBlacklistForFailedTask("hostB", exec = "2", index 
= 0)
-    taskSetBlacklist1.updateBlacklistForFailedTask("hostB", exec = "2", index 
= 1)
+    taskSetBlacklist1.updateBlacklistForFailedTask(
+      "hostB", exec = "2", index = 0, failureReason = "testing")
+    taskSetBlacklist1.updateBlacklistForFailedTask(
+      "hostB", exec = "2", index = 1, failureReason = "testing")
     blacklist.updateBlacklistForSuccessfulTaskSet(1, 0, 
taskSetBlacklist1.execToFailures)
     assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1", "2"))
     verify(listenerBusMock).post(SparkListenerExecutorBlacklisted(0, "2", 2))
@@ -395,8 +407,10 @@ class BlacklistTrackerSuite extends SparkFunSuite with 
BeforeAndAfterEach with M
     // Finally, blacklist another executor on the same node as the original 
blacklisted executor,
     // and make sure this time we *do* blacklist the node.
     val taskSetBlacklist2 = createTaskSetBlacklist(stageId = 0)
-    taskSetBlacklist2.updateBlacklistForFailedTask("hostA", exec = "3", index 
= 0)
-    taskSetBlacklist2.updateBlacklistForFailedTask("hostA", exec = "3", index 
= 1)
+    taskSetBlacklist2.updateBlacklistForFailedTask(
+      "hostA", exec = "3", index = 0, failureReason = "testing")
+    taskSetBlacklist2.updateBlacklistForFailedTask(
+      "hostA", exec = "3", index = 1, failureReason = "testing")
     blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, 
taskSetBlacklist2.execToFailures)
     assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1", "2", 
"3"))
     verify(listenerBusMock).post(SparkListenerExecutorBlacklisted(0, "3", 2))
@@ -486,7 +500,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with 
BeforeAndAfterEach with M
     // Fail 4 tasks in one task set on executor 1, so that executor gets 
blacklisted for the whole
     // application.
     (0 until 4).foreach { partition =>
-      taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", 
index = partition)
+      taskSetBlacklist0.updateBlacklistForFailedTask(
+        "hostA", exec = "1", index = partition, failureReason = "testing")
     }
     blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, 
taskSetBlacklist0.execToFailures)
 
@@ -497,7 +512,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with 
BeforeAndAfterEach with M
     // application.  Since that's the second executor that is blacklisted on 
the same node, we also
     // blacklist that node.
     (0 until 4).foreach { partition =>
-      taskSetBlacklist1.updateBlacklistForFailedTask("hostA", exec = "2", 
index = partition)
+      taskSetBlacklist1.updateBlacklistForFailedTask(
+        "hostA", exec = "2", index = partition, failureReason = "testing")
     }
     blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, 
taskSetBlacklist1.execToFailures)
 
@@ -512,7 +528,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with 
BeforeAndAfterEach with M
     // Fail 4 tasks in one task set on executor 1, so that executor gets 
blacklisted for the whole
     // application.
     (0 until 4).foreach { partition =>
-      taskSetBlacklist2.updateBlacklistForFailedTask("hostA", exec = "1", 
index = partition)
+      taskSetBlacklist2.updateBlacklistForFailedTask(
+        "hostA", exec = "1", index = partition, failureReason = "testing")
     }
     blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, 
taskSetBlacklist2.execToFailures)
 
@@ -523,7 +540,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with 
BeforeAndAfterEach with M
     // application.  Since that's the second executor that is blacklisted on 
the same node, we also
     // blacklist that node.
     (0 until 4).foreach { partition =>
-      taskSetBlacklist3.updateBlacklistForFailedTask("hostA", exec = "2", 
index = partition)
+      taskSetBlacklist3.updateBlacklistForFailedTask(
+        "hostA", exec = "2", index = partition, failureReason = "testing")
     }
     blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, 
taskSetBlacklist3.execToFailures)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3b117d63/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index b8626bf..6003899 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -660,9 +660,14 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext with B
     assert(tsm.isZombie)
     assert(failedTaskSet)
     val idx = failedTask.index
-    assert(failedTaskSetReason === s"Aborting TaskSet 0.0 because task $idx 
(partition $idx) " +
-      s"cannot run anywhere due to node and executor blacklist.  Blacklisting 
behavior can be " +
-      s"configured via spark.blacklist.*.")
+    assert(failedTaskSetReason === s"""
+      |Aborting $taskSet because task $idx (partition $idx)
+      |cannot run anywhere due to node and executor blacklist.
+      |Most recent failure:
+      |${tsm.taskSetBlacklistHelperOpt.get.getLatestFailureReason}
+      |
+      |Blacklisting behavior can be configured via spark.blacklist.*.
+      |""".stripMargin)
   }
 
   test("don't abort if there is an executor available, though it hasn't had 
scheduled tasks yet") {

http://git-wip-us.apache.org/repos/asf/spark/blob/3b117d63/core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala
index f1392e9..18981d5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala
@@ -37,7 +37,8 @@ class TaskSetBlacklistSuite extends SparkFunSuite {
 
     // First, mark task 0 as failed on exec1.
     // task 0 should be blacklisted on exec1, and nowhere else
-    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "exec1", 
index = 0)
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostA", exec = "exec1", index = 0, failureReason = "testing")
     for {
       executor <- (1 to 4).map(_.toString)
       index <- 0 until 10
@@ -49,17 +50,20 @@ class TaskSetBlacklistSuite extends SparkFunSuite {
     assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
 
     // Mark task 1 failed on exec1 -- this pushes the executor into the 
blacklist
-    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "exec1", 
index = 1)
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostA", exec = "exec1", index = 1, failureReason = "testing")
     assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec1"))
     assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
     // Mark one task as failed on exec2 -- not enough for any further 
blacklisting yet.
-    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "exec2", 
index = 0)
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostA", exec = "exec2", index = 0, failureReason = "testing")
     assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec1"))
     assert(!taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec2"))
     assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
     // Mark another task as failed on exec2 -- now we blacklist exec2, which 
also leads to
     // blacklisting the entire node.
-    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "exec2", 
index = 1)
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostA", exec = "exec2", index = 1, failureReason = "testing")
     assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec1"))
     assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("exec2"))
     assert(taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
@@ -108,34 +112,41 @@ class TaskSetBlacklistSuite extends SparkFunSuite {
       .set(config.MAX_FAILED_EXEC_PER_NODE_STAGE, 3)
     val taskSetBlacklist = new TaskSetBlacklist(conf, stageId = 0, new 
SystemClock())
     // Fail a task twice on hostA, exec:1
-    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 
0)
-    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 
0)
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostA", exec = "1", index = 0, failureReason = "testing")
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostA", exec = "1", index = 0, failureReason = "testing")
     assert(taskSetBlacklist.isExecutorBlacklistedForTask("1", 0))
     assert(!taskSetBlacklist.isNodeBlacklistedForTask("hostA", 0))
     assert(!taskSetBlacklist.isExecutorBlacklistedForTaskSet("1"))
     assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
 
     // Fail the same task once more on hostA, exec:2
-    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "2", index = 
0)
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostA", exec = "2", index = 0, failureReason = "testing")
     assert(taskSetBlacklist.isNodeBlacklistedForTask("hostA", 0))
     assert(!taskSetBlacklist.isExecutorBlacklistedForTaskSet("2"))
     assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
 
     // Fail another task on hostA, exec:1.  Now that executor has failures on 
two different tasks,
     // so its blacklisted
-    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 
1)
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostA", exec = "1", index = 1, failureReason = "testing")
     assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("1"))
     assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
 
     // Fail a third task on hostA, exec:2, so that exec is blacklisted for the 
whole task set
-    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "2", index = 
2)
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostA", exec = "2", index = 2, failureReason = "testing")
     assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("2"))
     assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
 
     // Fail a fourth & fifth task on hostA, exec:3.  Now we've got three 
executors that are
     // blacklisted for the taskset, so blacklist the whole node.
-    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "3", index = 
3)
-    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "3", index = 
4)
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostA", exec = "3", index = 3, failureReason = "testing")
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostA", exec = "3", index = 4, failureReason = "testing")
     assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("3"))
     assert(taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
   }
@@ -147,13 +158,17 @@ class TaskSetBlacklistSuite extends SparkFunSuite {
     val conf = new SparkConf().setAppName("test").setMaster("local")
       .set(config.BLACKLIST_ENABLED.key, "true")
     val taskSetBlacklist = new TaskSetBlacklist(conf, stageId = 0, new 
SystemClock())
-    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 
0)
-    taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 
1)
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostA", exec = "1", index = 0, failureReason = "testing")
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostA", exec = "1", index = 1, failureReason = "testing")
     assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("1"))
     assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))
 
-    taskSetBlacklist.updateBlacklistForFailedTask("hostB", exec = "2", index = 
0)
-    taskSetBlacklist.updateBlacklistForFailedTask("hostB", exec = "2", index = 
1)
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostB", exec = "2", index = 0, failureReason = "testing")
+    taskSetBlacklist.updateBlacklistForFailedTask(
+      "hostB", exec = "2", index = 1, failureReason = "testing")
     assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("1"))
     assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("2"))
     assert(!taskSetBlacklist.isNodeBlacklistedForTaskSet("hostA"))

http://git-wip-us.apache.org/repos/asf/spark/blob/3b117d63/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index ae43f4c..5c712bd 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -1146,7 +1146,7 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
     // Make sure that the blacklist ignored all of the task failures above, 
since they aren't
     // the fault of the executor where the task was running.
     verify(blacklist, never())
-      .updateBlacklistForFailedTask(anyString(), anyString(), anyInt())
+      .updateBlacklistForFailedTask(anyString(), anyString(), anyInt(), 
anyString())
   }
 
   test("update application blacklist for shuffle-fetch") {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to