[ 
https://issues.apache.org/jira/browse/FLINK-30601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lijie Wang updated FLINK-30601:
-------------------------------
    Description: 
Currently, Flink sets the key context (by calling 
[setKeyContextElement|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java#:~:text=input.setKeyContextElement(castRecord)%3B])
 before processing each record. This call is typically used to extract the key 
from the record and pass that key to the state backend.

However, "setKeyContextElement" is not needed for non-keyed 
streams/operators, so we can omit the call in that case to 
improve performance. Note that "setKeyContextElement" is an interface method, 
so each call requires an interface table lookup, which further 
increases the method call overhead.
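
The idea can be illustrated with a minimal, self-contained sketch. It does not use Flink's actual classes: "Input", "CountingInput" and "ChainedOutput" are hypothetical stand-ins, and the "keyed" flag is an assumption about how the decision could be made once at construction time rather than per record.

```java
public final class KeyContextSketch {

    /** Hypothetical stand-in for an operator input; not Flink's real interface. */
    interface Input<T> {
        void setKeyContextElement(T record);

        void processElement(T record);
    }

    /** Counts calls so the effect of skipping the key-context step is observable. */
    static final class CountingInput<T> implements Input<T> {
        int keyContextCalls;
        int processed;

        @Override
        public void setKeyContextElement(T record) {
            keyContextCalls++;
        }

        @Override
        public void processElement(T record) {
            processed++;
        }
    }

    /** Hypothetical chaining output that knows up front whether the downstream input is keyed. */
    static final class ChainedOutput<T> {
        private final Input<T> input;
        private final boolean keyed;

        ChainedOutput(Input<T> input, boolean keyed) {
            this.input = input;
            this.keyed = keyed;
        }

        void collect(T record) {
            // Only keyed inputs need the key context; non-keyed inputs skip the interface call.
            if (keyed) {
                input.setKeyContextElement(record);
            }
            input.processElement(record);
        }
    }

    public static void main(String[] args) {
        CountingInput<Long> nonKeyed = new CountingInput<>();
        ChainedOutput<Long> out = new ChainedOutput<>(nonKeyed, false);
        for (long i = 0; i < 3; i++) {
            out.collect(i);
        }
        // The three records are processed without any key-context call.
        System.out.println(nonKeyed.keyContextCalls + " " + nonKeyed.processed); // prints "0 3"
    }
}
```

In this sketch the check is a plain boolean field read, so the non-keyed path never reaches the interface method at all. How the actual change detects a non-keyed operator is not described here.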
 
We ran the following program as a benchmark with parallelism=1 and object reuse 
enabled. The results are averaged across 5 runs for each setup. 
With the proposed change applied, the average execution time 
dropped from 88.39 s to 78.76 s, a reduction of about 10.9% (roughly 12.2% higher throughput).
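{code:java}
// Assumed setup, matching the configuration described above.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.getConfig().enableObjectReuse();
env.fromSequence(1, 1000000000L)
        .map(x -> x)
        .map(x -> x)
        .map(x -> x)
        .map(x -> x)
        .map(x -> x)
        .addSink(new DiscardingSink<>()); // discard all results
env.execute();
{code}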

> Omit "setKeyContextElement" call for non-keyed stream/operators to improve 
> performance
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-30601
>                 URL: https://issues.apache.org/jira/browse/FLINK-30601
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Task
>            Reporter: Lijie Wang
>            Priority: Major
>             Fix For: 1.17.0
>
>
> Currently, Flink sets the key context (by calling 
> [setKeyContextElement|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java#:~:text=input.setKeyContextElement(castRecord)%3B])
>  before processing each record. This call is typically used to extract the 
> key from the record and pass that key to the state backend.
> However, "setKeyContextElement" is not needed for non-keyed 
> streams/operators, so we can omit the call in that case 
> to improve performance. Note that "setKeyContextElement" is an interface 
> method, so each call requires an interface table lookup, which 
> further increases the method call overhead.
>  
> We ran the following program as a benchmark with parallelism=1 and object 
> reuse enabled. The results are averaged across 5 runs for each 
> setup. With the proposed change applied, the average execution 
> time dropped from 88.39 s to 78.76 s, a reduction of about 10.9% (roughly 12.2% higher throughput).
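> {code:java}
> // Assumed setup, matching the configuration described above.
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> env.getConfig().enableObjectReuse();
> env.fromSequence(1, 1000000000L)
>         .map(x -> x)
>         .map(x -> x)
>         .map(x -> x)
>         .map(x -> x)
>         .map(x -> x)
>         .addSink(new DiscardingSink<>()); // discard all results
> env.execute();
> {code}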



--
This message was sent by Atlassian Jira
(v8.20.10#820010)