[
https://issues.apache.org/jira/browse/FLINK-30709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17681268#comment-17681268
]
Dong Lin commented on FLINK-30709:
----------------------------------
[~pnowojski] Sure. Thanks for the reminder. I just realized that we have this
guideline for Merging Pull Requests [1] and it is documented that we should add
a comment in the JIRA listing the commit hash for each branch respectively. I
will follow this guideline in the future.
BTW, I also noticed that the wiki says we should add "This closes #PR_ID" to the
message of the last commit of the PR. This is the practice I have been using
when merging PRs in various projects (including this commit), and it also allows
us to find the commit for the JIRA.
However, it seems that many commits in Flink do not include this message in the
commit log. Maybe we should also remind other committers to follow this
guideline?
[1] https://cwiki.apache.org/confluence/display/FLINK/Merging+Pull+Requests
> NetworkInput#emitNext() should push records to DataOutput in a while loop
> -------------------------------------------------------------------------
>
> Key: FLINK-30709
> URL: https://issues.apache.org/jira/browse/FLINK-30709
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Task
> Reporter: Rui Fan
> Assignee: Rui Fan
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.17.0
>
>
> This is similar to FLINK-30533. FLINK-30533 focuses on the source operator,
> while this JIRA focuses on the network input.
>
> Currently, each invocation of AbstractStreamTaskNetworkInput#emitNext() pushes
> at most one record to the given DataOutput. This unnecessarily increases the
> average Java call stack depth needed to produce a record.
> Take the following program as an example. For each element produced by this
> program, the Flink runtime needs to include these 3 function calls in the call
> stack:
> * StreamTask#processInput()
> * StreamOneInputProcessor#processInput()
> * AbstractStreamTaskNetworkInput#emitNext()
> This ticket proposes to update AbstractStreamTaskNetworkInput#emitNext() to
> push records to DataOutput in a while loop. It improves Flink performance by
> removing an average of 3 function calls from the call stack needed to produce
> a record.
> Here are the benchmark results obtained by running the
> [InputBenchmark#mapSink|https://github.com/apache/flink-benchmarks/blob/0bafe0e85700c889894324aadb70302381f98e03/src/main/java/org/apache/flink/benchmark/InputBenchmark.java#L55]
> with env.disableOperatorChaining(). I ran it 4 times on my Mac.
>
> {code:java}
> Before the proposed change, the avg is 12429.0605 ops/ms. Here are the
> detailed results:
> Benchmark               (sourceType)   Mode   Cnt  Score       Error      Units
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt  30   12339.771 ± 414.649    ops/ms
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt  30   12687.872 ± 320.084    ops/ms
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt  30   12256.445 ± 512.219    ops/ms
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt  30   12432.154 ± 405.083    ops/ms
> After the proposed change, the avg is 13836.845 ops/ms. Here are the
> detailed results:
> Benchmark               (sourceType)   Mode   Cnt  Score       Error      Units
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt  30   13092.451 ± 490.886    ops/ms
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt  30   13881.138 ± 370.249    ops/ms
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt  30   13960.280 ± 389.505    ops/ms
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt  30   14413.511 ± 727.844    ops/ms
> {code}
>
> The proposed change increases throughput by 11.3%.
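
For readers who want a concrete picture of the idea in the description, here is a minimal, self-contained sketch. It does not use Flink classes: EmitLoopSketch, BufferedInput, Status, emitOne, emitInLoop, drain and the maxRecordsPerCall bound are all hypothetical names made up for illustration, and the ticket does not say what exit condition the real loop in AbstractStreamTaskNetworkInput#emitNext() uses. The sketch only shows why looping inside the innermost call means fewer trips through the outer call chain.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Illustrative only: none of these types are Flink's real classes.
final class EmitLoopSketch {

    /** Hypothetical stand-in for the status an input returns to its caller. */
    enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE }

    /** Hypothetical input backed by an in-memory buffer of records. */
    static final class BufferedInput {
        final Queue<String> buffered = new ArrayDeque<>();
        int calls = 0; // how many times the caller had to invoke an emit method

        BufferedInput(List<String> records) {
            buffered.addAll(records);
        }

        /** One record per invocation: the behaviour the ticket describes as current. */
        Status emitOne(Consumer<String> output) {
            calls++;
            String record = buffered.poll();
            if (record == null) {
                return Status.NOTHING_AVAILABLE;
            }
            output.accept(record);
            return buffered.isEmpty() ? Status.NOTHING_AVAILABLE : Status.MORE_AVAILABLE;
        }

        /**
         * Pushes records in a while loop. The maxRecordsPerCall bound is an
         * assumption of this sketch so that the caller regains control regularly.
         */
        Status emitInLoop(Consumer<String> output, int maxRecordsPerCall) {
            calls++;
            int emitted = 0;
            while (emitted < maxRecordsPerCall) {
                String record = buffered.poll();
                if (record == null) {
                    return Status.NOTHING_AVAILABLE;
                }
                output.accept(record);
                emitted++;
            }
            return buffered.isEmpty() ? Status.NOTHING_AVAILABLE : Status.MORE_AVAILABLE;
        }
    }

    /** Plays the role of the outer call chain; returns how many emit calls were needed. */
    static int drain(BufferedInput input, Consumer<String> output, boolean useLoop) {
        Status status;
        do {
            status = useLoop ? input.emitInLoop(output, 1000) : input.emitOne(output);
        } while (status == Status.MORE_AVAILABLE);
        return input.calls;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a", "b", "c", "d", "e");
        List<String> sink = new ArrayList<>();
        int oneByOne = drain(new BufferedInput(records), sink::add, false);
        int looped = drain(new BufferedInput(records), sink::add, true);
        System.out.println("calls without loop: " + oneByOne + ", with loop: " + looped);
    }
}
```

In this sketch the outer driver is entered once per record without the loop and once per batch with it, which is the effect the description attributes to the proposed change. Whether a bound like maxRecordsPerCall is appropriate depends on how often the caller needs to regain control, which this sketch does not model.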
--
This message was sent by Atlassian Jira
(v8.20.10#820010)