[ https://issues.apache.org/jira/browse/SPARK-27734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16893018#comment-16893018 ]
Paweł Wiejacha commented on SPARK-27734:
----------------------------------------
We also encountered this problem. We've reduced it to shuffling 60 GiB of data
divided into 5 partitions with *repartitionAndSortWithinPartitions*() and then
processing all of them with *foreachPartition*() on a single executor that has
2 GiB of memory assigned. Processing each partition takes ~70 minutes (52 min of
which is GC time) and CPU usage is very high (due to GC).
Setting *spark.shuffle.spill.numElementsForceSpillThreshold* is very
inconvenient, so it would be nice to accept Adrian's pull request.
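For context, a minimal sketch of the kind of job we reduced it to (the input
path, key choice, and threshold value are placeholders, not our actual code):
{code:scala}
import org.apache.spark.HashPartitioner
import org.apache.spark.sql.SparkSession

object ShuffleSpillRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("shuffle-spill-repro")
      // Current workaround: force a spill after a fixed record count instead of
      // letting the shuffle sorters fill the heap. The value is illustrative only.
      .config("spark.shuffle.spill.numElementsForceSpillThreshold", "5000000")
      .getOrCreate()

    // Placeholder input standing in for the ~60 GiB of (key, value) pairs.
    val records = spark.sparkContext
      .textFile("hdfs:///path/to/input")
      .map(line => (line.hashCode, line))

    // Shuffle into 5 partitions, sorted by key within each partition, then
    // process every partition on a single small (2 GiB) executor.
    records
      .repartitionAndSortWithinPartitions(new HashPartitioner(5))
      .foreachPartition { iter =>
        var n = 0L
        iter.foreach(_ => n += 1) // stand-in for real per-partition processing
        println(s"processed $n records")
      }

    spark.stop()
  }
}
{code}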
> Add memory based thresholds for shuffle spill
> ---------------------------------------------
>
> Key: SPARK-27734
> URL: https://issues.apache.org/jira/browse/SPARK-27734
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core, SQL
> Affects Versions: 3.0.0
> Reporter: Adrian Muraru
> Priority: Minor
>
> When running large shuffles (700TB input data, 200k map tasks, 50k reducers
> on a 300-node cluster) the job is regularly OOMing in the map and reduce phases.
> IIUC, ShuffleExternalSorter (map side) and ExternalAppendOnlyMap and
> ExternalSorter (reduce side) try to max out the available execution memory.
> This in turn doesn't play nicely with the garbage collector, and executors fail
> with OutOfMemoryError when the memory allocated by these in-memory structures
> maxes out the available heap (in our case we are running with 9 cores/executor
> and 32G per executor).
> To mitigate this, I set
> {{spark.shuffle.spill.numElementsForceSpillThreshold}} to force the spill to
> disk. While this config works, it is not flexible enough: it is expressed as a
> number of elements, and in our case we run multiple shuffles in a single job
> and the element size differs from one stage to another.
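> For reference, this is roughly how we apply that workaround today (the class
> name and jar are placeholders, and the threshold value is illustrative; it has
> to be re-tuned per job precisely because it counts records rather than bytes):
> {code}
> spark-submit \
>   --executor-cores 9 \
>   --executor-memory 32g \
>   --conf spark.shuffle.spill.numElementsForceSpillThreshold=5000000 \
>   --class com.example.ShuffleHeavyJob \
>   shuffle-heavy-job.jar
> {code}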
> We have an internal patch that extends this behaviour with two new parameters
> to control spilling based on memory usage (a usage sketch follows below):
> - {{spark.shuffle.spill.map.maxRecordsSizeForSpillThreshold}}
> - {{spark.shuffle.spill.reduce.maxRecordsSizeForSpillThreshold}}
>
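> With the patch, the same job could instead cap the in-memory size of the
> sorters directly. A hypothetical usage sketch with the proposed parameter
> names, assuming they accept a byte-size value (the values, class name and jar
> are illustrative only):
> {code}
> spark-submit \
>   --conf spark.shuffle.spill.map.maxRecordsSizeForSpillThreshold=1g \
>   --conf spark.shuffle.spill.reduce.maxRecordsSizeForSpillThreshold=1g \
>   --class com.example.ShuffleHeavyJob \
>   shuffle-heavy-job.jar
> {code}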