sunhaibotb commented on issue #8124: [FLINK-11877] Implement the runtime 
handling of the InputSelectable interface
URL: https://github.com/apache/flink/pull/8124#issuecomment-497261439
 
 
   The code has been updated and rebased on the top of  
[PR#8467](https://github.com/apache/flink/pull/8467) and 
[PR#8476](https://github.com/apache/flink/pull/8476) . 
   
   After talking to @pnowojski , the previous benchmarks were somewhat 
unreasonable. We should use a new high-level benchmark based on DataStream API. 
 I wrote and ran the new benchmarks, and I came to the conclusion that these 
changes run as fast as the master branch (that is, the changes in  
[PR#8467](https://github.com/apache/flink/pull/8467) + 
[PR#8476](https://github.com/apache/flink/pull/8476) and this PR have no 
performance regression).  The code  and the results are as follows.
   
   - **Code of the `TwoInputBenchmark.twoInputMapSink` benchmark:**
   
   ```
   env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
   env.setParallelism(1);
   
   long numRecordsPerInput = RECORDS_PER_INVOCATION / 2;
   DataStreamSource<Long> source1 = env.addSource(new 
LongSource(numRecordsPerInput));
   DataStreamSource<Long> source2 = env.addSource(new 
LongSource(numRecordsPerInput));
   
   source1 
       .connect(source2)
       .transform("custom operator", TypeInformation.of(Long.class), new 
MultiplyByTwoCoStreamMap())
       .addSink(new DiscardingSink<>());
   ```
   
   - **Code of the `TwoInputBenchmark.twoInputSelectableMapSink  ` benchmark:**
   
   ```
   env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
   env.setParallelism(1);
   
   long numRecordsPerInput = RECORDS_PER_INVOCATION / 2;
   DataStreamSource<Long> source1 = env.addSource(new 
LongSource(numRecordsPerInput));
   DataStreamSource<Long> source2 = env.addSource(new 
LongSource(numRecordsPerInput));
   
   source1 
       .connect(source2)
       .transform("custom operator", TypeInformation.of(Long.class), new 
MultiplyByTwoAndInputSelectableCoStreamMap())
       .addSink(new DiscardingSink<>());
   
   ```
   
   - **Results of the branch: master**
   ```
   Benchmark                           Mode  Cnt     Score     Error   Units
   TwoInputBenchmark.twoInputMapSink  thrpt   30  9340.356 ± 701.229  ops/ms
   ```
   
   - **Results of the branch: master + 
[PR#8467](https://github.com/apache/flink/pull/8467) + 
[PR#8476](https://github.com/apache/flink/pull/8476) + this PR**
   
   ```
   Benchmark                                     Mode  Cnt     Score     Error  
 Units
   TwoInputBenchmark.twoInputMapSink            thrpt   30  9511.961 ± 596.835  
ops/ms
   TwoInputBenchmark.twoInputSelectableMapSink  thrpt   30  9556.939 ± 552.054  
ops/ms
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to