Bingfeng Xia created BEAM-9566:
----------------------------------
Summary: Performance regression of FlinkRunner stream mode due to
watermark holds update
Key: BEAM-9566
URL: https://issues.apache.org/jira/browse/BEAM-9566
Project: Beam
Issue Type: Bug
Components: runner-flink, testing-nexmark
Affects Versions: 2.20.0
Reporter: Bingfeng Xia
Nexmark tests show that the throughput of FlinkRunner with the RocksDB state
backend dropped by 50%~80% in Queries 4/5/6/9/11. Some other queries also
dropped, but not as much. The affected queries use keyed state.
Nexmark test results follow. The tests were run on the same machine and can be
reproduced.
- before regression:
{code:java}
Performance:
Conf Runtime(sec) (Baseline) Events(/sec) (Baseline) Results (Baseline)
0000 3.8 26171.2 100000
0001 3.9 25967.3 92000
0002 1.9 53447.4 351
0003 2.8 35791.0 580
0004 2.5 4045.3 40
0005 9.6 10448.2 12
0006 1.2 8532.4 401
0007 4.0 25018.8 1
0008 2.9 34928.4 6000
0009 1.1 9066.2 298
0010 9.5 10564.1 2
0011 11.1 9005.0 1919
0012 4.5 22075.1 1919
0013 4.4 22547.9 92000
0014 9.7 10261.7 92000
{code}
- after regression:
{code:java}
Performance:
Conf Runtime(sec) (Baseline) Events(/sec) (Baseline) Results (Baseline)
0000 4.5 22036.1 100000
0001 3.9 25839.8 92000
0002 2.3 43763.7 351
0003 3.5 28669.7 580
0004 3.6 2801.1 40
0005 22.6 4429.1 12
0006 2.5 3993.6 401
0007 7.5 13320.9 1
0008 2.7 36737.7 6000
0009 2.5 3930.8 298
0010 16.2 6178.6 2
0011 82.9 1206.3 1919
0012 5.9 16874.8 1919
0013 4.2 23889.2 92000
{code}
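For readers who want the per-query change rather than the raw rates, the relative drop can be computed directly from the Events(/sec) columns of the two tables above. The following is only a small arithmetic sketch; the class and method names are hypothetical, and the input numbers are copied from the tables for the five queries called out in the description.

```java
/** Hypothetical helper for reading the two Nexmark tables; not part of Beam. */
final class ThroughputDrop {

  /** Relative throughput drop in percent, from a baseline rate to a new rate. */
  static double dropPercent(double eventsPerSecBefore, double eventsPerSecAfter) {
    return (eventsPerSecBefore - eventsPerSecAfter) / eventsPerSecBefore * 100.0;
  }

  public static void main(String[] args) {
    // {query number, Events(/sec) before, Events(/sec) after}, copied from the tables.
    double[][] rows = {
      {4, 4045.3, 2801.1},
      {5, 10448.2, 4429.1},
      {6, 8532.4, 3993.6},
      {9, 9066.2, 3930.8},
      {11, 9005.0, 1206.3},
    };
    for (double[] row : rows) {
      System.out.printf(
          "Query %d: %.1f%% lower throughput%n", (int) row[0], dropPercent(row[1], row[2]));
    }
  }
}
```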
The regression comes from the "updateWatermarkHold()" function recently added
to the Flink DoFnOperator in [PR#10534|https://github.com/apache/beam/pull/10534]:
[https://github.com/apache/beam/blob/bdd1726fd6b1103791f597f5e746ea2d205cf648/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L1140]
Its purpose is to allow FlinkRunner to set watermark holds. However, the
implementation is inefficient:
# "pendingTimersById" is an interface of the RocksDB key/value map state.
Iterating over all of its values via "pendingTimersById.values()" is expensive.
The CPU-cycle profile of Query 4 below, taken with
[async-profiler|https://github.com/jvm-profiling-tools/async-profiler], also
shows that most of the CPU time was spent in "RocksIterator.next()", called
from "pendingTimersById.values()".
# Another overhead is that this function is called multiple times in each
setTimer and deleteTimer call.
CPU time profiling result (FlameGraph) of Query 4 in Nexmark:
[https://drive.google.com/open?id=1muVQipv-JidxVceQkOze5PZozPgPB_bh]
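To make the two costs above concrete, here is a minimal sketch of one possible alternative: tracking the minimum hold incrementally instead of rescanning every pending timer on each setTimer and deleteTimer. This is not the code in DoFnOperator and not a proposed patch. The class "MinHoldTracker", its methods, and the use of plain string timer ids and millisecond timestamps are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory tracker for the minimum watermark hold; not part of Beam or Flink. */
final class MinHoldTracker {
  // Timer id -> output timestamp (millis) that the timer currently holds.
  private final Map<String, Long> holdByTimerId = new HashMap<>();
  // Output timestamp -> number of pending timers holding at that timestamp.
  private final TreeMap<Long, Integer> holdCounts = new TreeMap<>();

  /** Registers or replaces a timer; costs O(log n) instead of a full scan. */
  void setTimer(String timerId, long outputTimestampMillis) {
    deleteTimer(timerId); // a timer that is set again replaces its previous hold
    holdByTimerId.put(timerId, outputTimestampMillis);
    holdCounts.merge(outputTimestampMillis, 1, Integer::sum);
  }

  /** Removes a timer's hold if the timer is known; otherwise does nothing. */
  void deleteTimer(String timerId) {
    Long previous = holdByTimerId.remove(timerId);
    if (previous == null) {
      return;
    }
    int remaining = holdCounts.get(previous) - 1;
    if (remaining == 0) {
      holdCounts.remove(previous);
    } else {
      holdCounts.put(previous, remaining);
    }
  }

  /** Smallest held timestamp, or Long.MAX_VALUE when no timer is pending. */
  long currentHold() {
    return holdCounts.isEmpty() ? Long.MAX_VALUE : holdCounts.firstKey();
  }
}
```

In this sketch each timer update touches only the affected entry, and reading the hold is a lookup of the smallest key. A real operator would still need to rebuild such an in-memory structure from the persisted timer state after a restore, which this sketch does not cover.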