[ https://issues.apache.org/jira/browse/SPARK-33701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17408557#comment-17408557 ]
Apache Spark commented on SPARK-33701:
--------------------------------------
User 'venkata91' has created a pull request for this issue:
https://github.com/apache/spark/pull/33896
> Adaptive shuffle merge finalization for push-based shuffle
> ----------------------------------------------------------
>
> Key: SPARK-33701
> URL: https://issues.apache.org/jira/browse/SPARK-33701
> Project: Spark
> Issue Type: Sub-task
> Components: Shuffle, Spark Core
> Affects Versions: 3.1.0
> Reporter: Min Shen
> Priority: Major
>
> SPARK-32920 implements a simple approach to shuffle merge finalization, the
> step that transitions from the shuffle map stage to the reduce stage when
> push-based shuffle is enabled.
> This simple approach waits for a static period of time after all map tasks
> have finished before initiating shuffle merge finalization. It is not well
> suited to jobs whose shuffles vary in size. For a small shuffle, we want
> merge finalization to happen as early and as quickly as possible. For a
> large shuffle, we might want to wait longer to achieve a better merge ratio.
> A single static configuration for the entire job cannot adapt to such
> varying needs.
> This raises the need for adaptive shuffle merge finalization, where the
> amount of time to wait before merge finalization adapts to the size of the
> shuffle. We have implemented an effective adaptive shuffle merge
> finalization mechanism that introduces two more config parameters:
> spark.shuffle.push.minShuffleSizeToWait and spark.shuffle.push.minPushRatio.
> Together with spark.shuffle.push.finalize.time, adaptive shuffle merge
> finalization works as follows:
> # Whenever a ShuffleBlockPusher finishes pushing all the shuffle data
> generated by a mapper, it notifies the Spark driver.
> # When the Spark driver receives a notification of a completed shuffle
> push, it updates the state maintained in the corresponding ShuffleDependency.
> # If the ratio of completed pushes (# completed pushes / # map tasks)
> exceeds minPushRatio, the driver immediately schedules shuffle merge
> finalization.
> # If the driver is instead first notified that all map tasks have finished,
> it gathers the size of the shuffle from MapOutputStatistics.
> If the total shuffle size is smaller than minShuffleSizeToWait, the driver
> ignores the pushed shuffle partitions, treats the shuffle as a regular
> shuffle, and starts scheduling the reduce stage. It also asynchronously
> schedules shuffle merge finalization immediately, but ignores all the
> responses.
> # If the total shuffle size is larger than minShuffleSizeToWait, the driver
> schedules shuffle merge finalization after waiting for finalize.time. If
> the driver receives enough push completion notifications during this wait
> to reach minPushRatio, it reschedules the shuffle merge finalization for
> immediate execution.
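The decision logic in the steps above can be sketched as a small driver-side state object. This is a minimal, hypothetical illustration in Python, not Spark's implementation: the class name ShuffleMergeState, its fields, and the explicit "now" timestamps (used in place of real timers) are all assumptions. It treats a shuffle size exactly equal to minShuffleSizeToWait as "large", a case the description leaves unspecified.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ShuffleMergeState:
    """Hypothetical per-shuffle state, standing in for the state the
    description says the driver keeps in the ShuffleDependency."""
    num_map_tasks: int
    min_push_ratio: float          # spark.shuffle.push.minPushRatio
    min_shuffle_size_to_wait: int  # spark.shuffle.push.minShuffleSizeToWait (bytes)
    finalize_time: float           # spark.shuffle.push.finalize.time (seconds)
    completed_pushes: int = 0
    finalize_at: Optional[float] = None  # scheduled finalization time, None if unscheduled
    merged_output_ignored: bool = False  # True when treated as a regular shuffle

    def push_ratio(self) -> float:
        return self.completed_pushes / self.num_map_tasks

    def on_push_completed(self, now: float) -> None:
        # Steps 1-3 (and the reschedule in step 5): count the completion and,
        # once the ratio reaches minPushRatio, finalize immediately.
        self.completed_pushes += 1
        if self.push_ratio() >= self.min_push_ratio:
            self._schedule(now)

    def on_all_map_tasks_finished(self, total_shuffle_bytes: int, now: float) -> None:
        if self.finalize_at is not None:
            return  # the push ratio was already reached; finalization is scheduled
        if total_shuffle_bytes < self.min_shuffle_size_to_wait:
            # Step 4: small shuffle. Run reducers as a regular shuffle and
            # finalize right away, ignoring the merge results.
            self.merged_output_ignored = True
            self._schedule(now)
        else:
            # Step 5: large shuffle. Wait finalize_time before finalizing.
            self._schedule(now + self.finalize_time)

    def _schedule(self, when: float) -> None:
        # Keep the earliest time: rescheduling can only move finalization earlier.
        if self.finalize_at is None or when < self.finalize_at:
            self.finalize_at = when
```

For a large shuffle with minPushRatio = 1.0, finalization is first set to map-completion time plus finalize.time, then pulled forward when the last push completion arrives. Keeping only the earliest scheduled time is one simple way to express "reschedule for immediate execution".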
> In addition to the above, per SPARK-36530, we should also check whether no
> block gets pushed because all blocks are larger than
> spark.shuffle.push.maxBlockSizeToPush. If so, we should skip shuffle merge
> finalization as well. Whether any blocks from a mapper were pushed can be
> included in the new executor-to-driver RPC that notifies the driver of push
> completion.
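The SPARK-36530 check could be carried in the push-completion message roughly as follows. This is a hypothetical Python sketch: the PushCompletion message, its fields, and both helper functions are illustrative assumptions, not Spark's actual RPC or API. Following the description, it treats a block as pushable only if it is not larger than maxBlockSizeToPush, and it skips finalization when no mapper reports a pushed block.

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass(frozen=True)
class PushCompletion:
    """Hypothetical payload of the executor-to-driver push-completion RPC."""
    map_index: int
    pushed_any_block: bool


def make_push_completion(map_index: int, block_sizes: Iterable[int],
                         max_block_size_to_push: int) -> PushCompletion:
    # Blocks larger than maxBlockSizeToPush are not pushed, so a mapper pushed
    # something only if at least one block fits under the limit.
    pushed = any(size <= max_block_size_to_push for size in block_sizes)
    return PushCompletion(map_index, pushed)


def should_skip_finalization(completions: List[PushCompletion]) -> bool:
    # If no mapper pushed any block, there is nothing to merge, so the driver
    # can skip shuffle merge finalization.
    return not any(c.pushed_any_block for c in completions)
```

Carrying a single boolean per mapper keeps the RPC small while giving the driver everything it needs for this particular check.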
--
This message was sent by Atlassian Jira
(v8.3.4#803005)