wsry commented on a change in pull request #9706: [FLINK-14118][runtime]Reduce the unnecessary flushing when there is no data available for flush.
URL: https://github.com/apache/flink/pull/9706#discussion_r328490790
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
 ##########
 @@ -163,6 +170,10 @@ public int getCached() {
                        return PositionMarker.getAbsolute(cachedPosition);
                }
 
+               private int getLatest() {
+                       return PositionMarker.getAbsolute(positionMarker.get());
 
 Review comment:
   I ran some benchmarks with a 1 ms bufferTimeout and saw no evident performance difference before and after the fix. The results are as follows.
   
   Before the fix:
   ```
   Benchmark                      (channelsFlushTimeout)   Mode  Cnt      Score     Error  Units
   NetworkThroughput              1000,1ms                thrpt   30  23032.384 ± 871.883  ops/ms
   KeyByBenchmarks.arrayKeyBy1MS                          thrpt   30   1923.863 ±  78.518  ops/ms
   KeyByBenchmarks.tupleKeyBy1MS                          thrpt   30   3377.401 ± 216.982  ops/ms
   MapRebalanceMapSink1MS                                 thrpt   30   6091.213 ±  92.658  ops/ms
   MapSinkBufferTimeout1MS                                thrpt   30   9107.194 ± 211.169  ops/ms
   ```
   After the fix:
   ```
   Benchmark                      (channelsFlushTimeout)   Mode  Cnt      Score     Error  Units
   NetworkThroughput              1000,1ms                thrpt   30  23985.588 ± 990.037  ops/ms
   KeyByBenchmarks.arrayKeyBy1MS                          thrpt   30   2011.356 ±  40.347  ops/ms
   KeyByBenchmarks.tupleKeyBy1MS                          thrpt   30   3440.238 ± 211.906  ops/ms
   MapRebalanceMapSink1MS                                 thrpt   30   6118.888 ±  94.517  ops/ms
   MapSinkBufferTimeout1MS                                thrpt   30   9120.144 ± 252.023  ops/ms
   ```
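   One rough way to back up the "no evident difference" reading is to check whether each benchmark's before and after Score ± Error ranges overlap (JMH reports Error as the half-width of a confidence interval, 99.9% by default). Overlapping ranges are only a heuristic, not a significance test. The sketch below hard-codes the numbers from the two tables above; the `BenchmarkCompare` class and its methods are hypothetical helpers, not part of Flink or JMH.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BenchmarkCompare {

    /** One JMH result: score and error (half-width of the confidence interval), in ops/ms. */
    record Result(double score, double error) {
        double low() {
            return score - error;
        }

        double high() {
            return score + error;
        }
    }

    /** True if the ranges [score - error, score + error] of the two results overlap. */
    static boolean overlaps(Result a, Result b) {
        return a.low() <= b.high() && b.low() <= a.high();
    }

    /** Relative change of the "after" score versus the "before" score, in percent. */
    static double percentChange(Result before, Result after) {
        return (after.score() - before.score()) / before.score() * 100.0;
    }

    public static void main(String[] args) {
        // Before/after pairs copied from the 1 ms bufferTimeout tables.
        Map<String, Result[]> runs = new LinkedHashMap<>();
        runs.put("NetworkThroughput (1000,1ms)",
                new Result[] {new Result(23032.384, 871.883), new Result(23985.588, 990.037)});
        runs.put("KeyByBenchmarks.arrayKeyBy1MS",
                new Result[] {new Result(1923.863, 78.518), new Result(2011.356, 40.347)});
        runs.put("KeyByBenchmarks.tupleKeyBy1MS",
                new Result[] {new Result(3377.401, 216.982), new Result(3440.238, 211.906)});
        runs.put("MapRebalanceMapSink1MS",
                new Result[] {new Result(6091.213, 92.658), new Result(6118.888, 94.517)});
        runs.put("MapSinkBufferTimeout1MS",
                new Result[] {new Result(9107.194, 211.169), new Result(9120.144, 252.023)});

        for (Map.Entry<String, Result[]> e : runs.entrySet()) {
            Result before = e.getValue()[0];
            Result after = e.getValue()[1];
            System.out.printf("%-30s %+6.2f%%  ranges overlap: %b%n",
                    e.getKey(), percentChange(before, after), overlaps(before, after));
        }
    }
}
```

   All five pairs overlap, with changes of roughly +0.1% to +4.5%. Applied to the skewed NetworkThroughput numbers reported further down, the ranges do not overlap and the change is about +34.5%.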
   
   As discussed on the mailing list, high data skew across a large number of channels (subpartitions) combined with a low bufferTimeout can lead to a throughput regression. For that scenario, the results are as follows.
   
   Before the fix:
   ```
   Benchmark    (channelsFlushTimeout) Mode     Cnt     Score      Error   Units
   NetworkThroughput     1000,1ms      thrpt    30   18240.197 ± 1892.419 ops/ms
   ```
   
   After the fix:
   ```
   Benchmark    (channelsFlushTimeout) Mode     Cnt     Score      Error  Units
   NetworkThroughput     1000,1ms      thrpt    30   24532.313 ± 1118.312 ops/ms
   ```
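   For intuition on why the skewed case benefits, here is a simplified, single-threaded model of the idea in the PR title: a periodic flush only needs to touch a subpartition if its writer position (analogous to the position that the new `getLatest()` helper presumably reads via `positionMarker`) has moved past what was already flushed. Everything below (`SkippingFlusher`, `Subpartition`, `flushAll`, the notification counter) is hypothetical and is not Flink's `BufferConsumer` or output flusher; it only counts how many flush notifications each policy issues when a few of many channels receive data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class SkippingFlusher {

    /** Hypothetical stand-in for a subpartition backed by a single growing buffer. */
    static final class Subpartition {
        // Writer-side position; atomic to mirror a shared position marker,
        // although this single-threaded model would work with a plain int.
        private final AtomicInteger latestPosition = new AtomicInteger();
        // Position up to which data has already been flushed to the reader.
        private int flushedPosition;
        int notifications;

        void write(int bytes) {
            latestPosition.addAndGet(bytes);
        }

        /** Data is available only if the writer moved past the last flushed position. */
        boolean hasDataAvailable() {
            return latestPosition.get() > flushedPosition;
        }

        void flush() {
            flushedPosition = latestPosition.get();
            notifications++; // stands in for waking up the downstream reader
        }
    }

    /** One periodic flush; with skipIdle, subpartitions without new data are left alone. */
    static void flushAll(List<Subpartition> subpartitions, boolean skipIdle) {
        for (Subpartition sp : subpartitions) {
            if (!skipIdle || sp.hasDataAvailable()) {
                sp.flush();
            }
        }
    }

    /** Returns the total number of flush notifications over all ticks. */
    static int simulate(int channels, int hotChannels, int ticks, boolean skipIdle) {
        List<Subpartition> subpartitions = new ArrayList<>();
        for (int i = 0; i < channels; i++) {
            subpartitions.add(new Subpartition());
        }
        for (int t = 0; t < ticks; t++) {
            // Skewed load: only the first hotChannels subpartitions receive records.
            for (int i = 0; i < hotChannels; i++) {
                subpartitions.get(i).write(100);
            }
            flushAll(subpartitions, skipIdle);
        }
        int total = 0;
        for (Subpartition sp : subpartitions) {
            total += sp.notifications;
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println("flush every subpartition: " + simulate(1000, 10, 100, false));
        System.out.println("skip idle subpartitions:  " + simulate(1000, 10, 100, true));
    }
}
```

   With 10 hot channels out of 1000 and 100 flush ticks, flushing every subpartition issues 100000 notifications, while skipping idle ones issues 1000. The model deliberately ignores buffer recycling, concurrency, and the real cost of a notification.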
