[ 
https://issues.apache.org/jira/browse/BEAM-9566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17064877#comment-17064877
 ] 

Maximilian Michels commented on BEAM-9566:
------------------------------------------

Very likely. Flink 1.10 is a batch-centered release: 
https://flink.apache.org/news/2020/02/11/release-1.10.0.html

> Performance regression of FlinkRunner stream mode due to watermark holds 
> update
> -------------------------------------------------------------------------------
>
>                 Key: BEAM-9566
>                 URL: https://issues.apache.org/jira/browse/BEAM-9566
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink, testing-nexmark
>    Affects Versions: 2.20.0
>            Reporter: Bingfeng Xia
>            Assignee: Maximilian Michels
>            Priority: Blocker
>             Fix For: 2.20.0
>
>
> Nexmark tests show that the throughput of FlinkRunner with the RocksDB state 
> backend dropped by 50%~80% in Queries 4/5/6/9/11. Some other queries also 
> regressed, but not by as much. The affected queries use keyed 
> state.
>  
>  
> Nexmark test results. The tests were run on the same machine and are 
> reproducible.
>  - before regression:
> {code:java}
> Performance:
>   Conf  Runtime(sec)    (Baseline)  Events(/sec)    (Baseline)       Results    (Baseline)
>   0000           3.8                     26171.2                      100000
>   0001           3.9                     25967.3                       92000
>   0002           1.9                     53447.4                         351
>   0003           2.8                     35791.0                         580
>   0004           2.5                      4045.3                          40
>   0005           9.6                     10448.2                          12
>   0006           1.2                      8532.4                         401
>   0007           4.0                     25018.8                           1
>   0008           2.9                     34928.4                        6000
>   0009           1.1                      9066.2                         298
>   0010           9.5                     10564.1                           2
>   0011          11.1                      9005.0                        1919
>   0012           4.5                     22075.1                        1919
>   0013           4.4                     22547.9                       92000
>   0014           9.7                     10261.7                       92000
> {code}
>  - after regression:
> {code:java}
> Performance:
>   Conf  Runtime(sec)    (Baseline)  Events(/sec)    (Baseline)       Results    (Baseline)
>   0000           4.5                     22036.1                      100000
>   0001           3.9                     25839.8                       92000
>   0002           2.3                     43763.7                         351
>   0003           3.5                     28669.7                         580
>   0004           3.6                      2801.1                          40
>   0005          22.6                      4429.1                          12
>   0006           2.5                      3993.6                         401
>   0007           7.5                     13320.9                           1
>   0008           2.7                     36737.7                        6000
>   0009           2.5                      3930.8                         298
>   0010          16.2                      6178.6                           2
>   0011          82.9                      1206.3                        1919
>   0012           5.9                     16874.8                        1919
>   0013           4.2                     23889.2                       92000
> {code}
>  
> The regression comes from the "updateWatermarkHold()" function recently added 
> to the Flink DoFnOperator in 
> [PR#10534|https://github.com/apache/beam/pull/10534]:
> [https://github.com/apache/beam/blob/bdd1726fd6b1103791f597f5e746ea2d205cf648/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L1140]
> Its purpose is to allow FlinkRunner to set watermark holds. However, the 
> implementation is inefficient:
>  # "pendingTimersById" is an interface to the RocksDB key/value map state. 
> Iterating over all values via "pendingTimersById.values()" is expensive. The 
> CPU-cycle profile of Query 4 linked below, taken with 
> [async-profiler|https://github.com/jvm-profiling-tools/async-profiler], 
> shows that most of the CPU time was spent in "RocksIterator.next()", 
> called from "pendingTimersById.values()".
>  # Another overhead is that this function is called multiple times in 
> each setTimer and deleteTimer.
>  
> CPU time profiling result (FlameGraph) of Query 4 in Nexmark:
> [https://drive.google.com/open?id=1muVQipv-JidxVceQkOze5PZozPgPB_bh]
>  
>  
>  
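
The two costs described in the issue (a full scan of the timer state on every hold update, repeated on each setTimer and deleteTimer) can be contrasted with an incremental approach. The following is only a minimal sketch under stated assumptions, not the fix applied in Beam: MinHoldTracker and its methods are hypothetical names, and the structures live in plain heap memory rather than in Flink keyed state, so checkpointing is ignored. It keeps a sorted multiset of output timestamps so that the minimum hold can be read without iterating over all pending timers.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical helper, NOT part of Beam or Flink: tracks the minimum output
 * timestamp of pending timers incrementally instead of rescanning all timers.
 * Assumption: everything is held in memory; a real operator would need to
 * persist and restore this information with its checkpointed state.
 */
final class MinHoldTracker {
  // Timer id -> output timestamp (milliseconds) of the pending timer.
  private final Map<String, Long> timestampById = new HashMap<>();
  // Output timestamp -> number of pending timers with that timestamp.
  private final TreeMap<Long, Integer> countByTimestamp = new TreeMap<>();

  /** Registers or overwrites a timer; cost is O(log n), with no full scan. */
  void setTimer(String timerId, long outputTimestamp) {
    Long previous = timestampById.put(timerId, outputTimestamp);
    if (previous != null) {
      decrement(previous);
    }
    countByTimestamp.merge(outputTimestamp, 1, Integer::sum);
  }

  /** Removes a timer if it exists; unknown ids are ignored. */
  void deleteTimer(String timerId) {
    Long previous = timestampById.remove(timerId);
    if (previous != null) {
      decrement(previous);
    }
  }

  /** Returns the smallest pending output timestamp, or Long.MAX_VALUE if none. */
  long currentHold() {
    return countByTimestamp.isEmpty() ? Long.MAX_VALUE : countByTimestamp.firstKey();
  }

  // Drops one occurrence of the timestamp; removes the entry when the count hits zero.
  private void decrement(long timestamp) {
    countByTimestamp.computeIfPresent(timestamp, (ts, count) -> count == 1 ? null : count - 1);
  }
}
```

With this structure, each setTimer or deleteTimer call touches only the affected entries, and currentHold() reads the first key of the sorted map. Whether such a cache is acceptable in DoFnOperator depends on how it would be rebuilt after a restore, which this sketch does not address.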



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
