[ https://issues.apache.org/jira/browse/SPARK-3376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-3376:
-----------------------------------
Assignee: Apache Spark
> Memory-based shuffle strategy to reduce overhead of disk I/O
> ------------------------------------------------------------
>
> Key: SPARK-3376
> URL: https://issues.apache.org/jira/browse/SPARK-3376
> Project: Spark
> Issue Type: New Feature
> Components: Shuffle
> Affects Versions: 1.1.0
> Reporter: uncleGen
> Assignee: Apache Spark
> Labels: performance
>
> I think a memory-based shuffle can reduce some of the overhead of disk I/O. I
> would like to know whether there is any plan to work on this, or any
> suggestions about it. Building on the work in SPARK-2044, it is feasible to
> have several shuffle implementations.
> ----------------------------------------------------------------------------------------------------------------------------------------------------------------
> Currently, there are two shuffle manager implementations, SORT and HASH. Both
> of them use disk at some stages. For example, on the map side all
> intermediate data is written to temporary files, and on the reduce side Spark
> sometimes uses an external sort. In either case, disk I/O costs some
> performance. We could provide a pure-memory shuffle manager in which
> intermediate data only goes through memory; in some scenarios this can
> improve performance. As an experiment, I implemented an in-memory shuffle
> manager on top of SPARK-2044.
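To make the idea concrete, here is a toy, single-process model of a pure-memory shuffle in plain Scala. It is not Spark's ShuffleManager API and not the implementation described in this issue; the names ToyInMemoryShuffle, Store, writeMapOutput and readAndGroup are hypothetical. Map tasks hash-partition their records into per-reducer buckets held in a mutable map (standing in for a memory store) instead of writing temporary files. Each reduce task then fetches its bucket from every map task and groups it in memory without spilling.

```scala
import scala.collection.mutable

// Hypothetical toy model of an in-memory shuffle (not Spark's API).
object ToyInMemoryShuffle {
  // Buckets keyed by (mapId, reduceId); a stand-in for blocks kept in a memory store.
  type Store[K, V] = mutable.Map[(Int, Int), mutable.ArrayBuffer[(K, V)]]

  // Non-negative hash partitioning, the same idea as Spark's HashPartitioner.
  def partitionFor(key: Any, numReducers: Int): Int = {
    val h = if (key == null) 0 else key.hashCode % numReducers
    if (h < 0) h + numReducers else h
  }

  // Map side: append each record to an in-memory bucket instead of a shuffle file.
  def writeMapOutput[K, V](store: Store[K, V], mapId: Int,
                           records: Iterator[(K, V)], numReducers: Int): Unit =
    records.foreach { case (k, v) =>
      val bucket = store.getOrElseUpdate((mapId, partitionFor(k, numReducers)),
                                         mutable.ArrayBuffer.empty[(K, V)])
      bucket += ((k, v))
    }

  // Reduce side: fetch this reducer's bucket from every map task and group
  // entirely in memory (no spilling to disk).
  def readAndGroup[K, V](store: Store[K, V], reduceId: Int, numMaps: Int): Map[K, Seq[V]] = {
    val fetched = (0 until numMaps).flatMap(m => store.get((m, reduceId)).toSeq.flatten)
    fetched.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
  }
}
```

In a real cluster the buckets would live in each executor's memory and be fetched over the network; the sketch only shows the data flow that avoids temporary files.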
> 1. Test results for some heavy shuffle operations:
> | data size (bytes) | partitions | resources |
> | 5131859218 | 2000 | 50 executors / 4 cores / 4 GB |
>
> | shuffle spill | compression | shuffle manager | operation1: repartition+flatMap+groupByKey | operation2: repartition+groupByKey |
> | on | lz4 | memory | 38s | 16s |
> | on | lz4 | sort | 45s | 28s |
> | on | lz4 | hash | 46s | 28s |
> | off | lz4 | memory | 16s | 16s |
> | on | lzf | memory | 28s | 27s |
> | on | lzf | sort | 29s | 29s |
> | on | lzf | hash | 41s | 30s |
> | off | lzf | memory | 15s | 16s |
> In my implementation, I simply reused the BlockManager on the map side and
> set spark.shuffle.spill to false on the reduce side, so all intermediate data
> is cached in the memory store. As Reynold Xin has pointed out, our disk-based
> shuffle managers already perform well, and with parameter tuning they can
> reach performance similar to the memory-based shuffle manager. Still, I will
> keep working on it and improving it: as an alternative tuning option,
> "InMemory shuffle" is a good choice.
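The benchmark configurations can be written down as Spark properties. The following is a sketch under stated assumptions, not the exact settings used in these runs. The issue does not say how the lz4/lzf rows were configured, so it assumes spark.io.compression.codec, and the in-memory manager's class name (org.example.shuffle.InMemoryShuffleManager) is hypothetical. In Spark 1.1, spark.shuffle.manager accepts the short names sort and hash or a fully qualified class name, and spark.shuffle.spill=false disables spilling shuffle aggregation data to disk.

```scala
// Sketch: Spark properties for one benchmark configuration (assumptions noted).
object ShuffleBenchConf {
  // Hypothetical: the issue does not name the in-memory shuffle manager class.
  val InMemoryManagerClass = "org.example.shuffle.InMemoryShuffleManager"

  // Compression codec classes that ship with Spark 1.1.
  val Codecs = Map(
    "lz4" -> "org.apache.spark.io.LZ4CompressionCodec",
    "lzf" -> "org.apache.spark.io.LZFCompressionCodec")

  /** manager: "memory", "sort" or "hash"; spill: value for spark.shuffle.spill. */
  def settings(manager: String, spill: Boolean, codec: String): Map[String, String] = {
    val managerValue = manager match {
      case "memory"        => InMemoryManagerClass
      case "sort" | "hash" => manager // built-in short names
      case other           => throw new IllegalArgumentException(s"unknown shuffle manager: $other")
    }
    Map(
      "spark.shuffle.manager"      -> managerValue,
      "spark.shuffle.spill"        -> spill.toString,
      "spark.io.compression.codec" -> Codecs(codec))
  }
}
```

The resulting map can be applied with `new SparkConf().setAll(...)` before creating the SparkContext.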
> Future work includes, but is not limited to:
> - memory usage management in "InMemory Shuffle" mode
> - data management when intermediate data cannot fit in memory
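As a hypothetical illustration of the two future-work items (not part of the patch), one possible shape is a store with a fixed byte budget. A block is admitted only if it fits; otherwise put returns false so the caller can fall back to a disk-based path. Blocks are removed once consumed so their memory returns to the budget. BoundedShuffleMemory and its methods are illustrative names.

```scala
import scala.collection.mutable

// Hypothetical memory-budgeted block store for shuffle output.
class BoundedShuffleMemory(val budgetBytes: Long) {
  private val blocks = mutable.HashMap.empty[String, Array[Byte]]
  private var used = 0L

  def usedBytes: Long = synchronized { used }

  // Admit the block only if the total stays within the budget; on false the
  // caller is expected to write the block to disk instead.
  def put(blockId: String, data: Array[Byte]): Boolean = synchronized {
    val previous = blocks.get(blockId).map(_.length.toLong).getOrElse(0L)
    val newUsed = used - previous + data.length
    if (newUsed > budgetBytes) false
    else {
      blocks(blockId) = data
      used = newUsed
      true
    }
  }

  def get(blockId: String): Option[Array[Byte]] = synchronized { blocks.get(blockId) }

  // Free a block after all reducers have fetched it, returning its bytes to the budget.
  def remove(blockId: String): Unit = synchronized {
    blocks.remove(blockId).foreach(b => used -= b.length)
  }
}
```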
> Test code:
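> {code:borderStyle=solid}
> import org.apache.spark.{SparkConf, SparkContext}
>
> object InMemoryShuffleTest {
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf().setAppName("InMemoryShuffleTest")
>     val sc = new SparkContext(conf)
>     val dataPath = args(0)            // input text file
>     val partitions = args(1).toInt    // number of partitions after repartition
>     // Cache and materialize the input so that only the shuffle job is timed
>     val rdd1 = sc.textFile(dataPath).cache()
>     rdd1.count()
>     val startTime = System.currentTimeMillis()
>     val rdd2 = rdd1.repartition(partitions)
>       .flatMap(_.split(","))
>       .map(s => (s, s))
>       .groupBy(e => e._1)
>     rdd2.count()                      // action that triggers the shuffles
>     val endTime = System.currentTimeMillis()
>     println("time: " + (endTime - startTime) / 1000)  // elapsed whole seconds
>     sc.stop()
>   }
> }
> {code}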
> 2. Spark sort benchmark results (Spark 1.1.1). The disk-based shuffle managers
> were not tuned.
> 2.1. Influence of memory size per core
> Setup: 100 GB input (sort benchmark), 100 executors with 15 cores each, 1491
> partitions (one per input file block).
> | memory size per executor | in-memory shuffle (no shuffle spill) | sort shuffle | hash shuffle | improvement (vs. sort) | improvement (vs. hash) |
> | 9GB | 79.652849s | 60.102337s | failed | -32.7% | - |
> | 12GB | 54.821924s | 51.654897s | 109.167068s | -3.17% | +47.8% |
> | 15GB | 33.537199s | 40.140621s | 48.088158s | +16.47% | +30.26% |
> | 18GB | 30.930927s | 43.392401s | 49.830276s | +28.7% | +37.93% |
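The improvement columns are consistent with improvement = (baseline time - in-memory time) / baseline time, so positive values mean the in-memory shuffle was faster. The helper below sketches that calculation; the object name is illustrative. Applied to the reported times, it reproduces most entries to within rounding (for example, +28.72% vs. sort and +37.93% vs. hash at 18GB). For the 12GB row it gives about -6.13% vs. sort and +49.78% vs. hash. For the 9GB row it gives about -32.53% vs. sort. Those entries may therefore contain arithmetic slips: the listed -3.17% matches the difference in seconds rather than a percentage.

```scala
// Relative improvement of the in-memory shuffle over a baseline, in percent.
object Improvement {
  def percent(inMemorySeconds: Double, baselineSeconds: Double): Double =
    (baselineSeconds - inMemorySeconds) / baselineSeconds * 100.0

  // Rounded to two decimals for comparison with the table.
  def rounded(inMemorySeconds: Double, baselineSeconds: Double): Double =
    math.round(percent(inMemorySeconds, baselineSeconds) * 100.0) / 100.0
}
```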
> 2.2. Influence of the number of partitions
> Setup: 18 GB memory and 15 cores per executor.
> | partitions | in-memory shuffle (no shuffle spill) | sort shuffle | hash shuffle | improvement (vs. sort) | improvement (vs. hash) |
> | 1000 | 92.675436s | 85.193158s | 71.106323s | -8.78% | -30.34% |
> | 1491 | 30.930927s | 43.392401s | 49.830276s | +28.7% | +37.93% |
> | 2000 | 18.385s | 26.653720s | 30.103s | +31.02% | +38.92% |
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)