[ https://issues.apache.org/jira/browse/BEAM-9566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17064831#comment-17064831 ]
Maximilian Michels commented on BEAM-9566:
------------------------------------------

The mentioned regression code path in the Flink Runner can only affect streaming. For batch, this does not apply because timers are processed after all data is read. I'll open a PR to fix this.

> Performance regression of FlinkRunner stream mode due to watermark holds update
> -------------------------------------------------------------------------------
>
>                 Key: BEAM-9566
>                 URL: https://issues.apache.org/jira/browse/BEAM-9566
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink, testing-nexmark
>    Affects Versions: 2.20.0
>            Reporter: Bingfeng Xia
>            Assignee: Maximilian Michels
>            Priority: Blocker
>             Fix For: 2.20.0
>
>
> Nexmark tests show that the throughput of FlinkRunner with the RocksDB state
> backend dropped by 50%~80% in Queries 4/5/6/9/11. Some other queries also
> dropped, but not by as much. The affected queries contain keyed state.
>
> Nexmark test results follow. The tests were run on the same machine and can
> be reproduced.
> - before regression:
> {code:java}
> Performance:
>   Conf  Runtime(sec)    (Baseline)  Events(/sec)    (Baseline)       Results    (Baseline)
>   0000           3.8                     26171.2                      100000
>   0001           3.9                     25967.3                       92000
>   0002           1.9                     53447.4                         351
>   0003           2.8                     35791.0                         580
>   0004           2.5                      4045.3                          40
>   0005           9.6                     10448.2                          12
>   0006           1.2                      8532.4                         401
>   0007           4.0                     25018.8                           1
>   0008           2.9                     34928.4                        6000
>   0009           1.1                      9066.2                         298
>   0010           9.5                     10564.1                           2
>   0011          11.1                      9005.0                        1919
>   0012           4.5                     22075.1                        1919
>   0013           4.4                     22547.9                       92000
>   0014           9.7                     10261.7                       92000
> {code}
> - after regression:
> {code:java}
> Performance:
>   Conf  Runtime(sec)    (Baseline)  Events(/sec)    (Baseline)       Results    (Baseline)
>   0000           4.5                     22036.1                      100000
>   0001           3.9                     25839.8                       92000
>   0002           2.3                     43763.7                         351
>   0003           3.5                     28669.7                         580
>   0004           3.6                      2801.1                          40
>   0005          22.6                      4429.1                          12
>   0006           2.5                      3993.6                         401
>   0007           7.5                     13320.9                           1
>   0008           2.7                     36737.7                        6000
>   0009           2.5                      3930.8                         298
>   0010          16.2                      6178.6                           2
>   0011          82.9                      1206.3                        1919
>   0012           5.9                     16874.8                        1919
>   0013           4.2                     23889.2                       92000
> {code}
>
> The regression comes from the "updateWatermarkHold()" function recently added
> to the Flink DoFnOperator in
> [PR#10534|https://github.com/apache/beam/pull/10534]:
> [https://github.com/apache/beam/blob/bdd1726fd6b1103791f597f5e746ea2d205cf648/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L1140]
> Its purpose is to allow FlinkRunner to set watermark holds. However, the
> implementation is not efficient:
> # "pendingTimersById" is an interface to the RocksDB key/value map state.
> Iterating over all values via "pendingTimersById.values()" is expensive. The
> CPU-cycle profiling result for Query 4 below, collected with
> [async-profiler|https://github.com/jvm-profiling-tools/async-profiler], also
> shows that most of the CPU time was spent in "RocksIterator.next()", which is
> triggered by "pendingTimersById.values()".
> # Another overhead is that this function is called multiple times in each
> setTimer and deleteTimer.
>
> CPU time profiling result (FlameGraph) of Query 4 in Nexmark:
> [https://drive.google.com/open?id=1muVQipv-JidxVceQkOze5PZozPgPB_bh]
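
The two costs described in the issue (a full scan of "pendingTimersById.values()" on every update, repeated on each setTimer and deleteTimer) could in principle be avoided by maintaining the minimum timer timestamp incrementally. The following is only a minimal in-memory sketch of that idea, not the change made in Beam: the class name WatermarkHoldTracker, its methods, and the use of plain Java collections instead of Flink state are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical helper (not part of Beam or Flink): tracks the smallest output
 * timestamp among pending timers without scanning all timers on each update.
 */
class WatermarkHoldTracker {
  // Timer id -> output timestamp of that timer.
  private final Map<String, Long> timestampByTimerId = new HashMap<>();
  // Output timestamp -> number of pending timers with that timestamp (sorted).
  private final TreeMap<Long, Integer> countByTimestamp = new TreeMap<>();

  /** Registers or overwrites a timer; O(log n) instead of a full scan. */
  void setTimer(String timerId, long outputTimestamp) {
    deleteTimer(timerId); // drop the previous timestamp if the timer existed
    timestampByTimerId.put(timerId, outputTimestamp);
    countByTimestamp.merge(outputTimestamp, 1, Integer::sum);
  }

  /** Removes a timer if present; O(log n). */
  void deleteTimer(String timerId) {
    Long old = timestampByTimerId.remove(timerId);
    if (old == null) {
      return;
    }
    // Decrement the count; returning null removes the entry entirely.
    countByTimestamp.computeIfPresent(old, (ts, n) -> n == 1 ? null : n - 1);
  }

  /** Returns the current hold, or Long.MAX_VALUE when no timers are pending. */
  long currentHold() {
    return countByTimestamp.isEmpty() ? Long.MAX_VALUE : countByTimestamp.firstKey();
  }
}
```

With this structure each setTimer or deleteTimer does a constant number of map operations, and reading the hold is a single lookup of the smallest key. Whether such an index should live in memory or in the state backend is a separate design question that this sketch does not address.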