[ 
https://issues.apache.org/jira/browse/FLINK-30536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated FLINK-30536:
-----------------------------
    Description: 
For the example program shown below, a CountingOutput is added to the 
per-record code path for each map operation in the program. This degrades 
Flink performance because it increases the function call stack depth on the 
critical code path.
{code:java}
DataStream<Long> stream = env.fromSequence(1, 500000000L)
.map(x -> x)
.map(x -> x)
.map(x -> x)
.map(x -> x)
.map(x -> x)
.addSink(new DiscardingSink<>());
{code}
 

Instead of adding a CountingOutput that wraps around e.g. ChainingOutput, we 
can add a Counter to the ChainingOutput itself, which achieves the same goal 
in most cases. We can do this when constructing the operator chain.

By making the change described above, we can reduce the call stack depth, 
increase the chance of function inlining, and reduce the Flink runtime overhead.
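
The difference between the two approaches can be sketched roughly as follows. 
This is a minimal illustration only: Output, SimpleCounter, 
PlainChainingOutput, CountingWrapper, CountingChainingOutput and 
CounterPlacementSketch are hypothetical, simplified stand-ins and not Flink's 
actual classes or signatures. With the wrapper, every record passes through 
two collect() calls per chained operator; with the embedded counter it passes 
through one.

```java
import java.util.function.Consumer;

// Hypothetical, simplified stand-ins for illustration; NOT Flink's real classes.
interface Output<T> {
    void collect(T record);
}

final class SimpleCounter {
    private long count;
    void inc() { count++; }
    long getCount() { return count; }
}

// Hands each record directly to the next chained operator; no counting.
final class PlainChainingOutput<T> implements Output<T> {
    private final Consumer<T> nextOperator;
    PlainChainingOutput(Consumer<T> nextOperator) { this.nextOperator = nextOperator; }
    @Override public void collect(T record) { nextOperator.accept(record); }
}

// Wrapper approach: counting adds one extra call frame per record.
final class CountingWrapper<T> implements Output<T> {
    private final Output<T> inner;
    private final SimpleCounter counter;
    CountingWrapper(Output<T> inner, SimpleCounter counter) {
        this.inner = inner;
        this.counter = counter;
    }
    @Override public void collect(T record) {
        counter.inc();
        inner.collect(record); // second collect() call on the hot path
    }
}

// Embedded approach: the chaining output increments the counter itself.
final class CountingChainingOutput<T> implements Output<T> {
    private final Consumer<T> nextOperator;
    private final SimpleCounter counter;
    CountingChainingOutput(Consumer<T> nextOperator, SimpleCounter counter) {
        this.nextOperator = nextOperator;
        this.counter = counter;
    }
    @Override public void collect(T record) {
        counter.inc();
        nextOperator.accept(record); // no intermediate wrapper
    }
}

final class CounterPlacementSketch {
    // Pushes n records through both variants and returns {wrapperCount, embeddedCount}.
    static long[] countsFor(int n) {
        SimpleCounter wrapped = new SimpleCounter();
        SimpleCounter embedded = new SimpleCounter();
        Consumer<Long> next = x -> { }; // stands in for the next operator
        Output<Long> viaWrapper =
                new CountingWrapper<>(new PlainChainingOutput<>(next), wrapped);
        Output<Long> viaEmbedded = new CountingChainingOutput<>(next, embedded);
        for (long i = 0; i < n; i++) {
            viaWrapper.collect(i);
            viaEmbedded.collect(i);
        }
        return new long[] {wrapped.getCount(), embedded.getCount()};
    }

    public static void main(String[] args) {
        long[] counts = countsFor(1000);
        System.out.println(counts[0] + " " + counts[1]);
    }
}
```

Both variants report the same record count; only the number of calls on the 
per-record path differs.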

 

Here are the benchmark results obtained by running the above program with 
parallelism=1 and object re-use enabled. The results are averaged across 5 runs 
for each setup.
 * Prior to the proposed change, the average execution time is 44.15 sec with 
std=1.4 sec.
 * After the proposed change, the average execution time is 42.15 sec with 
std=0.76 sec.
 * The proposed change increases throughput by about 4.7% 
(44.15 / 42.15 ≈ 1.047).
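
For reference, the throughput figure follows from the two averages above if 
we assume throughput is inversely proportional to execution time for the same 
input size. The class and method names below are hypothetical helpers for this 
calculation only. Note that the corresponding reduction in execution time is 
slightly smaller, about 4.5% (2.00 / 44.15).

```java
import java.util.Locale;

// Hypothetical helper; assumes the same number of records in both setups.
final class ThroughputGain {
    // Relative throughput increase in percent: (tBefore / tAfter - 1) * 100.
    static double gainPercent(double secondsBefore, double secondsAfter) {
        return (secondsBefore / secondsAfter - 1.0) * 100.0;
    }

    public static void main(String[] args) {
        // Averages reported in the benchmark above.
        double gain = gainPercent(44.15, 42.15);
        System.out.println(String.format(Locale.ROOT, "%.1f%%", gain)); // prints 4.7%
    }
}
```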

 

  was:
For the example program shown below, a CountingOutput will be added to the 
per-record code path for each map operation added in the program. This reduces 
the program performance as it unnecessarily increases the function call stack 
depth for the critical code path.

 

{code:java}
DataStream<Long> stream = env.fromSequence(1, 1000000000L)
.map(x -> x)
.map(x -> x)
.map(x -> x)
.map(x -> x)
.map(x -> x)
.addSink(new DiscardingSink<>());
{code}


> Remove CountingOutput from per-record code path for most operators
> ------------------------------------------------------------------
>
>                 Key: FLINK-30536
>                 URL: https://issues.apache.org/jira/browse/FLINK-30536
>             Project: Flink
>          Issue Type: Sub-task
>            Reporter: Dong Lin
>            Priority: Major
>              Labels: pull-request-available



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
