This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 9275865 [SPARK-31485][CORE][2.4] Avoid application hang if only
partial barrier tasks launched
9275865 is described below
commit 9275865883113ab0ee74b88383cd9ea8d2e98e36
Author: yi.wu <[email protected]>
AuthorDate: Mon Apr 27 06:10:02 2020 +0000
[SPARK-31485][CORE][2.4] Avoid application hang if only partial barrier
tasks launched
### What changes were proposed in this pull request?
Use `TaskSetManager.abort` to abort a barrier stage instead of throwing
an exception within `resourceOffers`.
### Why are the changes needed?
Any non-fatal exception thrown within the Spark RPC framework can be swallowed:
https://github.com/apache/spark/blob/100fc58da54e026cda87832a10e2d06eaeccdf87/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala#L202-L211
The method `TaskSchedulerImpl.resourceOffers` is also within the scope of
the Spark RPC framework. Thus, throwing an exception inside `resourceOffers`
won't fail the application.
As a result, if a barrier stage fails the require check at
`require(addressesWithDescs.size == taskSet.numTasks, ...)`, the barrier stage
will fail the check again and again until all tasks from `TaskSetManager`
are dequeued. But since the barrier stage isn't really executed, the application
will hang.
The issue can be reproduced by the following test:
```scala
initLocalClusterSparkContext(2)
val rdd0 = sc.parallelize(Seq(0, 1, 2, 3), 2)
val dep = new OneToOneDependency[Int](rdd0)
val rdd = new MyRDD(sc, 2, List(dep),
Seq(Seq("executor_h_0"),Seq("executor_h_0")))
rdd.barrier().mapPartitions { iter =>
BarrierTaskContext.get().barrier()
iter
}.collect()
```
### Does this PR introduce any user-facing change?
Yes, the application previously hung in this case, but now fails fast after this fix.
### How was this patch tested?
Added a regression test.
Closes #28357 from Ngone51/bp-spark-31485-24.
Authored-by: yi.wu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../apache/spark/scheduler/TaskSchedulerImpl.scala | 30 +++++++++++++++++-----
.../spark/scheduler/BarrierTaskContextSuite.scala | 20 +++++++++++++++
2 files changed, 43 insertions(+), 7 deletions(-)
diff --git
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 38dbbe7..f6c8555 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -50,6 +50,11 @@ import org.apache.spark.util.{AccumulatorV2, SystemClock,
ThreadUtils, Utils}
* [[SchedulerBackend]]s synchronize on themselves when they want to send
events here, and then
* acquire a lock on us, so we need to make sure that we don't try to lock the
backend while
* we are holding a lock on ourselves.
+
+ * CAUTION: Any non fatal exception thrown within Spark RPC framework can be
swallowed.
+ * Thus, throwing exception in methods like resourceOffers, statusUpdate won't
fail
+ * the application, but could lead to undefined behavior. Instead, we shall
use method like
+ * TaskSetManger.abort() to abort a stage and then fail the application
(SPARK-31485).
*/
private[spark] class TaskSchedulerImpl(
val sc: SparkContext,
@@ -476,11 +481,18 @@ private[spark] class TaskSchedulerImpl(
// Check whether the barrier tasks are partially launched.
// TODO SPARK-24818 handle the assert failure case (that can happen
when some locality
// requirements are not fulfilled, and we should revert the launched
tasks).
- require(addressesWithDescs.size == taskSet.numTasks,
- s"Skip current round of resource offers for barrier stage
${taskSet.stageId} " +
- s"because only ${addressesWithDescs.size} out of a total number
of " +
- s"${taskSet.numTasks} tasks got resource offers. The resource
offers may have " +
- "been blacklisted or cannot fulfill task locality requirements.")
+ if (addressesWithDescs.size != taskSet.numTasks) {
+ val errorMsg =
+ s"Fail resource offers for barrier stage ${taskSet.stageId}
because only " +
+ s"${addressesWithDescs.size} out of a total number of
${taskSet.numTasks}" +
+ s" tasks got resource offers. This happens because barrier
execution currently " +
+ s"does not work gracefully with delay scheduling. We highly
recommend you to " +
+ s"disable delay scheduling by setting spark.locality.wait=0 as
a workaround if " +
+ s"you see this error frequently."
+ logWarning(errorMsg)
+ taskSet.abort(errorMsg)
+ throw new SparkException(errorMsg)
+ }
// materialize the barrier coordinator.
maybeInitBarrierCoordinator()
@@ -542,8 +554,12 @@ private[spark] class TaskSchedulerImpl(
if (state == TaskState.LOST) {
// TaskState.LOST is only used by the deprecated Mesos
fine-grained scheduling mode,
// where each executor corresponds to a single task, so mark the
executor as failed.
- val execId = taskIdToExecutorId.getOrElse(tid, throw new
IllegalStateException(
- "taskIdToTaskSetManager.contains(tid) <=>
taskIdToExecutorId.contains(tid)"))
+ val execId = taskIdToExecutorId.getOrElse(tid, {
+ val errorMsg =
+ "taskIdToTaskSetManager.contains(tid) <=>
taskIdToExecutorId.contains(tid)"
+ taskSet.abort(errorMsg)
+ throw new SparkException(errorMsg)
+ })
if (executorIdToRunningTaskIds.contains(execId)) {
reason = Some(
SlaveLost(s"Task $tid was lost, so marking the executor as
lost as well."))
diff --git
a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
index 36dd620..469cc4a 100644
---
a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
+++
b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
@@ -152,4 +152,24 @@ class BarrierTaskContextSuite extends SparkFunSuite with
LocalSparkContext {
assert(error.contains("The coordinator didn't get all barrier sync
requests"))
assert(error.contains("within 1 second(s)"))
}
+
+ test("SPARK-31485: barrier stage should fail if only partial tasks are
launched") {
+ val conf = new SparkConf()
+ .setMaster("local-cluster[2, 1, 1024]")
+ .setAppName("test-cluster")
+ .set("spark.test.noStageRetry", "true")
+ sc = new SparkContext(conf)
+ val rdd0 = sc.parallelize(Seq(0, 1, 2, 3), 2)
+ val dep = new OneToOneDependency[Int](rdd0)
+ // set up a barrier stage with 2 tasks and both tasks prefer executor 0
(only 1 core) for
+ // scheduling. So, one of tasks won't be scheduled in one round of
resource offer.
+ val rdd = new MyRDD(sc, 2, List(dep), Seq(Seq("executor_h_0"),
Seq("executor_h_0")))
+ val errorMsg = intercept[SparkException] {
+ rdd.barrier().mapPartitions { iter =>
+ BarrierTaskContext.get().barrier()
+ iter
+ }.collect()
+ }.getMessage
+ assert(errorMsg.contains("Fail resource offers for barrier stage"))
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]