[ 
https://issues.apache.org/jira/browse/FLINK-30709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17681268#comment-17681268
 ] 

Dong Lin commented on FLINK-30709:
----------------------------------

[~pnowojski] Sure. Thanks for the reminder. I just realized that we have this 
guideline for Merging Pull Requests [1] and it is documented that we should add 
a comment in the JIRA listing the commit hash for each branch respectively. I 
will follow this guideline in the future.

BTW, I also noticed that the wiki says we should add "This closes #PR_ID" to the 
message of the last commit of the PR. This is the practice I have been using 
when merging PRs in various projects (including this commit), and it also allows 
us to find the commit for the JIRA.

However, it seems that many commits in Flink do not include this message in the 
commit log. Maybe we should also remind other committers to follow this 
guideline?

 

[1] https://cwiki.apache.org/confluence/display/FLINK/Merging+Pull+Requests

> NetworkInput#emitNext() should push records to DataOutput in a while loop
> -------------------------------------------------------------------------
>
>                 Key: FLINK-30709
>                 URL: https://issues.apache.org/jira/browse/FLINK-30709
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Task
>            Reporter: Rui Fan
>            Assignee: Rui Fan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.17.0
>
>
> This is similar to FLINK-30533. FLINK-30533 focuses on the source operator, 
> while this JIRA focuses on network input.
>  
> Currently, each invocation of AbstractStreamTaskNetworkInput#emitNext() pushes 
> at most one record to the given DataOutput. This unnecessarily increases the 
> average Java call stack depth needed to produce a record.
> Take the following program as an example. For each element produced by this 
> program, Flink runtime needs to include in the call stack these 3 function 
> calls:
>  * StreamTask#processInput()
>  * StreamOneInputProcessor#processInput()
>  * AbstractStreamTaskNetworkInput#emitNext()
> This ticket proposes to update AbstractStreamTaskNetworkInput#emitNext() to 
> push records to DataOutput in a while loop. It improves Flink performance by 
> removing an average of 3 function calls from the call stack needed to produce a 
> record.
> Here are the benchmark results obtained by running the 
> [InputBenchmark#mapSink|https://github.com/apache/flink-benchmarks/blob/0bafe0e85700c889894324aadb70302381f98e03/src/main/java/org/apache/flink/benchmark/InputBenchmark.java#L55]
>  with env.disableOperatorChaining(). I ran it 4 times on my Mac.
>  
> {code:java}
> Before the proposed change, the avg is 12429.0605 ops/ms. Detailed results:
> Benchmark                (sourceType)   Mode  Cnt      Score     Error   Units
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  12339.771 ± 414.649  ops/ms
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  12687.872 ± 320.084  ops/ms
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  12256.445 ± 512.219  ops/ms
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  12432.154 ± 405.083  ops/ms
> After the proposed change, the avg is 13836.845 ops/ms. Detailed results:
> Benchmark                (sourceType)   Mode  Cnt      Score     Error   Units
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  13092.451 ± 490.886  ops/ms
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  13881.138 ± 370.249  ops/ms
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  13960.280 ± 389.505  ops/ms
> InputBenchmark.mapSink  F27_UNBOUNDED  thrpt   30  14413.511 ± 727.844  ops/ms
> {code}
>  
> The proposed change increases throughput by 11.3%.
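
For readers who want to see the idea in the quoted ticket in isolation, here is a minimal sketch of "one record per call" versus "push records in a while loop". It is not the actual Flink patch: EmitNextSketch, BufferedInput, Output, Status and the maxRecords bound are hypothetical stand-ins invented for this illustration, and the real AbstractStreamTaskNetworkInput#emitNext() deals with buffers, deserializers and events that are left out here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical, simplified stand-ins for illustration; not Flink's real classes.
public class EmitNextSketch {

    enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE }

    interface Output<T> {
        void emitRecord(T record);
    }

    static final class BufferedInput<T> {
        private final Queue<T> buffered = new ArrayDeque<>();
        int emitNextCalls = 0; // how often the caller had to re-enter emitNext*

        BufferedInput(List<T> records) {
            buffered.addAll(records);
        }

        // "Before": at most one record is pushed per invocation, so the caller
        // walks the whole call chain again for every record.
        Status emitNextSingle(Output<T> output) {
            emitNextCalls++;
            T record = buffered.poll();
            if (record != null) {
                output.emitRecord(record);
            }
            return buffered.isEmpty() ? Status.NOTHING_AVAILABLE : Status.MORE_AVAILABLE;
        }

        // "After": keep pushing records in a while loop. maxRecords is an
        // assumed bound so that the caller still regains control periodically.
        Status emitNextLooping(Output<T> output, int maxRecords) {
            emitNextCalls++;
            int emitted = 0;
            while (emitted < maxRecords && !buffered.isEmpty()) {
                output.emitRecord(buffered.poll());
                emitted++;
            }
            return buffered.isEmpty() ? Status.NOTHING_AVAILABLE : Status.MORE_AVAILABLE;
        }
    }

    public static void main(String[] args) {
        List<Integer> records = List.of(1, 2, 3, 4, 5);

        BufferedInput<Integer> single = new BufferedInput<>(records);
        List<Integer> outSingle = new ArrayList<>();
        while (single.emitNextSingle(outSingle::add) == Status.MORE_AVAILABLE) { }

        BufferedInput<Integer> looping = new BufferedInput<>(records);
        List<Integer> outLooping = new ArrayList<>();
        while (looping.emitNextLooping(outLooping::add, 1000) == Status.MORE_AVAILABLE) { }

        System.out.println("single-record calls: " + single.emitNextCalls);
        System.out.println("looping calls: " + looping.emitNextCalls);
    }
}
```

Both variants deliver the same records in the same order; the looping variant only reduces how often the outer call chain has to be re-entered, which is where the ticket expects the saving to come from.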



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
