[ 
https://issues.apache.org/jira/browse/FLINK-30533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated FLINK-30533:
-----------------------------
    Description: 
Currently, each invocation of SourceOperator#emitNext() pushes at most one 
record to the given DataOutput. This unnecessarily increases the average Java 
call stack depth needed to produce a record.

Take the following program as an example. For each element produced by this 
program, the Flink runtime has to go through these 3 function calls on the 
call stack:
 * StreamTask#processInput()
 * StreamOneInputProcessor#processInput()
 * StreamTaskSourceInput#emitNext()

{code:java}
DataStream<Long> stream = env.fromSequence(1, 1000000000L)
.map(x -> x)
.addSink(new DiscardingSink<>());
{code}
 

This ticket proposes to update SourceOperator#emitNext() to push records to 
DataOutput in a while loop. It improves Flink performance by removing an 
average of 3 function calls from the call stack needed to produce a record.
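
The effect of the loop can be illustrated with a toy model. The sketch below is 
not Flink code: ToySource, Status, run() and the maxRecordsPerCall budget are 
hypothetical names introduced only for illustration, and the budget is just an 
assumed stand-in for whatever condition a real operator would use to hand 
control back to the task. It only counts how often the caller has to re-enter 
emitNext() to drain the same number of records.

```java
import java.util.PrimitiveIterator;
import java.util.function.LongConsumer;
import java.util.stream.LongStream;

/** Toy model of the idea in this ticket. Not Flink code; every name is hypothetical. */
public class EmitLoopSketch {

    /** Stand-in for the status that an emitNext()-style method reports to its caller. */
    enum Status { MORE_AVAILABLE, END_OF_INPUT }

    /** Stand-in for a source operator that pushes records to a downstream output. */
    static final class ToySource {
        private final PrimitiveIterator.OfLong records;

        ToySource(PrimitiveIterator.OfLong records) {
            this.records = records;
        }

        /**
         * Pushes up to maxRecordsPerCall records in a while loop.
         * A budget of 1 models the "at most one record per invocation" behavior.
         */
        Status emitNext(LongConsumer output, int maxRecordsPerCall) {
            int emitted = 0;
            while (emitted < maxRecordsPerCall && records.hasNext()) {
                output.accept(records.nextLong());
                emitted++;
            }
            return records.hasNext() ? Status.MORE_AVAILABLE : Status.END_OF_INPUT;
        }
    }

    /** Drains the sequence 1..lastValue and returns {emitNext calls, records received}. */
    public static long[] run(long lastValue, int maxRecordsPerCall) {
        ToySource source = new ToySource(LongStream.rangeClosed(1, lastValue).iterator());
        long[] received = {0};
        long calls = 0;
        Status status;
        do {
            // Each iteration stands for one trip down the caller's call chain.
            status = source.emitNext(value -> received[0]++, maxRecordsPerCall);
            calls++;
        } while (status == Status.MORE_AVAILABLE);
        return new long[] {calls, received[0]};
    }

    public static void main(String[] args) {
        long[] single = run(1_000_000, 1);
        long[] looped = run(1_000_000, 1_000);
        System.out.printf("one per call: %d calls for %d records%n", single[0], single[1]);
        System.out.printf("looped:       %d calls for %d records%n", looped[0], looped[1]);
    }
}
```

In this model both runs deliver the same records, but the looped variant 
re-enters emitNext() far less often, which is the per-record overhead the 
ticket aims to reduce.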

Here are the benchmark results obtained by running the above program with 
parallelism=1. The results are averaged across 5 runs for each setup.
 * Prior to the proposed change, the average execution time is 46.1 sec with 
std=5.1 sec.
 * After the proposed change, the average execution time is 33.3 sec with 
std=0.9 sec.
 * The proposed change increases throughput by 38.4% (46.1 / 33.3 is about 
1.384).

 

  was:
Currently, each invocation of IteratorSourceReaderBase#pollNext() pushes at 
most one record to the given ReaderOutput. This unnecessarily increases the 
average Java call stack depth needed to produce an element.

 

Take the following program as an example. For each element produced by this 
program, the Flink runtime has to go through these 4 function calls on the 
call stack:
 * StreamTask#processInput()
 * StreamOneInputProcessor#processInput()
 * StreamTaskSourceInput#emitNext()
 * SourceOperator#emitNext()

{code:java}
DataStream<Long> stream = env.fromSequence(1, 1000000000L)
.map(x -> x)
.addSink(new DiscardingSink<>());
{code}
 

In comparison, SourceReaderBase#pollNext() is already using a while loop so 
that each invocation of this method could push as many records to the given 
ReaderOutput as possible.

 

This ticket proposes to update IteratorSourceReaderBase#pollNext() to push 
records to ReaderOutput in a while loop. It improves performance for programs 
that use IteratorSourceReaderBase (e.g. env.fromSequence) by removing an 
average of 3 function calls from the call stack needed to produce a record.

 

Here are the benchmark results obtained by running the above program with 
parallelism=1 and 5 runs per setup.
 * Prior to the proposed change, the average execution time is 46.1 sec with 
std=5.1 sec.
 * After the proposed change, the average execution time is 33.3 sec with 
std=0.9 sec.
 * The proposed change increases throughput by 38.4% (46.1 / 33.3 is about 
1.384).

 


> SourceOperator#emitNext() should push records to DataOutput in a while loop
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-30533
>                 URL: https://issues.apache.org/jira/browse/FLINK-30533
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Task
>            Reporter: Dong Lin
>            Assignee: Dong Lin
>            Priority: Major
>              Labels: pull-request-available



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
