Zakelly commented on code in PR #21362:
URL: https://github.com/apache/flink/pull/21362#discussion_r1032098044


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/InternalKeyContextImpl.java:
##########
@@ -72,6 +73,10 @@ public void setCurrentKey(@Nonnull K currentKey) {
 
     @Override
     public void setCurrentKeyGroupIndex(int currentKeyGroupIndex) {
+        if (!keyGroupRange.contains(currentKeyGroupIndex)) {
+            throw KeyGroupRangeOffsets.newIllegalKeyGroupException(
+                    currentKeyGroupIndex, keyGroupRange);
+        }

Review Comment:
   I have run several tests in flink-benchmarks. The results are as follows:
   
   - With this change:
   
   ```
   Benchmark                    Mode  Cnt     Score     Error   Units
   KeyByBenchmarks.arrayKeyBy  thrpt   30  4907.599 ± 412.976  ops/ms
   KeyByBenchmarks.tupleKeyBy  thrpt   30  8157.488 ± 911.389  ops/ms
   
   Benchmark                                  (stateBackend)   Mode  Cnt     Score    Error   Units
   MemoryStateBackendBenchmark.stateBackends          MEMORY  thrpt   30  4749.925 ± 35.146  ops/ms
   MemoryStateBackendBenchmark.stateBackends              FS  thrpt   30  4815.565 ± 27.069  ops/ms
   MemoryStateBackendBenchmark.stateBackends        FS_ASYNC  thrpt   30  4741.209 ± 46.514  ops/ms
   
   Benchmark                                 (stateBackend)   Mode  Cnt    Score   Error   Units
   RocksStateBackendBenchmark.stateBackends           ROCKS  thrpt   30  289.627 ± 1.684  ops/ms
   RocksStateBackendBenchmark.stateBackends       ROCKS_INC  thrpt   30  287.924 ± 4.893  ops/ms
   ```
   
   - Without this change:
   
   ```
   Benchmark                    Mode  Cnt     Score     Error   Units
   KeyByBenchmarks.arrayKeyBy  thrpt   30  5004.647 ± 379.077  ops/ms
   KeyByBenchmarks.tupleKeyBy  thrpt   30  8241.331 ± 870.276  ops/ms
   
   Benchmark                                  (stateBackend)   Mode  Cnt     Score    Error   Units
   MemoryStateBackendBenchmark.stateBackends          MEMORY  thrpt   30  4822.507 ± 65.687  ops/ms
   MemoryStateBackendBenchmark.stateBackends              FS  thrpt   30  4748.527 ± 43.915  ops/ms
   MemoryStateBackendBenchmark.stateBackends        FS_ASYNC  thrpt   30  4768.075 ± 41.401  ops/ms
   
   Benchmark                                 (stateBackend)   Mode  Cnt    Score   Error   Units
   RocksStateBackendBenchmark.stateBackends           ROCKS  thrpt   30  287.613 ± 2.971  ops/ms
   RocksStateBackendBenchmark.stateBackends       ROCKS_INC  thrpt   30  288.627 ± 1.598  ops/ms
   ```
   
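   As we can see, there is a slight impact (1%~2%) on pure keyBy throughput: 
about 1.9% for arrayKeyBy and 1.0% for tupleKeyBy, both smaller than the 
reported error margins. The impact is not noticeable once a state backend is 
involved. I think it's acceptable, WDYT?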
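
For reference, the 1%~2% figure can be reproduced from the Score column with a small helper. This is only a sketch: the class and method names are made up for illustration and are not part of flink-benchmarks.

```java
public class RelativeChange {

    /** Percentage change of the score with the change relative to the score without it. */
    public static double percentChange(double withChange, double withoutChange) {
        return (withChange - withoutChange) / withoutChange * 100.0;
    }

    public static void main(String[] args) {
        // Scores (ops/ms) copied from the KeyByBenchmarks rows in the tables.
        System.out.printf("arrayKeyBy: %.2f%%%n", percentChange(4907.599, 5004.647));
        System.out.printf("tupleKeyBy: %.2f%%%n", percentChange(8157.488, 8241.331));
    }
}
```

A negative value means throughput dropped with the change applied.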
   
   Also, I'd rather not check whether the new KeyGroupIndex is the same as the 
current one, since the overhead of that check is not significantly lower than 
that of the current solution. WDYT?
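
To make the two options concrete, here is a minimal, self-contained sketch. It does not use Flink classes: `KeyGroupCheckSketch` and its nested `Range` are hypothetical stand-ins, and `Range.contains` is assumed to be a simple inclusive bounds check. Variant A mirrors the check added in this PR; variant B is the "skip if unchanged" alternative.

```java
public class KeyGroupCheckSketch {

    /** Hypothetical stand-in for a key group range: an inclusive [start, end] interval. */
    public static final class Range {
        final int start;
        final int end;

        public Range(int start, int end) {
            this.start = start;
            this.end = end;
        }

        boolean contains(int keyGroup) {
            return keyGroup >= start && keyGroup <= end;
        }
    }

    private final Range range;
    private int currentKeyGroupIndex;

    public KeyGroupCheckSketch(Range range) {
        this.range = range;
        // Start from a valid index so the shortcut in variant B never skips a needed check.
        this.currentKeyGroupIndex = range.start;
    }

    /** Variant A: validate the index against the range on every call. */
    public void setWithRangeCheck(int keyGroupIndex) {
        if (!range.contains(keyGroupIndex)) {
            throw new IllegalArgumentException(
                    "Key group " + keyGroupIndex + " is not in [" + range.start + ", " + range.end + "]");
        }
        currentKeyGroupIndex = keyGroupIndex;
    }

    /** Variant B: return early when the index is unchanged, otherwise fall back to variant A. */
    public void setWithEqualityShortcut(int keyGroupIndex) {
        if (keyGroupIndex == currentKeyGroupIndex) {
            return; // already validated when it was first set
        }
        setWithRangeCheck(keyGroupIndex);
    }

    public int getCurrentKeyGroupIndex() {
        return currentKeyGroupIndex;
    }

    public static void main(String[] args) {
        KeyGroupCheckSketch ctx = new KeyGroupCheckSketch(new Range(4, 7));
        ctx.setWithRangeCheck(5);
        ctx.setWithEqualityShortcut(5); // fast path: one comparison
        ctx.setWithEqualityShortcut(6); // slow path: equality test plus range check
        System.out.println(ctx.getCurrentKeyGroupIndex());
    }
}
```

Under these assumptions, variant B still pays one comparison and branch on its fast path, against two comparisons for variant A, and pays both checks whenever the index changes.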



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
