[ https://issues.apache.org/jira/browse/SPARK-3376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14240985#comment-14240985 ]

uncleGen commented on SPARK-3376:
---------------------------------

[~rxin] Yeah, I agree with you. We can improve I/O (disk I/O and network 
I/O) performance through both hardware resources and software. When hardware 
resources are limited, a software approach can achieve similar performance. 
Providing an alternative "memory-based" shuffle option may be a good choice.

> Memory-based shuffle strategy to reduce overhead of disk I/O
> ------------------------------------------------------------
>
>                 Key: SPARK-3376
>                 URL: https://issues.apache.org/jira/browse/SPARK-3376
>             Project: Spark
>          Issue Type: New Feature
>          Components: Shuffle
>    Affects Versions: 1.1.0
>            Reporter: uncleGen
>            Priority: Trivial
>              Labels: performance
>
> I think a memory-based shuffle can reduce some of the overhead of disk I/O. I 
> just want to know whether there is any plan to do something about it, or any 
> suggestions about it. Based on the work in SPARK-2044, it is feasible to have 
> several implementations of shuffle.
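> With the pluggable interface from SPARK-2044, the implementation is chosen 
> through the "spark.shuffle.manager" setting, so an in-memory variant could be 
> plugged in the same way. A minimal sketch (the "InMemoryShuffleManager" class 
> name below is only a placeholder for such an implementation):
> {code:borderStyle=solid}
> import org.apache.spark.{SparkConf, SparkContext}
>
> // "hash" and "sort" name the two built-in shuffle managers in Spark 1.1;
> // a custom implementation can be given by its fully qualified class name.
> val conf = new SparkConf()
>   .setAppName("ShuffleManagerSelection")
>   .set("spark.shuffle.manager", "org.apache.spark.shuffle.InMemoryShuffleManager") // placeholder
> val sc = new SparkContext(conf)
> {code}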
> ----------------------------------------------------------------------------------------------------------------------------------------------------------------
> Currently, there are two implementations of shuffle manager, i.e. SORT and 
> HASH. Both of them use disk in some stages. For example, on the map side, 
> all the intermediate data is written into temporary files; on the reduce 
> side, Spark sometimes uses an external sort. In either case, disk I/O brings 
> some performance loss. Maybe we can provide a pure-memory shuffle manager, in 
> which intermediate data only goes through memory. In some scenarios, it can 
> improve performance. Experimentally, I implemented an in-memory shuffle 
> manager upon SPARK-2044. Following are my test results:
> || data size (bytes) || partitions || resources ||
> | 5131859218 | 2000 | 50 executors / 4 cores / 4 GB |
> || settings || operation1: repartition + flatMap + groupByKey || operation2: repartition + groupByKey ||
> | shuffle spill & lz4, memory | 38s | 16s |
> | shuffle spill & lz4, sort | 45s | 28s |
> | shuffle spill & lz4, hash | 46s | 28s |
> | no shuffle spill & lz4, memory | 16s | 16s |
> | shuffle spill & lzf, memory | 28s | 27s |
> | shuffle spill & lzf, sort | 29s | 29s |
> | shuffle spill & lzf, hash | 41s | 30s |
> | no shuffle spill & lzf, memory | 15s | 16s |
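> For reference, the "settings" column corresponds to standard Spark 
> configuration keys (a sketch; on older versions the codec may need to be 
> given by its fully qualified class name):
> {code:borderStyle=solid}
> import org.apache.spark.SparkConf
>
> // "no shuffle spill" rows: spark.shuffle.spill = false (default is true)
> // lz4 / lzf rows:          spark.io.compression.codec = "lz4" or "lzf"
> val conf = new SparkConf()
>   .set("spark.shuffle.spill", "false")
>   .set("spark.io.compression.codec", "lz4")
> {code}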
> In my implementation, I simply reused the "BlockManager" on the map side and 
> set "spark.shuffle.spill" to false on the reduce side, so all the 
> intermediate data is cached in the memory store (a sketch of the map-side 
> idea follows the list below). Just as Reynold Xin has pointed out, our 
> disk-based shuffle manager already achieves good performance, and with 
> parameter tuning it can obtain performance similar to the memory-based one. 
> However, I will continue my work and improve it, and as an alternative tuning 
> option, "InMemory shuffle" is a good choice. 
> Future work includes, but is not limited to:
> - memory usage management in "InMemory Shuffle" mode
> - data management when intermediate data cannot fit in memory
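> An illustrative sketch of the map-side idea (it relies on Spark-internal, 
> private[spark] APIs and is not the actual patch):
> {code:borderStyle=solid}
> import java.nio.ByteBuffer
> import org.apache.spark.SparkEnv
> import org.apache.spark.storage.{ShuffleBlockId, StorageLevel}
>
> // Store one serialized map-output partition in the executor's memory
> // store instead of writing a temporary shuffle file to disk.
> def writeMapOutput(shuffleId: Int, mapId: Int, reduceId: Int,
>                    serialized: ByteBuffer): Unit = {
>   val blockId = ShuffleBlockId(shuffleId, mapId, reduceId)
>   // MEMORY_ONLY keeps the block purely in memory; handling the case where
>   // it does not fit is exactly the future work listed above.
>   SparkEnv.get.blockManager.putBytes(blockId, serialized, StorageLevel.MEMORY_ONLY)
> }
> {code}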
> Test code:
> {code:borderStyle=solid}
> import org.apache.spark.{SparkConf, SparkContext}
>
> object InMemoryShuffleTest {
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf().setAppName("InMemoryShuffleTest")
>     val sc = new SparkContext(conf)
>     val dataPath = args(0)
>     val partitions = args(1).toInt
>
>     // Cache and materialize the input so the timed section below measures
>     // the shuffle, not the initial read from disk.
>     val rdd1 = sc.textFile(dataPath).cache()
>     rdd1.count()
>
>     val startTime = System.currentTimeMillis()
>     // repartition + flatMap + groupBy is the shuffle-heavy path under test.
>     val rdd2 = rdd1.repartition(partitions)
>       .flatMap(_.split(","))
>       .map(s => (s, s))
>       .groupBy(e => e._1)
>     rdd2.count()
>     val endTime = System.currentTimeMillis()
>
>     println("time: " + (endTime - startTime) / 1000)
>     sc.stop()
>   }
> }
> {code}


