This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 9275865  [SPARK-31485][CORE][2.4] Avoid application hang if only 
partial barrier tasks launched
9275865 is described below

commit 9275865883113ab0ee74b88383cd9ea8d2e98e36
Author: yi.wu <[email protected]>
AuthorDate: Mon Apr 27 06:10:02 2020 +0000

    [SPARK-31485][CORE][2.4] Avoid application hang if only partial barrier 
tasks launched
    
    ### What changes were proposed in this pull request?
    
    Use `TaskSetManager.abort` to abort a barrier stage instead of throwing an exception within `resourceOffers`.
    
    ### Why are the changes needed?
    
    Any non-fatal exception thrown within the Spark RPC framework can be swallowed:
    
    
https://github.com/apache/spark/blob/100fc58da54e026cda87832a10e2d06eaeccdf87/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala#L202-L211
    
     The method `TaskSchedulerImpl.resourceOffers` also runs within the scope of the Spark RPC framework. Thus, throwing an exception inside `resourceOffers` won't fail the application.
    
     As a result, if a barrier stage fails the check `require(addressesWithDescs.size == taskSet.numTasks, ...)`, it fails the same check again and again until all tasks from the `TaskSetManager` are dequeued. Because the barrier stage is never actually executed, the application hangs.
    
    The issue can be reproduced by the following test:
    
    ```scala
    initLocalClusterSparkContext(2)
    val rdd0 = sc.parallelize(Seq(0, 1, 2, 3), 2)
    val dep = new OneToOneDependency[Int](rdd0)
    val rdd = new MyRDD(sc, 2, List(dep), 
Seq(Seq("executor_h_0"),Seq("executor_h_0")))
    rdd.barrier().mapPartitions { iter =>
      BarrierTaskContext.get().barrier()
      iter
    }.collect()
    ```
    ### Does this PR introduce any user-facing change?
    
    Yes. Previously the application would hang; after this fix it fails fast.
    
    ### How was this patch tested?
    
    Added a regression test.
    
    Closes #28357 from Ngone51/bp-spark-31485-24.
    
    Authored-by: yi.wu <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../apache/spark/scheduler/TaskSchedulerImpl.scala | 30 +++++++++++++++++-----
 .../spark/scheduler/BarrierTaskContextSuite.scala  | 20 +++++++++++++++
 2 files changed, 43 insertions(+), 7 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 38dbbe7..f6c8555 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -50,6 +50,11 @@ import org.apache.spark.util.{AccumulatorV2, SystemClock, 
ThreadUtils, Utils}
  * [[SchedulerBackend]]s synchronize on themselves when they want to send 
events here, and then
  * acquire a lock on us, so we need to make sure that we don't try to lock the 
backend while
  * we are holding a lock on ourselves.
+
+ * CAUTION: Any non fatal exception thrown within Spark RPC framework can be 
swallowed.
+ * Thus, throwing exception in methods like resourceOffers, statusUpdate won't 
fail
+ * the application, but could lead to undefined behavior. Instead, we shall 
use method like
+ * TaskSetManger.abort() to abort a stage and then fail the application 
(SPARK-31485).
  */
 private[spark] class TaskSchedulerImpl(
     val sc: SparkContext,
@@ -476,11 +481,18 @@ private[spark] class TaskSchedulerImpl(
           // Check whether the barrier tasks are partially launched.
           // TODO SPARK-24818 handle the assert failure case (that can happen 
when some locality
           // requirements are not fulfilled, and we should revert the launched 
tasks).
-          require(addressesWithDescs.size == taskSet.numTasks,
-            s"Skip current round of resource offers for barrier stage 
${taskSet.stageId} " +
-              s"because only ${addressesWithDescs.size} out of a total number 
of " +
-              s"${taskSet.numTasks} tasks got resource offers. The resource 
offers may have " +
-              "been blacklisted or cannot fulfill task locality requirements.")
+          if (addressesWithDescs.size != taskSet.numTasks) {
+            val errorMsg =
+              s"Fail resource offers for barrier stage ${taskSet.stageId} 
because only " +
+                s"${addressesWithDescs.size} out of a total number of 
${taskSet.numTasks}" +
+                s" tasks got resource offers. This happens because barrier 
execution currently " +
+                s"does not work gracefully with delay scheduling. We highly 
recommend you to " +
+                s"disable delay scheduling by setting spark.locality.wait=0 as 
a workaround if " +
+                s"you see this error frequently."
+            logWarning(errorMsg)
+            taskSet.abort(errorMsg)
+            throw new SparkException(errorMsg)
+          }
 
           // materialize the barrier coordinator.
           maybeInitBarrierCoordinator()
@@ -542,8 +554,12 @@ private[spark] class TaskSchedulerImpl(
             if (state == TaskState.LOST) {
               // TaskState.LOST is only used by the deprecated Mesos 
fine-grained scheduling mode,
               // where each executor corresponds to a single task, so mark the 
executor as failed.
-              val execId = taskIdToExecutorId.getOrElse(tid, throw new 
IllegalStateException(
-                "taskIdToTaskSetManager.contains(tid) <=> 
taskIdToExecutorId.contains(tid)"))
+              val execId = taskIdToExecutorId.getOrElse(tid, {
+                val errorMsg =
+                  "taskIdToTaskSetManager.contains(tid) <=> 
taskIdToExecutorId.contains(tid)"
+                taskSet.abort(errorMsg)
+                throw new SparkException(errorMsg)
+              })
               if (executorIdToRunningTaskIds.contains(execId)) {
                 reason = Some(
                   SlaveLost(s"Task $tid was lost, so marking the executor as 
lost as well."))
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
index 36dd620..469cc4a 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
@@ -152,4 +152,24 @@ class BarrierTaskContextSuite extends SparkFunSuite with 
LocalSparkContext {
     assert(error.contains("The coordinator didn't get all barrier sync 
requests"))
     assert(error.contains("within 1 second(s)"))
   }
+
+  test("SPARK-31485: barrier stage should fail if only partial tasks are 
launched") {
+    val conf = new SparkConf()
+      .setMaster("local-cluster[2, 1, 1024]")
+      .setAppName("test-cluster")
+      .set("spark.test.noStageRetry", "true")
+    sc = new SparkContext(conf)
+    val rdd0 = sc.parallelize(Seq(0, 1, 2, 3), 2)
+    val dep = new OneToOneDependency[Int](rdd0)
+    // set up a barrier stage with 2 tasks and both tasks prefer executor 0 
(only 1 core) for
+    // scheduling. So, one of tasks won't be scheduled in one round of 
resource offer.
+    val rdd = new MyRDD(sc, 2, List(dep), Seq(Seq("executor_h_0"), 
Seq("executor_h_0")))
+    val errorMsg = intercept[SparkException] {
+      rdd.barrier().mapPartitions { iter =>
+        BarrierTaskContext.get().barrier()
+        iter
+      }.collect()
+    }.getMessage
+    assert(errorMsg.contains("Fail resource offers for barrier stage"))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to