[
https://issues.apache.org/jira/browse/SPARK-2387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14073989#comment-14073989
]
Rui Li commented on SPARK-2387:
-------------------------------
[~kayousterhout] Thanks for the review.
I tested this PoC with graphx.SynthBenchmark. The test was done on a 7-node
cluster, where each node runs an executor with 32 CPUs and 90GB of memory. For
one case (-numEPart=112 -nverts=10000000 -niter=3), it improves the job time by
about 10%. I did notice a regression for very small cases; I think this is
because the shuffle map stage is quite short there, so the overlap brings
little benefit.
Actually, I think a very long map straggler is the best use case for this PoC:
all the pre-started reducers are waiting for output from just that one map
task. When it finishes, the waiting reducers can all finish almost at the same
time, so in this case we can hide nearly the entire execution time of the
reduce stage compared with the normal mode with stage barriers (a rough numeric
sketch follows below). I agree the early reducers can prevent other jobs from
being launched, but I suppose that only happens when multiple jobs are
submitted concurrently through one SparkContext. I'm not sure how common that
case is.
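To make that concrete, here is a rough numeric sketch; the numbers and the
assumed residual share are made up purely for illustration, not measured:
{code:scala}
// Toy model of the straggler case; all numbers are invented for illustration.
object StragglerExample extends App {
  val typicalMapTime = 10.0  // seconds for most map tasks (they finish early)
  val stragglerTime  = 60.0  // seconds for the one slow map task
  val reduceTime     = 30.0  // seconds of reduce work per reducer

  // Barrier mode: every reducer starts only after the straggler finishes.
  val withBarrier = stragglerTime + reduceTime

  // Pre-started reducers: while waiting for the straggler they already fetch
  // and process the finished maps' output, so only the straggler's share of
  // the data (assumed to be 1/6 of the reduce work here) remains afterwards.
  val residualWork   = reduceTime / 6
  val withoutBarrier = stragglerTime + residualWork

  println(f"with barrier: $withBarrier%.0f s, pre-started reducers: $withoutBarrier%.0f s")
}
{code}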
The PoC may also provide flexibility for different shuffle implementations. For
example, in a push-style shuffle, the pushed data wouldn't have to be stored to
disk if the reducers have already started on the destination node.
This PoC indeed has the potential to cause a "deadlock", i.e. the pre-started
reducers take up all the slots while there are still pending map tasks. I try
to avoid this in the PR by checking for free slots before launching the reduce
stage and by giving the pre-started stage a lower scheduling priority (a rough
sketch of the idea is at the end of this comment). But that doesn't solve the
issue completely because of delay scheduling, and unfortunately map tasks are
more likely to be delayed because of their locality preferences. The deadlock
may also occur on task failover, when a map task reports success but we fail to
get the task result later (not common, because a map status is usually small
enough to fit into a direct result). We talked about this problem a bit. It
seems that to solve it, we either have to feed some physical resource
information (e.g. free/total slots) into DAGScheduler, or push some dependency
information down to TaskScheduler. Either way seems to involve many
modifications and somewhat violates the current division of responsibilities
between the two schedulers. So I left this open and want to see if you guys
have any ideas on this...
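For reference, this is roughly the shape of the free-slot check described
above; it is only a sketch of the idea, and the names (freeSlots,
pendingMapTasks, etc.) are illustrative rather than the actual
DAGScheduler/TaskScheduler fields:
{code:scala}
// Illustrative sketch only, not the actual PR code. The idea: hand early
// reducers only the slots left over after reserving one slot for every
// pending map task, so pre-started reducers cannot occupy the whole cluster.
object PreStartPolicy {
  /** Whether it is safe to pre-start any reducers at all. */
  def canPreStartReducers(freeSlots: Int, pendingMapTasks: Int): Boolean =
    freeSlots > pendingMapTasks

  /** How many reducers may be launched early with the surplus slots. */
  def reducersToPreStart(freeSlots: Int, pendingMapTasks: Int, numReducers: Int): Int =
    math.max(0, math.min(numReducers, freeSlots - pendingMapTasks))
}
{code}
Of course such a check only sees a snapshot of the free slots, which is exactly
why delay scheduling and locality waits can still break the invariant after the
reducers have been launched.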
> Remove the stage barrier for better resource utilization
> --------------------------------------------------------
>
> Key: SPARK-2387
> URL: https://issues.apache.org/jira/browse/SPARK-2387
> Project: Spark
> Issue Type: New Feature
> Components: Spark Core
> Reporter: Rui Li
>
> DAGScheduler divides a Spark job into multiple stages according to RDD
> dependencies. Whenever there's a shuffle dependency, DAGScheduler creates a
> shuffle map stage on the map side, and another stage depending on that stage.
> Currently, the downstream stage cannot start until all the stages it depends
> on have finished. This barrier between stages leads to idle slots while
> waiting for the last few upstream tasks to finish, and thus wastes cluster
> resources. Therefore we propose to remove the barrier and pre-start the
> reduce stage once there are free slots. This can achieve better resource
> utilization and improve overall job performance, especially when the
> application has been granted a large number of executors.
--
This message was sent by Atlassian JIRA
(v6.2#6252)