Bingfeng Xia created BEAM-9566:
----------------------------------

             Summary: Performance regression of FlinkRunner stream mode due to watermark holds update
                 Key: BEAM-9566
                 URL: https://issues.apache.org/jira/browse/BEAM-9566
             Project: Beam
          Issue Type: Bug
          Components: runner-flink, testing-nexmark
    Affects Versions: 2.20.0
            Reporter: Bingfeng Xia


Nexmark tests show that the throughput of the FlinkRunner with the RocksDB state backend dropped sharply in Query 4/5/6/9/11, from about 31% (Query 4) to about 87% (Query 11). Several other queries also regressed, though mostly less severely. The most affected queries all use keyed state.

Nexmark test results. Both runs were executed on the same machine, and the results are reproducible.

 - Before the regression:
{code:java}
Performance:
  Conf  Runtime(sec)    (Baseline)  Events(/sec)    (Baseline)       Results    (Baseline)
  0000           3.8                     26171.2                      100000
  0001           3.9                     25967.3                       92000
  0002           1.9                     53447.4                         351
  0003           2.8                     35791.0                         580
  0004           2.5                      4045.3                          40
  0005           9.6                     10448.2                          12
  0006           1.2                      8532.4                         401
  0007           4.0                     25018.8                           1
  0008           2.9                     34928.4                        6000
  0009           1.1                      9066.2                         298
  0010           9.5                     10564.1                           2
  0011          11.1                      9005.0                        1919
  0012           4.5                     22075.1                        1919
  0013           4.4                     22547.9                       92000
  0014           9.7                     10261.7                       92000
{code}
 - After the regression:
{code:java}
Performance:
  Conf  Runtime(sec)    (Baseline)  Events(/sec)    (Baseline)       Results    (Baseline)
  0000           4.5                     22036.1                      100000
  0001           3.9                     25839.8                       92000
  0002           2.3                     43763.7                         351
  0003           3.5                     28669.7                         580
  0004           3.6                      2801.1                          40
  0005          22.6                      4429.1                          12
  0006           2.5                      3993.6                         401
  0007           7.5                     13320.9                           1
  0008           2.7                     36737.7                        6000
  0009           2.5                      3930.8                         298
  0010          16.2                      6178.6                           2
  0011          82.9                      1206.3                        1919
  0012           5.9                     16874.8                        1919
  0013           4.2                     23889.2                       92000
{code}
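The percentages quoted in the summary can be checked directly against the Events(/sec) columns of the two tables. A minimal sketch, where the class and method names are illustrative only and the numbers are copied from the tables above:

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Computes relative throughput drops from the "before" and "after" Nexmark tables. */
public class ThroughputDrop {

    /** Relative drop in percent: (before - after) / before * 100. */
    static double dropPercent(double before, double after) {
        return (before - after) / before * 100.0;
    }

    public static void main(String[] args) {
        // Events(/sec) values {before, after} for the queries named in the summary.
        Map<String, double[]> eventsPerSec = new LinkedHashMap<>();
        eventsPerSec.put("0004", new double[] {4045.3, 2801.1});
        eventsPerSec.put("0005", new double[] {10448.2, 4429.1});
        eventsPerSec.put("0006", new double[] {8532.4, 3993.6});
        eventsPerSec.put("0009", new double[] {9066.2, 3930.8});
        eventsPerSec.put("0011", new double[] {9005.0, 1206.3});

        for (Map.Entry<String, double[]> e : eventsPerSec.entrySet()) {
            double[] v = e.getValue();
            System.out.printf("Query %s: %.1f%% drop%n", e.getKey(), dropPercent(v[0], v[1]));
        }
    }
}
```

Running it prints one line per query, ranging from roughly 31% for Query 4 up to roughly 87% for Query 11.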

The regression is caused by the "updateWatermarkHold()" method recently added to the Flink runner's DoFnOperator in [PR#10534|https://github.com/apache/beam/pull/10534]:

[https://github.com/apache/beam/blob/bdd1726fd6b1103791f597f5e746ea2d205cf648/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L1140]

This method allows the FlinkRunner to set watermark holds. However, its implementation is inefficient:
 # "pendingTimersById" is an interface to RocksDB-backed key/value map state, so iterating over all of its values via "pendingTimersById.values()" is expensive. The CPU profile of Query 4 collected with [async-profiler|https://github.com/jvm-profiling-tools/async-profiler] (linked below) confirms that most of the CPU time is spent in "RocksIterator.next()", called from "pendingTimersById.values()".
 # In addition, this method is called multiple times within every setTimer and deleteTimer call.
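The cost pattern in the two points above can be illustrated with a simplified, hypothetical model. It is not the DoFnOperator code: a HashMap stands in for the RocksDB-backed "pendingTimersById" state (with RocksDB, each value visited would roughly correspond to a "RocksIterator.next()" call plus deserialization), and the names WatermarkHoldScanModel and valuesVisited are made up for illustration. It also assumes each setTimer call triggers two full scans; the exact call sequence in DoFnOperator may differ.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model: the watermark hold is recomputed by a full scan on every timer change. */
public class WatermarkHoldScanModel {

    // timerId -> output timestamp; stands in for the RocksDB-backed map state.
    private final Map<String, Long> pendingTimersById = new HashMap<>();
    private long watermarkHold = Long.MAX_VALUE;
    // Total number of map values visited by all full scans so far.
    long valuesVisited = 0;

    /** Recomputes the hold by scanning every pending timer: O(n) per call. */
    private void updateWatermarkHold() {
        long min = Long.MAX_VALUE;
        for (long ts : pendingTimersById.values()) {
            valuesVisited++;
            min = Math.min(min, ts);
        }
        watermarkHold = min;
    }

    void deleteTimer(String timerId) {
        pendingTimersById.remove(timerId);
        updateWatermarkHold();
    }

    /** Assumed pattern: cancel any existing timer (scan 1), then register and rescan (scan 2). */
    void setTimer(String timerId, long outputTimestamp) {
        deleteTimer(timerId);
        pendingTimersById.put(timerId, outputTimestamp);
        updateWatermarkHold();
    }

    long getWatermarkHold() {
        return watermarkHold;
    }

    public static void main(String[] args) {
        for (int n : new int[] {1_000, 2_000}) {
            WatermarkHoldScanModel model = new WatermarkHoldScanModel();
            for (int i = 0; i < n; i++) {
                model.setTimer("timer-" + i, 1_000L + i);
            }
            System.out.printf("n=%d timers -> %d values visited, hold=%d%n",
                n, model.valuesVisited, model.getWatermarkHold());
        }
    }
}
```

For n timers with distinct ids, the model visits exactly n² values (1,000,000 for 1,000 timers and 4,000,000 for 2,000), so doubling the number of pending timers quadruples the scan work.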


CPU profiling result (flame graph) of Nexmark Query 4:

[https://drive.google.com/open?id=1muVQipv-JidxVceQkOze5PZozPgPB_bh]

--
This message was sent by Atlassian Jira
(v8.3.4#803005)