[ 
https://issues.apache.org/jira/browse/FLINK-30536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated FLINK-30536:
-----------------------------
    Description: 
For the example program shown below, a CountingOutput will be added to the 
per-record code path for each map operation added in the program. This reduces 
the Flink performance as it increases the function call stack depth on the 
critical code path.
{code:java}
DataStream<Long> stream = env.fromSequence(1, 500000000L)
.map(x -> x)
.map(x -> x)
.map(x -> x)
.map(x -> x)
.map(x -> x)
.addSink(new DiscardingSink<>());
{code}
 

Instead of adding a CountingOutput that wraps around ChainingOutput, we can add 
a Counter in the ChainingOutput to achieve the same goal in most cases. We can 
do this when constructing the operator chain.

By making the change described above, we can reduce the call stack depth, 
increase the chance for function inline, and reduce the Flink runtime overhead.

 

Prior to the proposed change, each map() operation in the above program would 
add the following 3 functions on the call stack needed to produce a record:
 * CountingOutput#collect
 * ChainingOutput#collect
 * StreamMap#processElement

After the proposed change, the number of functions added for each map() 
operation would be reduced from 3 to 2, with ChainingOutput#collect removed 
from the call stack.

 

Here are the benchmark results obtained by running the above program with 
parallelism=1 and object re-use enabled. The results are averaged across 5 runs 
for each setup.
 * Prior to the proposed change, the average execution time is 44.15 sec with 
std=1.4 sec.
 * After the proposed change, the average execution time is 42.15 sec with 
std=0.76 sec.
 * The proposed change increases throughput by 4.7%.

 

 

  was:
For the example program shown below, a CountingOutput will be added to the 
per-record code path for each map operation added in the program. This reduces 
the Flink performance as it increases the function call stack depth on the 
critical code path.
{code:java}
DataStream<Long> stream = env.fromSequence(1, 500000000L)
.map(x -> x)
.map(x -> x)
.map(x -> x)
.map(x -> x)
.map(x -> x)
.addSink(new DiscardingSink<>());
{code}
 

Instead of adding a CountingOutput that wraps around e.g. ChainingOutput, we 
can instead add a Counter in the ChainingOutput to achieve the same goal in 
most cases. We can do this when constructing the operator chain.

By making the change described above, we can reduce the call stack depth, 
increase the chance for function inline, and reduce the Flink runtime overhead.

 

Prior to the proposed change, each map() operation in the above program would 
add the following 3 functions on the call stack needed to produce a record:
 * CountingOutput#collect
 * ChainingOutput#collect
 * StreamMap#processElement

After the proposed change, the number of functions added for each map() 
operation would be reduced from 3 to 2, with ChainingOutput#collect removed 
from the call stack.

 

Here are the benchmark results obtained by running the above program with 
parallelism=1 and object re-use enabled. The results are averaged across 5 runs 
for each setup.
 * Prior to the proposed change, the average execution time is 44.15 sec with 
std=1.4 sec.
 * After the proposed change, the average execution time is 42.15 sec with 
std=0.76 sec.
 * The proposed change increases throughput by 4.7%.

 

 


> Adding an operator should not have to add a CountingOutput on per-record code 
> path
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-30536
>                 URL: https://issues.apache.org/jira/browse/FLINK-30536
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Task
>            Reporter: Dong Lin
>            Priority: Major
>              Labels: pull-request-available
>
> For the example program shown below, a CountingOutput will be added to the 
> per-record code path for each map operation added in the program. This 
> reduces the Flink performance as it increases the function call stack depth 
> on the critical code path.
> {code:java}
> DataStream<Long> stream = env.fromSequence(1, 500000000L)
> .map(x -> x)
> .map(x -> x)
> .map(x -> x)
> .map(x -> x)
> .map(x -> x)
> .addSink(new DiscardingSink<>());
> {code}
>  
> Instead of adding a CountingOutput that wraps around ChainingOutput, we can 
> add a Counter in the ChainingOutput to achieve the same goal in most cases. 
> We can do this when constructing the operator chain.
> By making the change described above, we can reduce the call stack depth, 
> increase the chance for function inline, and reduce the Flink runtime 
> overhead.
>  
> Prior to the proposed change, each map() operation in the above program would 
> add the following 3 functions on the call stack needed to produce a record:
>  * CountingOutput#collect
>  * ChainingOutput#collect
>  * StreamMap#processElement
> After the proposed change, the number of functions added for each map() 
> operation would be reduced from 3 to 2, with ChainingOutput#collect removed 
> from the call stack.
>  
> Here are the benchmark results obtained by running the above program with 
> parallelism=1 and object re-use enabled. The results are averaged across 5 
> runs for each setup.
>  * Prior to the proposed change, the average execution time is 44.15 sec with 
> std=1.4 sec.
>  * After the proposed change, the average execution time is 42.15 sec with 
> std=0.76 sec.
>  * The proposed change increases throughput by 4.7%.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to