[
https://issues.apache.org/jira/browse/SPARK-24375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16489764#comment-16489764
]
Jiang Xingbo commented on SPARK-24375:
--------------------------------------
We propose to add new RDDBarrier and BarrierTaskContext APIs to support barrier
scheduling in Spark; this also requires modifying how job scheduling works a
bit to accommodate the new feature.
*Barrier Stage*: A barrier stage doesn’t launch any of its tasks until the
available slots (free CPU cores that can be used to launch pending tasks) are
enough to launch all the tasks at the same time, and it always retries the whole
stage when any task fails. One way to identify whether a stage is a barrier
stage is to trace the RDDs the stage runs on: if the stage contains an
RDDBarrier, or at least one of the ancestor RDDs is an RDDBarrier, then the
stage is a barrier stage; the tracing stops at ShuffleRDDs.
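The lineage tracing described above can be pictured with a small sketch. The class and function names below are illustrative stand-ins, not Spark’s actual internals; the point is the traversal rule (find an RDDBarrier ancestor, stop at shuffle boundaries):

```python
# Illustrative sketch: decide whether a stage is a barrier stage by walking
# the RDD lineage, stopping at ShuffleRDDs (which mark stage boundaries).
# These are hypothetical stand-in classes, not Spark's real RDD types.

class RDD:
    def __init__(self, deps=(), is_barrier=False, is_shuffle=False):
        self.deps = list(deps)          # parent RDDs within the stage
        self.is_barrier = is_barrier    # True for an RDDBarrier
        self.is_shuffle = is_shuffle    # True for a ShuffleRDD

def is_barrier_stage(rdd):
    """A stage is a barrier stage if its RDD, or any ancestor reachable
    without crossing a ShuffleRDD, is an RDDBarrier."""
    stack, seen = [rdd], set()
    while stack:
        r = stack.pop()
        if id(r) in seen:
            continue
        seen.add(id(r))
        if r.is_barrier:
            return True
        if r.is_shuffle:
            continue                    # tracing stops at shuffle boundaries
        stack.extend(r.deps)
    return False
```

For example, a stage built on top of an RDDBarrier is a barrier stage, but a stage downstream of a shuffle over that same lineage is not.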
*Schedule Barrier Tasks*: Currently the TaskScheduler schedules pending tasks
on available slots on a best-effort basis, so normally not all tasks in the
same stage get launched at the same time. We may add a check of the total
available slots before scheduling tasks from a barrier stage taskset. It is
still possible that only part of the tasks in a barrier stage taskset get
launched, due to task locality issues, so we have to check again before launch
to ensure that all tasks in the same barrier stage get launched at the same
time.
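The two-step check above (total slots first, then a second all-or-nothing check at launch) can be sketched as follows. The function and its arguments are hypothetical, assumed shapes for illustration only:

```python
# Illustrative sketch of all-or-nothing scheduling for a barrier taskset:
# launch every task together, or launch nothing and wait for the next round.

def schedule_barrier_taskset(task_prefs, offers):
    """task_prefs: preferred host per task (or None); offers: free host slots.
    Returns a host assignment per task, or None if the taskset cannot be
    launched in full this round."""
    if len(offers) < len(task_prefs):
        return None                     # first check: not enough total slots
    free = list(offers)
    assignments = []
    for pref in task_prefs:
        host = pref if pref in free else (free[0] if free else None)
        if host is None:
            return None                 # second check failed: abort the launch
        free.remove(host)
        assignments.append(host)
    return assignments                  # all tasks can launch at the same time
```

A regular taskset, by contrast, would happily take a partial assignment and launch whatever fits.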
If we consider scheduling several jobs at the same time (both barrier and
regular jobs), it may be possible that barrier tasks are blocked by regular
tasks (when the available slots are always fewer than a barrier stage taskset
requires), or that one barrier stage taskset blocks another (when a barrier
stage taskset that requires fewer slots tends to be scheduled earlier).
Currently we don’t have a perfect solution for all these scenarios, but at
least we may avoid the worst case of a huge barrier stage taskset being blocked
forever on a busy cluster, using a time-based weight approach (conceptually, a
taskset that has been pending for a longer time is assigned a greater priority
weight for scheduling).
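The time-based weight idea can be sketched minimally. The weight formula below is an assumption made up for illustration (the proposal does not fix a formula); the point is only that priority grows with waiting time so a large barrier taskset is not starved forever:

```python
# Minimal sketch of a time-based priority weight: the longer a taskset has
# been pending, the higher its scheduling priority. The linear formula here
# is an illustrative assumption, not part of the proposal.

def priority_weight(submitted_at, now, base_priority=1.0):
    waited = max(0.0, now - submitted_at)
    return base_priority * (1.0 + waited)   # longer wait => greater weight

def pick_next(tasksets, now):
    """tasksets: list of (name, submitted_at); pick the highest-weight one."""
    return max(tasksets, key=lambda ts: priority_weight(ts[1], now))[0]
```

Under this scheme a barrier taskset submitted long ago eventually outweighs newer, smaller tasksets that would otherwise keep jumping the queue.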
*Task Barrier*: Barrier tasks shall allow users to insert a sync point in the
middle of task execution. This can be achieved by introducing a global barrier
operation in TaskContext, which makes the current task wait until all tasks in
the same stage hit this barrier.
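Locally, the proposed barrier operation behaves like a rendezvous point. The sketch below simulates it with threads and `threading.Barrier`; this is only an analogy for the semantics, the real implementation would coordinate tasks across executors:

```python
# Analogy for the proposed global barrier: every task blocks at the barrier
# until all tasks in the stage have reached it, then all proceed together.
# Threads stand in for barrier tasks; this is not the distributed mechanism.

import threading

NUM_TASKS = 4
barrier = threading.Barrier(NUM_TASKS)
order = []
lock = threading.Lock()

def barrier_task(task_id):
    with lock:
        order.append("before")          # work done before the sync point
    barrier.wait()                      # ~ the proposed barrier() in TaskContext
    with lock:
        order.append("after")           # work done after all tasks synced

threads = [threading.Thread(target=barrier_task, args=(i,))
           for i in range(NUM_TASKS)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Because the barrier releases only once every task arrives, every "before" record is guaranteed to precede every "after" record.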
*Task Failure*: To ensure correctness, a barrier stage always retries the
whole stage when any task fails. Thus, it’s quite straightforward that we shall
kill all the running tasks of a failed stage, which also guarantees that at
most one taskset shall be running for each single stage (no zombie tasks).
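The retry-whole-stage rule can be sketched as follows; `run_task` and the attempt loop are illustrative stand-ins, and the sequential loop stands in for tasks that would really run concurrently:

```python
# Sketch of the failure semantics: if any task in a barrier stage fails,
# discard all results and rerun the entire taskset from scratch.
# run_task is a hypothetical stand-in for executing one barrier task.

def run_barrier_stage(tasks, run_task, max_attempts=3):
    """tasks: list of task inputs; run_task(t) returns a result or raises."""
    for _attempt in range(max_attempts):
        results = []
        try:
            for t in tasks:             # in reality these run concurrently
                results.append(run_task(t))
            return results              # the whole stage succeeded together
        except Exception:
            continue                    # any failure => retry the whole stage
    raise RuntimeError("barrier stage failed after %d attempts" % max_attempts)
```

Note that partial results from a failed attempt are never kept, which is exactly why killing the remaining running tasks of a failed stage is safe.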
*Speculative Task*: Since we require launching all tasks in a barrier stage at
the same time, there is no need to launch a speculative task for a barrier
stage taskset.
*Share TaskInfo*: To share information between tasks in a barrier stage, we
may update it in `TaskContext.localProperties`.
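A tiny sketch of the sharing pattern, using a plain dict as a local stand-in for the per-stage properties map (the real mechanism would propagate these values between executors; key names are made up for illustration):

```python
# Local stand-in for per-stage shared properties: each barrier task publishes
# its worker address, and any task can read the full set after a sync point.

local_properties = {}                   # ~ TaskContext.localProperties

def publish_address(task_id, host, port):
    local_properties["addr.%d" % task_id] = "%s:%d" % (host, port)

def all_addresses(num_tasks):
    return [local_properties["addr.%d" % i] for i in range(num_tasks)]
```

This is the kind of exchange (e.g. workers discovering each other before starting a distributed ML job) that barrier tasks need between sync points.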
*Python Support*: Expose RDDBarrier and BarrierTaskContext to PySpark.
[~cloud_fan] maybe you want to add information I didn't cover
above? (esp. PySpark)
> Design sketch: support barrier scheduling in Apache Spark
> ---------------------------------------------------------
>
> Key: SPARK-24375
> URL: https://issues.apache.org/jira/browse/SPARK-24375
> Project: Spark
> Issue Type: Story
> Components: Spark Core
> Affects Versions: 3.0.0
> Reporter: Xiangrui Meng
> Assignee: Jiang Xingbo
> Priority: Major
>
> This task is to outline a design sketch for the barrier scheduling SPIP
> discussion. It doesn't need to be a complete design before the vote. But it
> should at least cover both Scala/Java and PySpark.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)