[
https://issues.apache.org/jira/browse/SPARK-3376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
uncleGen updated SPARK-3376:
----------------------------
Description:
I think a memory-based shuffle can reduce some of the overhead of disk I/O. I
would like to know whether there is any plan to do something about it, or any
suggestions. Based on the work in SPARK-2044, it is feasible to have several
implementations of shuffle.
Currently, there are two implementations of the shuffle manager, i.e. SORT and
HASH. Both of them use disk in some stages. For example, on the map side, all
the intermediate data is written to temporary files; on the reduce side, Spark
sometimes uses an external sort. In either case, disk I/O brings some
performance loss. Maybe we can provide a pure in-memory shuffle manager, in
which intermediate data only goes through memory. In some scenarios, this can
improve performance. Experimentally, I implemented an in-memory shuffle
manager on top of
[SPARK-2044](https://issues.apache.org/jira/browse/SPARK-2044). The following
are my test results:
| data size (bytes) | partitions | resources |
| --- | --- | --- |
| 5131859218 | 2000 | 50 executors / 4 cores / 4 GB |

| settings | operation 1: repartition + flatMap + groupByKey | operation 2: repartition + groupByKey |
| --- | --- | --- |
| shuffle spill & lz4, memory | 38s | 16s |
| shuffle spill & lz4, sort | 45s | 28s |
| shuffle spill & lz4, hash | 46s | 28s |
| no shuffle spill & lz4, memory | 16s | 16s |
| shuffle spill & lzf, memory | 28s | 27s |
| shuffle spill & lzf, sort | 29s | 29s |
| shuffle spill & lzf, hash | 41s | 30s |
| no shuffle spill & lzf, memory | 15s | 16s |
In my implementation, I simply reused the `BlockManager` on the map side and
set `spark.shuffle.spill` to false on the reduce side. All the intermediate
data is cached in the memory store. Future work includes, but is not limited to:
- memory usage management in "InMemory Shuffle" mode
- data management when intermediate data cannot fit in memory
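As an illustrative sketch of the idea (plain Scala, not the actual Spark
`ShuffleManager`/`BlockManager` APIs; the object and method names below are
hypothetical), an in-memory shuffle store could key each map output buffer by
(shuffleId, mapId, reduceId), so reduce tasks fetch from memory instead of
from temporary files:

```scala
import scala.collection.mutable

// Hypothetical in-memory shuffle store: map tasks write serialized
// partition buffers keyed by (shuffleId, mapId, reduceId); reduce tasks
// read back every map output for their reduce partition from memory.
object InMemoryShuffleStore {
  private val blocks =
    mutable.Map.empty[(Int, Int, Int), Array[Byte]]

  // Map side: cache one reduce partition's bytes in memory.
  def putBlock(shuffleId: Int, mapId: Int, reduceId: Int,
               data: Array[Byte]): Unit =
    blocks.synchronized {
      blocks((shuffleId, mapId, reduceId)) = data
    }

  // Reduce side: fetch all map outputs for one reduce partition.
  def getBlocksForReduce(shuffleId: Int, reduceId: Int): Seq[Array[Byte]] =
    blocks.synchronized {
      blocks.collect {
        case ((s, _, r), data) if s == shuffleId && r == reduceId => data
      }.toSeq
    }
}
```

A real implementation would also need the memory accounting and
spill-to-disk fallback listed under future work above.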
Test code:
```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("InMemoryShuffleTest")
val sc = new SparkContext(conf)
val dataPath = args(0)        // input path
val partitions = args(1).toInt // number of shuffle partitions

// Cache the input and force materialization so the timed section below
// measures only the shuffle stages, not the initial read.
val rdd1 = sc.textFile(dataPath).cache()
rdd1.count()

val startTime = System.currentTimeMillis()
val rdd2 = rdd1.repartition(partitions)
  .flatMap(_.split(","))
  .map(s => (s, s))
  .groupBy(e => e._1)
rdd2.count()
val endTime = System.currentTimeMillis()
println("time: " + (endTime - startTime) / 1000)
```
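Since SPARK-2044 made the shuffle pluggable, a manager like this could in
principle be selected through configuration rather than a code change; a
sketch (the in-memory manager class name below is hypothetical):

```
# spark-defaults.conf sketch; the InMemoryShuffleManager class is hypothetical
spark.shuffle.manager   org.apache.spark.shuffle.InMemoryShuffleManager
spark.shuffle.spill     false
```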
> Memory-based shuffle strategy to reduce overhead of disk I/O
> ------------------------------------------------------------
>
> Key: SPARK-3376
> URL: https://issues.apache.org/jira/browse/SPARK-3376
> Project: Spark
> Issue Type: Planned Work
> Reporter: uncleGen
> Priority: Trivial
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]