[
https://issues.apache.org/jira/browse/FLINK-30531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17683323#comment-17683323
]
Dong Lin commented on FLINK-30531:
----------------------------------
[~xtsong] Yes. I just closed this Jira since there are no more optimizations I
plan to make as part of it. The remaining performance gap between Flink and
Spark seems reasonable based on the benchmark results described below.
I used the benchmark specified in the JIRA description.
* Flink 1.17-snapshot (including all the optimizations listed in this ticket)
takes 64.2 sec
* Flink 1.15.1 takes 77.3 sec
* Spark 3.2.3 takes 40.0 sec
Spark's throughput is still about 60% higher than Flink's (40.0 sec vs. 64.2
sec for the same workload). This is probably reasonable given that Flink has
additional control flow overhead per iteration (e.g. checking the mailbox) and
additional overhead for each operator on the data path (e.g. incrementing the
input/output counter metrics).
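For reference, the ratios behind these numbers can be reproduced with a few lines of Java. This is only a sketch of the arithmetic: the class and method names are made up for illustration, and the timings are the three figures listed above.

```java
// Illustrative arithmetic only; BenchmarkRatios is a hypothetical helper,
// not part of Flink or of the benchmark itself.
final class BenchmarkRatios {
    // Wall-clock times in seconds, copied from the benchmark results above.
    static final double FLINK_1_17_SEC = 64.2;
    static final double FLINK_1_15_SEC = 77.3;
    static final double SPARK_3_2_SEC = 40.0;

    /**
     * Throughput of the faster run relative to the slower run for the same
     * workload, which is simply slowerTime / fasterTime.
     */
    static double relativeThroughput(double slowerSec, double fasterSec) {
        return slowerSec / fasterSec;
    }

    public static void main(String[] args) {
        double sparkVsFlink = relativeThroughput(FLINK_1_17_SEC, SPARK_3_2_SEC);
        double newVsOldFlink = relativeThroughput(FLINK_1_15_SEC, FLINK_1_17_SEC);
        System.out.printf("Spark 3.2.3 vs Flink 1.17-snapshot: %.3fx%n", sparkVsFlink);
        System.out.printf("Flink 1.17-snapshot vs Flink 1.15.1: %.3fx%n", newVsOldFlink);
    }
}
```

The first ratio is 64.2 / 40.0 = 1.605, which is where the "about 60%" comes from. The second ratio is roughly 1.2, i.e. the optimizations in this ticket make the benchmark about 20% faster than on 1.15.1.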
> Reduce operator chain call stack depth
> --------------------------------------
>
> Key: FLINK-30531
> URL: https://issues.apache.org/jira/browse/FLINK-30531
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Task
> Reporter: Dong Lin
> Assignee: Dong Lin
> Priority: Major
> Fix For: 1.17.0
>
>
> Benchmark results show that Flink takes more than 3X as long as Spark to
> execute simple programs. For example, if we run the following program with
> object re-use enabled and with parallelism=1, it takes roughly 120 sec on a
> MacBook, whereas it takes Spark less than 40 sec to run the same logic on the
> same machine.
> {code:java}
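> // Assumes env is a StreamExecutionEnvironment configured with object reuse
> // enabled and parallelism 1, as described above.
> env.fromSequence(1, 1000000000L)
>         .map(x -> x)
>         .map(x -> x)
>         .map(x -> x)
>         .map(x -> x)
>         .map(x -> x)
>         .addSink(new DiscardingSink<>()); // returns a DataStreamSink, not a DataStream
> env.execute();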
> {code}
>
> It turns out that the operator chain overhead introduced by Flink is
> surprisingly high. For the above example program, Flink runtime goes through
> a call stack of 24 functions to produce 1 element. And each extra map(...)
> operation introduces 3 extra functions in the call stack.
> Here are the 24 functions in the call stack:
> {code:bash}
> StreamTask#processInput
> StreamOneInputProcessor#processInput
> StreamTaskSourceInput#emitNext
> SourceOperator#emitNext
> IteratorSourceReaderBase#pollNext
> SourceOutputWithWatermarks#collect
> AsyncDataOutputToOutput#emitRecord
> ChainingOutput#collect
> StreamMap#processElement
> CountingOutput#collect
> ChainingOutput#collect
> StreamMap#processElement
> CountingOutput#collect
> ChainingOutput#collect
> StreamMap#processElement
> CountingOutput#collect
> ChainingOutput#collect
> StreamMap#processElement
> CountingOutput#collect
> ChainingOutput#collect
> StreamMap#processElement
> CountingOutput#collect
> ChainingOutput#collect
> StreamSink#processElement
> {code}
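The listing above can be summarized as a simple linear formula. The sketch below is derived only from counting the frames in that listing: 8 frames from StreamTask#processInput through the first ChainingOutput#collect, 3 frames per map(...), and 1 frame for StreamSink#processElement. The class name is hypothetical, and applying the formula to chains other than the 5-map example is an extrapolation from the "3 extra functions per map" observation.

```java
// Hypothetical helper that restates the frame count of the listing above.
final class CallStackDepth {
    // StreamTask#processInput ... AsyncDataOutputToOutput#emitRecord,
    // plus the first ChainingOutput#collect.
    static final int SOURCE_FRAMES = 8;
    // StreamMap#processElement, CountingOutput#collect, ChainingOutput#collect.
    static final int FRAMES_PER_MAP = 3;
    // StreamSink#processElement.
    static final int SINK_FRAMES = 1;

    /** Estimated call stack depth for a chain with the given number of map(...) operators. */
    static int depth(int numMaps) {
        if (numMaps < 0) {
            throw new IllegalArgumentException("numMaps must be non-negative");
        }
        return SOURCE_FRAMES + FRAMES_PER_MAP * numMaps + SINK_FRAMES;
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 6; n++) {
            System.out.println(n + " map(s): " + depth(n) + " frames");
        }
    }
}
```

For the example program, depth(5) = 8 + 3 * 5 + 1 = 24, which matches the 24 functions listed.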
>
> Given the observation described above, here are the explanations for why
> Flink is slow for programs with low computation overhead:
> * For each record produced, Flink runtime currently incurs an unnecessarily
> deep function call stack. It can be more than 24 for a simple program
> consisting of 5 map() operations.
> * Java's maximum inline level is less than 18 [2]. It is easy for the operator
> chain call stack to exceed this limit, which prevents the JIT compiler from
> inlining function calls and further increases the function call overhead.
> * Function calls that are not inlined require a virtual table lookup, since
> most of these functions are virtual.
> Given the above explanations of the performance issue, here are the ideas to
> reduce Flink's runtime overhead:
> * Update SourceOperator#emitNext() to push records to DataOutput in a while
> loop. This can reduce the depth of the call stack needed to produce a record
> by 3 functions. See FLINK-30533 for more information.
> * Fuse some functions (e.g. ChainingOutput, StreamMap, CountingOutput) to
> reduce the call stack depth required for each extra operation (e.g. map(...)).
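The effect of the fusion idea on stack depth can be illustrated with a toy model. Everything in the sketch below is hypothetical: the Toy* and FusedMap classes are simplified stand-ins and are not Flink's real ChainingOutput, StreamMap, or CountingOutput, and the actual change in Flink may be structured differently. The sketch only shows that doing the map, the counting, and the forwarding in one method removes two frames per operator.

```java
import java.util.function.LongUnaryOperator;

// Toy model only; none of these classes exist in Flink.
final class FusionDemo {
    static final LongUnaryOperator INCREMENT = x -> x + 1;

    /** Three hops per map: chaining -> map -> counting, ending in a chaining hop to the sink. */
    static Output buildUnfused(int numMaps, Output sink) {
        Output head = new ToyChainingOutput(sink);
        for (int i = 0; i < numMaps; i++) {
            head = new ToyChainingOutput(new ToyMap(INCREMENT, new ToyCountingOutput(head)));
        }
        return head;
    }

    /** One hop per map: the fused class maps, counts, and forwards in a single method. */
    static Output buildFused(int numMaps, Output sink) {
        Output head = new ToyChainingOutput(sink);
        for (int i = 0; i < numMaps; i++) {
            head = new FusedMap(INCREMENT, head);
        }
        return head;
    }

    /** Pushes one record through the chain and returns the stack depth seen by the sink. */
    static int measureDepth(Output head, DepthSink sink) {
        head.collect(0L);
        return sink.lastDepth;
    }

    public static void main(String[] args) {
        int numMaps = 5;
        DepthSink unfusedSink = new DepthSink();
        DepthSink fusedSink = new DepthSink();
        int unfused = measureDepth(buildUnfused(numMaps, unfusedSink), unfusedSink);
        int fused = measureDepth(buildFused(numMaps, fusedSink), fusedSink);
        System.out.println("frames saved by fusing: " + (unfused - fused));
    }
}

interface Output {
    void collect(long record);
}

/** Only forwards the record to the next hop. */
final class ToyChainingOutput implements Output {
    private final Output next;
    ToyChainingOutput(Output next) { this.next = next; }
    @Override public void collect(long record) { next.collect(record); }
}

/** Applies the user function, then hands the result to its output. */
final class ToyMap implements Output {
    private final LongUnaryOperator fn;
    private final Output out;
    ToyMap(LongUnaryOperator fn, Output out) { this.fn = fn; this.out = out; }
    @Override public void collect(long record) { out.collect(fn.applyAsLong(record)); }
}

/** Increments a record counter, then forwards. */
final class ToyCountingOutput implements Output {
    private final Output next;
    long count;
    ToyCountingOutput(Output next) { this.next = next; }
    @Override public void collect(long record) { count++; next.collect(record); }
}

/** Fused variant: map, count, and forward in one stack frame. */
final class FusedMap implements Output {
    private final LongUnaryOperator fn;
    private final Output next;
    long count;
    FusedMap(LongUnaryOperator fn, Output next) { this.fn = fn; this.next = next; }
    @Override public void collect(long record) { count++; next.collect(fn.applyAsLong(record)); }
}

/** Records the last value and the stack depth at which it arrived. */
final class DepthSink implements Output {
    long lastValue;
    int lastDepth;
    @Override public void collect(long record) {
        lastValue = record;
        lastDepth = Thread.currentThread().getStackTrace().length;
    }
}
```

In this toy model, each unfused map contributes 3 frames and each fused map contributes 1, so a chain of 5 maps saves 10 frames while producing the same output value.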
> [1] [https://arxiv.org/pdf/1610.09166.pdf]
> [2] [https://bugs.openjdk.org/browse/JDK-8234863]
>
>
>