sunhaibotb commented on issue #8124: [FLINK-11877] Implement the runtime handling of the InputSelectable interface URL: https://github.com/apache/flink/pull/8124#issuecomment-497261439 The code has been updated and rebased on the top of [PR#8467](https://github.com/apache/flink/pull/8467) and [PR#8476](https://github.com/apache/flink/pull/8476) . After talking to @pnowojski , the previous benchmarks were somewhat unreasonable. We should use a new high-level benchmark based on DataStream API. I wrote and ran the new benchmarks, and I came to the conclusion that these changes run as fast as the master branch (that is, the changes in [PR#8467](https://github.com/apache/flink/pull/8467) + [PR#8476](https://github.com/apache/flink/pull/8476) and this PR have no performance regression). The code and the results are as follows. - **Code of the `TwoInputBenchmark.twoInputMapSink` benchmark:** ``` env.enableCheckpointing(CHECKPOINT_INTERVAL_MS); env.setParallelism(1); long numRecordsPerInput = RECORDS_PER_INVOCATION / 2; DataStreamSource<Long> source1 = env.addSource(new LongSource(numRecordsPerInput)); DataStreamSource<Long> source2 = env.addSource(new LongSource(numRecordsPerInput)); source1 .connect(source2) .transform("custom operator", TypeInformation.of(Long.class), new MultiplyByTwoCoStreamMap()) .addSink(new DiscardingSink<>()); ``` - **Code of the `TwoInputBenchmark.twoInputSelectableMapSink ` benchmark:** ``` env.enableCheckpointing(CHECKPOINT_INTERVAL_MS); env.setParallelism(1); long numRecordsPerInput = RECORDS_PER_INVOCATION / 2; DataStreamSource<Long> source1 = env.addSource(new LongSource(numRecordsPerInput)); DataStreamSource<Long> source2 = env.addSource(new LongSource(numRecordsPerInput)); source1 .connect(source2) .transform("custom operator", TypeInformation.of(Long.class), new MultiplyByTwoAndInputSelectableCoStreamMap()) .addSink(new DiscardingSink<>()); ``` - **Results of the branch: master** ``` Benchmark Mode Cnt Score Error Units TwoInputBenchmark.twoInputMapSink thrpt 30 9340.356 ± 701.229 ops/ms ``` - **Results of the branch: master + [PR#8467](https://github.com/apache/flink/pull/8467) + [PR#8476](https://github.com/apache/flink/pull/8476) + this PR** ``` Benchmark Mode Cnt Score Error Units TwoInputBenchmark.twoInputMapSink thrpt 30 9511.961 ± 596.835 ops/ms TwoInputBenchmark.twoInputSelectableMapSink thrpt 30 9556.939 ± 552.054 ops/ms ```
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
