[ 
https://issues.apache.org/jira/browse/SPARK-3376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen updated SPARK-3376:
-----------------------------
    Target Version/s:   (was: 1.3.0)

> Memory-based shuffle strategy to reduce overhead of disk I/O
> ------------------------------------------------------------
>
>                 Key: SPARK-3376
>                 URL: https://issues.apache.org/jira/browse/SPARK-3376
>             Project: Spark
>          Issue Type: New Feature
>          Components: Shuffle
>    Affects Versions: 1.1.0
>            Reporter: uncleGen
>              Labels: performance
>
> I think a memory-based shuffle can reduce some of the overhead of disk I/O. I 
> just want to know whether there is any plan to do something about it, or any 
> suggestions about it. Based on the work in SPARK-2044, it is feasible to have 
> several implementations of shuffle.
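> For context, SPARK-2044 made the shuffle pluggable: Spark 1.1 picks the 
> implementation from the "spark.shuffle.manager" setting ("hash", "sort", or a 
> fully qualified class name). A minimal sketch of selecting an alternative 
> implementation (the in-memory class name below is hypothetical):
> {code: borderStyle=solid}
>     import org.apache.spark.SparkConf
> 
>     // "hash" and "sort" are built in; the class below is hypothetical
>     val conf = new SparkConf()
>       .setAppName("InMemoryShuffleDemo")
>       .set("spark.shuffle.manager",
>            "org.apache.spark.shuffle.inmemory.InMemoryShuffleManager")
> {code}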
> ----------------------------------------------------------------------------------------------------------------------------------------------------------------
> Currently, there are two implementations of the shuffle manager, i.e. SORT 
> and HASH. Both of them use disk in some stages. For example, on the map side, 
> all the intermediate data is written into temporary files; on the reduce 
> side, Spark sometimes uses an external sort. In any case, disk I/O brings 
> some performance loss. Maybe we can provide a pure-memory shuffle manager, in 
> which intermediate data only goes through memory. In some scenarios, this can 
> improve performance. Experimentally, I implemented an in-memory shuffle 
> manager upon SPARK-2044; a skeleton of the idea is sketched below.
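> For illustration, a minimal skeleton against the SPARK-2044 interfaces might 
> look as follows. This is a hypothetical sketch only: "ShuffleManager" is a 
> private[spark] trait, so a real implementation has to live inside the Spark 
> source tree, and the signatures here are simplified from Spark 1.1 (some 
> members are omitted):
> {code: borderStyle=solid}
>     package org.apache.spark.shuffle.inmemory
> 
>     import org.apache.spark.{ShuffleDependency, SparkConf, TaskContext}
>     import org.apache.spark.shuffle._
> 
>     private[spark] class InMemoryShuffleManager(conf: SparkConf)
>       extends ShuffleManager {
> 
>       def registerShuffle[K, V, C](shuffleId: Int, numMaps: Int,
>           dependency: ShuffleDependency[K, V, C]): ShuffleHandle =
>         new BaseShuffleHandle(shuffleId, numMaps, dependency)
> 
>       // A writer would put map output into the memory store
>       // instead of writing shuffle files to disk.
>       def getWriter[K, V](handle: ShuffleHandle, mapId: Int,
>           context: TaskContext): ShuffleWriter[K, V] = ???
> 
>       // A reader would fetch the in-memory blocks from the executors.
>       def getReader[K, C](handle: ShuffleHandle, startPartition: Int,
>           endPartition: Int, context: TaskContext): ShuffleReader[K, C] = ???
> 
>       def unregisterShuffle(shuffleId: Int): Unit = { }
>       def stop(): Unit = { }
>     }
> {code}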
> 1. Following are my test results (some heavy shuffle operations):
> | data size (Byte) | partitions | resources |
> | 5131859218 | 2000 | 50 executors / 4 cores / 4GB |
> | settings | operation1 (repartition + flatMap + groupByKey) | operation2 (repartition + groupByKey) |
> | shuffle spill & lz4, memory | 38s | 16s |
> | shuffle spill & lz4, sort | 45s | 28s |
> | shuffle spill & lz4, hash | 46s | 28s |
> | no shuffle spill & lz4, memory | 16s | 16s |
> | shuffle spill & lzf, memory | 28s | 27s |
> | shuffle spill & lzf, sort | 29s | 29s |
> | shuffle spill & lzf, hash | 41s | 30s |
> | no shuffle spill & lzf, memory | 15s | 16s |
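> For reference, the "settings" column above maps to standard Spark 
> configuration; a sketch mirroring the "shuffle spill & lz4" rows (the 
> "memory" rows additionally used the in-memory shuffle manager):
> {code: borderStyle=solid}
>     import org.apache.spark.SparkConf
> 
>     val conf = new SparkConf()
>       .setAppName("InMemoryShuffleTest")
>       .set("spark.shuffle.spill", "true")        // "false" for the "no shuffle spill" rows
>       .set("spark.io.compression.codec", "lz4")  // "lzf" for the lzf rows
> {code}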
> In my implementation, I simply reused the "BlockManager" on the map side and 
> set "spark.shuffle.spill" to false on the reduce side, so all the 
> intermediate data is cached in the memory store (a minimal sketch of the 
> map-side write follows below). Just as Reynold Xin has pointed out, our 
> disk-based shuffle managers already achieve good performance, and with 
> parameter tuning they can obtain performance similar to the memory-based 
> shuffle manager. However, I will continue my work and improve it, and as an 
> alternative tuning option, "InMemory shuffle" is a good choice.
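> A minimal sketch of that map-side write, assuming code living inside the 
> Spark source tree ("BlockManager" and "ShuffleBlockId" are internal APIs, and 
> the putBytes call is simplified from Spark 1.1):
> {code: borderStyle=solid}
>     import java.nio.ByteBuffer
>     import org.apache.spark.SparkEnv
>     import org.apache.spark.storage.{ShuffleBlockId, StorageLevel}
> 
>     // Put one map partition's serialized output into the memory store
>     // instead of writing a shuffle file to disk.
>     def writePartitionToMemory(shuffleId: Int, mapId: Int, reduceId: Int,
>                                bytes: ByteBuffer): Unit = {
>       val blockId = ShuffleBlockId(shuffleId, mapId, reduceId)
>       // MEMORY_ONLY keeps the intermediate data purely in memory
>       SparkEnv.get.blockManager.putBytes(blockId, bytes,
>         StorageLevel.MEMORY_ONLY)
>     }
> {code}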
> Future work includes, but is not limited to:
> - memory usage management in "InMemory Shuffle" mode
> - data management when the intermediate data cannot fit in memory
> Test code:
> {code: borderStyle=solid}
>     import org.apache.spark.{SparkConf, SparkContext}
> 
>     object InMemoryShuffleTest {
>       def main(args: Array[String]): Unit = {
>         val conf = new SparkConf().setAppName("InMemoryShuffleTest")
>         val sc = new SparkContext(conf)
>         val dataPath = args(0)          // input path
>         val partitions = args(1).toInt  // target partition count
>         // Cache and materialize the input first, so the timed section
>         // below measures only the shuffle-heavy stages.
>         val rdd1 = sc.textFile(dataPath).cache()
>         rdd1.count()
>         val startTime = System.currentTimeMillis()
>         // repartition and groupBy each trigger a shuffle
>         val rdd2 = rdd1.repartition(partitions)
>           .flatMap(_.split(",")).map(s => (s, s))
>           .groupBy(e => e._1)
>         rdd2.count()
>         val endTime = System.currentTimeMillis()
>         println("time: " + (endTime - startTime) / 1000)
>       }
>     }
> {code}
> 2. Following is a Spark Sort Benchmark (on Spark 1.1.1). There is no tuning 
> for the disk-based shuffles.
> 2.1. Test the influence of memory size per executor.
> Precondition: 100GB (Sort Benchmark data), 100 executors with 15 cores each, 
> 1491 partitions (input file blocks).
> | memory size per executor | in-memory shuffle (no shuffle spill) | sort shuffle | hash shuffle | improvement (vs. sort) | improvement (vs. hash) |
> | 9GB | 79.652849s | 60.102337s | failed | -32.7% | - |
> | 12GB | 54.821924s | 51.654897s | 109.167068s | -3.17% | +47.8% |
> | 15GB | 33.537199s | 40.140621s | 48.088158s | +16.47% | +30.26% |
> | 18GB | 30.930927s | 43.392401s | 49.830276s | +28.7% | +37.93% |
> 2.2. Test the influence of partition number.
> Precondition: 18GB / 15 cores per executor.
> | partitions | in-memory shuffle (no shuffle spill) | sort shuffle | hash shuffle | improvement (vs. sort) | improvement (vs. hash) |
> | 1000 | 92.675436s | 85.193158s | 71.106323s | -8.78% | -30.34% |
> | 1491 | 30.930927s | 43.392401s | 49.830276s | +28.7% | +37.93% |
> | 2000 | 18.385s | 26.653720s | 30.103s | +31.02% | +38.92% |


