[
https://issues.apache.org/jira/browse/FLINK-30533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dong Lin updated FLINK-30533:
-----------------------------
Description:
Currently, each invocation of SourceOperator#emitNext() push at most one record
to the given DataOutput. This unnecessarily increases the average Java call
stack depth needed to produce a record.
Take the following program as an example. For each element produced by this
program, Flink runtime needs to include in the call stack these 3 function
calls:
* StreamTask#processInput()
* StreamOneInputProcessor#processInput()
* StreamTaskSourceInput#emitNext()
{code:java}
DataStream<Long> stream = env.fromSequence(1, 1000000000L)
.map(x -> x)
.addSink(new DiscardingSink<>());
{code}
This ticket proposes to update SourceOperator#emitNext() to push records to
DataOutput in a while loop. It improves Flink performance by removing an
average of 3 function from the call stack needed to produce a record.
Here are the benchmark results obtained by running the above program with
parallelism=1 and object re-use enabled. The results are averaged across 5 runs
for each setup.
* Prior to the proposed change, the average execution time is 46.1 sec with
std=5.1 sec.
* After the proposed change, the average execution time is 33.3 sec with
std=0.9 sec.
* The proposed change increases throughput by 38.4%.
was:
Currently, each invocation of SourceOperator#emitNext() push at most one record
to the given DataOutput. This unnecessarily increases the average Java call
stack depth needed to produce a record.
Take the following program as an example. For each element produced by this
program, Flink runtime needs to include in the call stack these 3 function
calls:
* StreamTask#processInput()
* StreamOneInputProcessor#processInput()
* StreamTaskSourceInput#emitNext()
{code:java}
DataStream<Long> stream = env.fromSequence(1, 1000000000L)
.map(x -> x)
.addSink(new DiscardingSink<>());
{code}
This ticket proposes to update SourceOperator#emitNext() to push records to
DataOutput in a while loop. It improves Flink performance by removing an
average of 3 function from the call stack needed to produce a record.
Here are the benchmark results obtained by running the above program with
parallelism=1. The results are averaged across 5 runs for each setup.
* Prior to proposed change, the average execution time is 46.1 sec with
std=5.1 sec.
* After the proposed change, the average execution time is 33.3 sec with
std=0.9 sec.
* The proposed change increases throughput by 38.4%.
> SourceOperator#emitNext() should push records to DataOutput in a while loop
> ---------------------------------------------------------------------------
>
> Key: FLINK-30533
> URL: https://issues.apache.org/jira/browse/FLINK-30533
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Task
> Reporter: Dong Lin
> Assignee: Dong Lin
> Priority: Major
> Labels: pull-request-available
>
> Currently, each invocation of SourceOperator#emitNext() push at most one
> record to the given DataOutput. This unnecessarily increases the average Java
> call stack depth needed to produce a record.
> Take the following program as an example. For each element produced by this
> program, Flink runtime needs to include in the call stack these 3 function
> calls:
> * StreamTask#processInput()
> * StreamOneInputProcessor#processInput()
> * StreamTaskSourceInput#emitNext()
> {code:java}
> DataStream<Long> stream = env.fromSequence(1, 1000000000L)
> .map(x -> x)
> .addSink(new DiscardingSink<>());
> {code}
>
> This ticket proposes to update SourceOperator#emitNext() to push records to
> DataOutput in a while loop. It improves Flink performance by removing an
> average of 3 function from the call stack needed to produce a record.
> Here are the benchmark results obtained by running the above program with
> parallelism=1 and object re-use enabled. The results are averaged across 5
> runs for each setup.
> * Prior to the proposed change, the average execution time is 46.1 sec with
> std=5.1 sec.
> * After the proposed change, the average execution time is 33.3 sec with
> std=0.9 sec.
> * The proposed change increases throughput by 38.4%.
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)