[ https://issues.apache.org/jira/browse/SPARK-3376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

uncleGen updated SPARK-3376:
----------------------------
    Description: 
I think a memory-based shuffle can reduce some of the overhead of disk I/O. I 
just want to know whether there is any plan to work on this, or any suggestions 
about it. Based on the work in SPARK-2044, it is feasible to have several 
implementations of shuffle.

----------------------------------------------------------------------------------------------------------------------------------------------------------------

Currently, there are two implementations of the shuffle manager, i.e. SORT and 
HASH. Both of them use disk in some stages. For example, on the map side, all 
the intermediate data is written to temporary files; on the reduce side, Spark 
sometimes uses an external sort. In either case, disk I/O brings some 
performance loss. We could provide a pure in-memory shuffle manager, in which 
intermediate data only goes through memory. In some scenarios this can improve 
performance. As an experiment, I implemented an in-memory shuffle manager on 
top of SPARK-2044.

1. Following are my test results (some heavy shuffle operations):

| data size (Byte) | partitions | resources                    |
| 5131859218       | 2000       | 50 executors / 4 cores / 4GB |

| settings               | operation1: repartition + flatMap + groupByKey | operation2: repartition + groupByKey |
| shuffle spill & lz4    |     |     |
|   memory               | 38s | 16s |
|   sort                 | 45s | 28s |
|   hash                 | 46s | 28s |
| no shuffle spill & lz4 |     |     |
|   memory               | 16s | 16s |
| shuffle spill & lzf    |     |     |
|   memory               | 28s | 27s |
|   sort                 | 29s | 29s |
|   hash                 | 41s | 30s |
| no shuffle spill & lzf |     |     |
|   memory               | 15s | 16s |
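
For reference, the row groups above correspond roughly to the following configuration. This is only a sketch: "sort" and "hash" are the built-in shuffle managers in Spark 1.1, while the experimental in-memory manager is selected the same way but its name/class would be a placeholder here.

{code:borderStyle=solid}
    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("ShuffleComparison")
      // Built-in values are "sort" and "hash"; the experimental in-memory
      // manager would be plugged in via this setting as well (placeholder).
      .set("spark.shuffle.manager", "sort")
      // "true" for the "shuffle spill" rows, "false" for "no shuffle spill".
      .set("spark.shuffle.spill", "true")
      // "lz4" or "lzf", matching the compression codec of each row group.
      .set("spark.io.compression.codec", "lz4")
{code}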

In my implementation, I simply reused the "BlockManager" on the map side and 
set "spark.shuffle.spill" to false on the reduce side, so all the intermediate 
data is cached in the memory store (a rough sketch of the map-side idea follows 
the list below). As Reynold Xin has pointed out, our disk-based shuffle 
managers already achieve good performance, and with parameter tuning they can 
obtain performance similar to a memory-based shuffle manager. However, I will 
continue this work and improve it; as an additional tuning option, "InMemory 
shuffle" is a good choice. Future work includes, but is not limited to:

- memory usage management in "InMemory Shuffle" mode
- data management when the intermediate data cannot fit in memory
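
The map-side idea can be sketched roughly as follows: hand each map task's serialized per-reducer output to the BlockManager with a memory-only storage level instead of writing temporary files. This is only an illustration; MemoryShuffleWriterSketch, its write method, and the pre-serialized buckets are placeholders, not the actual SPARK-2044 shuffle interfaces.

{code:borderStyle=solid}
    import java.nio.ByteBuffer

    import org.apache.spark.SparkEnv
    import org.apache.spark.storage.{ShuffleBlockId, StorageLevel}

    // Rough sketch only: each map task registers its already-serialized
    // per-reducer buckets as memory-only blocks, so no temporary shuffle
    // files are written on the map side.
    class MemoryShuffleWriterSketch(shuffleId: Int, mapId: Int) {
      private val blockManager = SparkEnv.get.blockManager

      // buckets(reduceId) holds this map task's serialized output for one
      // reduce partition (serialization itself is omitted from the sketch).
      def write(buckets: Array[ByteBuffer]): Unit = {
        buckets.zipWithIndex.foreach { case (bytes, reduceId) =>
          val blockId = ShuffleBlockId(shuffleId, mapId, reduceId)
          // MEMORY_ONLY keeps the block in the memory store; reducers fetch
          // it through the normal block transfer path.
          blockManager.putBytes(blockId, bytes, StorageLevel.MEMORY_ONLY)
        }
      }
    }
{code}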

Test code:

{code:borderStyle=solid}
    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf().setAppName("InMemoryShuffleTest")
    val sc = new SparkContext(conf)

    val dataPath = args(0)
    val partitions = args(1).toInt

    // Cache and materialize the input first, so the timed section below
    // measures only the shuffle-heavy stages, not the initial read.
    val rdd1 = sc.textFile(dataPath).cache()
    rdd1.count()

    val startTime = System.currentTimeMillis()
    // repartition + flatMap/map + groupBy triggers the wide (shuffle) dependencies.
    val rdd2 = rdd1.repartition(partitions)
                   .flatMap(_.split(","))
                   .map(s => (s, s))
                   .groupBy(e => e._1)
    rdd2.count()
    val endTime = System.currentTimeMillis()

    println("time: " + (endTime - startTime) / 1000)
{code}

2. The following is a Spark Sort Benchmark:

2.1. Test the influence of memory size per core    

Preconditions: 100GB input (Sort Benchmark), 100 executors / 15 cores each, 
1491 partitions (input file blocks)

| memory size per executor | inmemory shuffle (no shuffle spill) | sort shuffle | hash shuffle | improvement (vs. sort) | improvement (vs. hash) |
| 9GB  | 79.652849s | 60.102337s | failed      | -32.7%  | -       |
| 12GB | 54.821924s | 51.654897s | 109.167068s | -3.17%  | +47.8%  |
| 15GB | 33.537199s | 40.140621s | 48.088158s  | +16.47% | +30.26% |
| 18GB | 30.930927s | 43.392401s | 49.830276s  | +28.7%  | +37.93% |

2.2. Test the influence of partition number

Preconditions: 18GB / 15 cores per executor

| partitions | inmemory shuffle (no shuffle spill) | sort shuffle | hash shuffle | improvement (vs. sort) | improvement (vs. hash) |
| 1000 | 92.675436s | 85.193158s | 71.106323s | -8.78%  | -30.34% |
| 1491 | 30.930927s | 43.392401s | 49.830276s | +28.7%  | +37.93% |
| 2000 | 18.385s    | 26.653720s | 30.103s    | +31.02% | +38.92% |
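
For context, the job exercised in the tables above sorts the 100GB input by key. A minimal sketch of that kind of job is below, assuming the standard 100-byte-record input keyed on the first 10 bytes; the actual benchmark harness is not part of this issue.

{code:borderStyle=solid}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._   // pulls in sortByKey on pair RDDs

    val conf = new SparkConf().setAppName("SortBenchmarkSketch")
    val sc = new SparkContext(conf)

    // args(0): input path, args(1): number of partitions (e.g. 1000 / 1491 / 2000).
    val input = sc.textFile(args(0), args(1).toInt)

    // Key each record on its first 10 bytes and sort, which forces a full shuffle.
    val sorted = input.map(line => (line.substring(0, 10), line))
                      .sortByKey()
    sorted.count()   // materialize the result so the shuffle actually runs
{code}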





> Memory-based shuffle strategy to reduce overhead of disk I/O
> ------------------------------------------------------------
>
>                 Key: SPARK-3376
>                 URL: https://issues.apache.org/jira/browse/SPARK-3376
>             Project: Spark
>          Issue Type: New Feature
>          Components: Shuffle
>    Affects Versions: 1.1.0
>            Reporter: uncleGen
>              Labels: performance
>


