wsry opened a new pull request #13222:
URL: https://github.com/apache/flink/pull/13222


   ## What is the purpose of the change
   
   This draft PR makes it convenient to view the main code diff of the 
ResultPartitionWriter interface changes.
    
   The main purpose of the ResultPartitionWriter interface improvement is to 
support sort-merge based batch shuffle. To make it possible to implement 
sort-merge based batch shuffle, we decided, after discussion, to add 
record-oriented interfaces to ResultPartitionWriter and remove the old 
buffer-oriented interfaces. However, this change introduces additional 
per-record overhead and results in a 5% to 10% performance regression for some 
network benchmarks. We made some further optimizations: 
   1)
   2)
   3)
   These optimizations compensate for the regression. The benchmark results 
are as follows:
   Before this change:
   ```
   Benchmark                                                                     (channelsFlushTimeout)   Mode  Cnt      Score      Error   Units
   DataSkewStreamNetworkThroughputBenchmarkExecutor.networkSkewedThroughput                         N/A  thrpt   30  20508.293 ±  467.932  ops/ms
   StreamNetworkBroadcastThroughputBenchmarkExecutor.networkBroadcastThroughput                     N/A  thrpt   30    707.308 ±    7.070  ops/ms
   StreamNetworkThroughputBenchmarkExecutor.networkThroughput                                 100,100ms  thrpt   30  48370.571 ± 1300.206  ops/ms
   StreamNetworkThroughputBenchmarkExecutor.networkThroughput                             100,100ms,SSL  thrpt   30  11182.832 ±  249.167  ops/ms
   StreamNetworkThroughputBenchmarkExecutor.networkThroughput                                  1000,1ms  thrpt   30  28303.697 ±  731.245  ops/ms
   StreamNetworkThroughputBenchmarkExecutor.networkThroughput                                1000,100ms  thrpt   30  32439.635 ± 4025.664  ops/ms
   StreamNetworkThroughputBenchmarkExecutor.networkThroughput                            1000,100ms,SSL  thrpt   30   9127.954 ±  178.195  ops/ms
   StreamNetworkThroughputBenchmarkExecutor.networkThroughput                        1000,100ms,OpenSSL  thrpt   30  23218.673 ±  529.881  ops/ms
   ```
   After this change:
   ```
   Benchmark                                                                     (channelsFlushTimeout)   Mode  Cnt      Score      Error   Units
   DataSkewStreamNetworkThroughputBenchmarkExecutor.networkSkewedThroughput                         N/A  thrpt   30  21241.746 ±  445.289  ops/ms
   StreamNetworkBroadcastThroughputBenchmarkExecutor.networkBroadcastThroughput                     N/A  thrpt   30    714.766 ±   22.332  ops/ms
   StreamNetworkThroughputBenchmarkExecutor.networkThroughput                                 100,100ms  thrpt   30  48641.043 ± 1522.510  ops/ms
   StreamNetworkThroughputBenchmarkExecutor.networkThroughput                             100,100ms,SSL  thrpt   30  11093.303 ±  166.213  ops/ms
   StreamNetworkThroughputBenchmarkExecutor.networkThroughput                                  1000,1ms  thrpt   30  28392.504 ± 3361.873  ops/ms
   StreamNetworkThroughputBenchmarkExecutor.networkThroughput                                1000,100ms  thrpt   30  32617.439 ±  815.474  ops/ms
   StreamNetworkThroughputBenchmarkExecutor.networkThroughput                            1000,100ms,SSL  thrpt   30   9028.759 ±  138.101  ops/ms
   StreamNetworkThroughputBenchmarkExecutor.networkThroughput                        1000,100ms,OpenSSL  thrpt   30  24030.099 ±  788.459  ops/ms
   ```
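
One rough way to compare the two tables above is the relative change in Score for each benchmark. The sketch below computes it for two rows, using Score values copied from the tables. The class name `RelativeChange` is made up for this illustration, and the calculation ignores the reported error margins, so small differences should not be over-interpreted.

```java
import java.util.Locale;

// Hypothetical helper for reading the benchmark tables; not part of this PR.
final class RelativeChange {

    // Relative change of `after` with respect to `before`, in percent.
    static double percent(double before, double after) {
        return (after - before) / before * 100.0;
    }

    public static void main(String[] args) {
        // Score values (ops/ms) copied from the "Before" and "After" tables.
        double skewed = percent(20508.293, 21241.746);
        double openSsl = percent(23218.673, 24030.099);

        System.out.println(String.format(Locale.ROOT,
                "networkSkewedThroughput: %+.2f%%", skewed));
        System.out.println(String.format(Locale.ROOT,
                "networkThroughput (1000,100ms,OpenSSL): %+.2f%%", openSsl));
    }
}
```

Both rows come out a few percent higher after the change, while several of the other rows differ by less than their reported error.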
   
   ## Brief change log
   
     - Change the interface of ResultPartitionWriter.
     - Refactor and optimize the code to improve performance.
     - Refactor the corresponding test cases.
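
To illustrate the interface change listed above, here is a minimal Java sketch of what a record-oriented writer could look like. The names `RecordOrientedWriter`, `InMemoryRecordWriter`, `emitRecord`, `broadcastRecord` and `recordCount` are assumptions made for this illustration and are not taken from this PR's diff. The in-memory implementation only mimics the shape of such an API and does no networking, buffering or sorting.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical record-oriented writer interface (illustration only).
// The caller hands over whole serialized records; how they are packed
// into buffers is left to the implementation.
interface RecordOrientedWriter {

    // Writes one serialized record to a single target subpartition.
    void emitRecord(ByteBuffer record, int targetSubpartition);

    // Writes one serialized record to every subpartition.
    void broadcastRecord(ByteBuffer record);
}

// Toy implementation that stores a copy of each record per subpartition.
final class InMemoryRecordWriter implements RecordOrientedWriter {

    private final List<List<byte[]>> subpartitions = new ArrayList<>();

    InMemoryRecordWriter(int numSubpartitions) {
        for (int i = 0; i < numSubpartitions; i++) {
            subpartitions.add(new ArrayList<>());
        }
    }

    @Override
    public void emitRecord(ByteBuffer record, int targetSubpartition) {
        subpartitions.get(targetSubpartition).add(copy(record));
    }

    @Override
    public void broadcastRecord(ByteBuffer record) {
        for (List<byte[]> subpartition : subpartitions) {
            subpartition.add(copy(record));
        }
    }

    // Number of records stored for the given subpartition.
    int recordCount(int subpartition) {
        return subpartitions.get(subpartition).size();
    }

    // Copies the remaining bytes without moving the caller's position.
    private static byte[] copy(ByteBuffer record) {
        ByteBuffer view = record.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return bytes;
    }
}
```

Because the writer sees whole records rather than pre-filled buffers, an implementation is free to decide how records are grouped before they are written out, which is the kind of freedom a sort-merge based shuffle would rely on.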
   
   ## Verifying this change
   
   TODO
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
     - The serializers: (**yes** / no / don't know)
     - The runtime per-record code paths (performance sensitive): (**yes** / no 
/ don't know)
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / **no** / 
don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / **no**)
     - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

