[
https://issues.apache.org/jira/browse/BEAM-9566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17067356#comment-17067356
]
Bingfeng Xia commented on BEAM-9566:
------------------------------------
Thanks for the quick fix [~mxm].
I applied your #11200 to the master branch and re-ran Nexmark to test Flink
streaming mode with the RocksDB backend.
Unfortunately, throughput is still poor starting from Query 3, and Query 5 could
not even finish within 5 minutes.
{code:java}
Performance:
  Conf  Runtime(sec)  (Baseline)  Events(/sec)  (Baseline)  Results  (Baseline)
  0000           3.7                   26795.3               100000
  0001           3.5                   28743.9                92000
  0002           1.9                   52938.1                  351
  0003           9.8                   10204.1                  580
  0004           4.7                    2129.0                   40
  0005  *** not run ***
  0006  *** not run ***
  0007  *** not run ***
  0008  *** not run ***
  0009  *** not run ***
  0010  *** not run ***
  0011  *** not run ***
  0012  *** not run ***
  0013  *** not run ***
  0014  *** not run ***
==========================================================================================
2020-03-26T04:33:57.399Z
Generating 100000 events in streaming mode
<============-> 99% EXECUTING [5m 39s]{code}
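A quick worked calculation puts these numbers in perspective. It uses only the events/sec figures reported in this issue and compares Queries 0 to 4 against the "before regression" run from the issue description. It assumes the three runs are comparable: the issue states that the before/after runs used the same machine, and this is assumed, not confirmed, for the run above. `ThroughputDelta` is an illustrative name only.

{code:java}
import java.util.Locale;

public class ThroughputDelta {
    // Events/sec for Queries 0-4, copied from the Nexmark runs reported in this issue.
    static final double[] BEFORE_REGRESSION = {26171.2, 25967.3, 53447.4, 35791.0, 4045.3};
    static final double[] AFTER_REGRESSION = {22036.1, 25839.8, 43763.7, 28669.7, 2801.1};
    static final double[] WITH_PR_11200 = {26795.3, 28743.9, 52938.1, 10204.1, 2129.0};

    /** Percentage change of value relative to baseline; negative means lower throughput. */
    static double percentChange(double baseline, double value) {
        return (value - baseline) / baseline * 100.0;
    }

    public static void main(String[] args) {
        for (int q = 0; q < BEFORE_REGRESSION.length; q++) {
            System.out.println(String.format(Locale.ROOT,
                "Query %d: after regression %+.1f%%, with #11200 %+.1f%%",
                q,
                percentChange(BEFORE_REGRESSION[q], AFTER_REGRESSION[q]),
                percentChange(BEFORE_REGRESSION[q], WITH_PR_11200[q])));
        }
    }
}
{code}

For Query 3, the program prints `Query 3: after regression -19.9%, with #11200 -71.5%`. For Query 4, it prints `-30.8%` and `-47.4%`. In other words, from Query 3 onward the patched run is slower than even the regressed run in the original report.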
You can also test it yourself. I hard-coded the RocksDB state backend as
follows:
{code:java}
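// runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
// Imports needed in this file:
// import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
// import org.apache.flink.runtime.state.filesystem.FsStateBackend;
@VisibleForTesting
static StreamExecutionEnvironment createStreamExecutionEnvironment(
    FlinkPipelineOptions options, List<String> filesToStage, @Nullable String confDir) {
  ...
  // Hard-coded RocksDB state backend; checkpoints go to a fresh local directory per run.
  String path =
      "file:///tmp/flink-backend/checkpoints/.flink-test-" + System.currentTimeMillis();
  // The second argument enables incremental checkpointing.
  RocksDBStateBackend backend = new RocksDBStateBackend(new FsStateBackend(path), true);
  flinkStreamEnv.setStateBackend(backend);
  return flinkStreamEnv;
}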
{code}
> Performance regression of FlinkRunner stream mode due to watermark holds update
> -------------------------------------------------------------------------------
>
> Key: BEAM-9566
> URL: https://issues.apache.org/jira/browse/BEAM-9566
> Project: Beam
> Issue Type: Bug
> Components: runner-flink, testing-nexmark
> Affects Versions: 2.20.0
> Reporter: Bingfeng Xia
> Assignee: Maximilian Michels
> Priority: Critical
> Fix For: 2.20.0
>
>
> Nexmark tests show that the throughput of FlinkRunner with the RocksDB state
> backend dropped by 50% to 80% in Queries 4, 5, 6, 9, and 11. Some other
> queries also regressed, though less severely. The affected queries use keyed
> state.
>
>
> Nexmark test results. Both runs were executed on the same machine, and the
> results are reproducible.
> - before regression:
> {code:java}
> Performance:
>   Conf  Runtime(sec)  (Baseline)  Events(/sec)  (Baseline)  Results  (Baseline)
>   0000           3.8                   26171.2               100000
>   0001           3.9                   25967.3                92000
>   0002           1.9                   53447.4                  351
>   0003           2.8                   35791.0                  580
>   0004           2.5                    4045.3                   40
>   0005           9.6                   10448.2                   12
>   0006           1.2                    8532.4                  401
>   0007           4.0                   25018.8                    1
>   0008           2.9                   34928.4                 6000
>   0009           1.1                    9066.2                  298
>   0010           9.5                   10564.1                    2
>   0011          11.1                    9005.0                 1919
>   0012           4.5                   22075.1                 1919
>   0013           4.4                   22547.9                92000
>   0014           9.7                   10261.7                92000
> {code}
> - after regression:
> {code:java}
> Performance:
>   Conf  Runtime(sec)  (Baseline)  Events(/sec)  (Baseline)  Results  (Baseline)
>   0000           4.5                   22036.1               100000
>   0001           3.9                   25839.8                92000
>   0002           2.3                   43763.7                  351
>   0003           3.5                   28669.7                  580
>   0004           3.6                    2801.1                   40
>   0005          22.6                    4429.1                   12
>   0006           2.5                    3993.6                  401
>   0007           7.5                   13320.9                    1
>   0008           2.7                   36737.7                 6000
>   0009           2.5                    3930.8                  298
>   0010          16.2                    6178.6                    2
>   0011          82.9                    1206.3                 1919
>   0012           5.9                   16874.8                 1919
>   0013           4.2                   23889.2                92000
> {code}
>
> The regression comes from the "updateWatermarkHold()" function recently added
> to the Flink DoFnOperator in
> [PR#10534|https://github.com/apache/beam/pull/10534]
> [https://github.com/apache/beam/blob/bdd1726fd6b1103791f597f5e746ea2d205cf648/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L1140]
> It was added to allow the FlinkRunner to set watermark holds. However, the
> implementation is not efficient:
> # "pendingTimersById" is a key/value map state that is backed by RocksDB here.
> Iterating over all of its values via "pendingTimersById.values()" is
> expensive. The CPU cycle profile of Query 4 below, collected with
> [async-profiler|https://github.com/jvm-profiling-tools/async-profiler], shows
> that most of the CPU time is spent in "RocksIterator.next()", called from
> "pendingTimersById.values()".
> # In addition, this function is called multiple times on every setTimer and
> deleteTimer call.
>
> CPU time profiling result (FlameGraph) of Query 4 in Nexmark:
> [https://drive.google.com/open?id=1muVQipv-JidxVceQkOze5PZozPgPB_bh]
>
>
>
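Point 1 in the description blames the cost on recomputing the hold with a full scan of every pending timer. Below is a minimal plain-Java sketch of that full scan next to one alternative: keeping hold timestamps in a sorted multiset, so the minimum can be read without iterating.

The sketch rests on several assumptions:

* It uses a hypothetical in-memory map instead of Flink's RocksDB-backed MapState.
* It ignores keying, persistence and recovery.
* `TimerHoldTracker`, `setTimer`, `deleteTimer`, `minHoldFullScan` and `minHoldIncremental` are illustrative names. They are not the Beam DoFnOperator API, and this is not the approach taken in #11200.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical tracker mapping timer id to the timestamp it holds the watermark at. */
public class TimerHoldTracker {
    // Stand-in for pendingTimersById: timer id -> hold timestamp (millis).
    private final Map<String, Long> holdsById = new HashMap<>();
    // Sorted multiset of hold timestamps: timestamp -> number of timers holding it.
    private final TreeMap<Long, Integer> holdCounts = new TreeMap<>();

    public void setTimer(String timerId, long holdTimestamp) {
        Long previous = holdsById.put(timerId, holdTimestamp);
        if (previous != null) {
            decrement(previous); // the timer was reset, so its old hold goes away
        }
        holdCounts.merge(holdTimestamp, 1, Integer::sum);
    }

    public void deleteTimer(String timerId) {
        Long previous = holdsById.remove(timerId);
        if (previous != null) {
            decrement(previous);
        }
    }

    /** O(n) per call: iterates every value, like the pattern described in the issue. */
    public long minHoldFullScan() {
        long min = Long.MAX_VALUE; // sentinel meaning "no hold"
        for (long ts : holdsById.values()) {
            min = Math.min(min, ts);
        }
        return min;
    }

    /** O(log n) per call: reads the smallest key of the sorted multiset. */
    public long minHoldIncremental() {
        return holdCounts.isEmpty() ? Long.MAX_VALUE : holdCounts.firstKey();
    }

    private void decrement(long ts) {
        // Returning null from the remapping function removes the entry.
        holdCounts.computeIfPresent(ts, (k, count) -> count == 1 ? null : count - 1);
    }

    public static void main(String[] args) {
        TimerHoldTracker tracker = new TimerHoldTracker();
        tracker.setTimer("a", 300L);
        tracker.setTimer("b", 100L);
        tracker.setTimer("c", 200L);
        tracker.deleteTimer("b");
        System.out.println(tracker.minHoldFullScan() + " " + tracker.minHoldIncremental());
    }
}
{code}

Running `main` prints `200 200`. The trade-off is extra bookkeeping on every setTimer/deleteTimer in exchange for not scanning all timers. In a real operator, the sorted structure would itself have to be stored per key in state (or rebuilt on restore), and that cost has to be weighed against the scan.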