[ 
https://issues.apache.org/jira/browse/FLINK-34694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17837205#comment-17837205
 ] 

Roman Boyko edited comment on FLINK-34694 at 4/15/24 12:15 PM:
---------------------------------------------------------------

Hi [~xu_shuai_] !

I prepared and ran all Nexmark queries that use a streaming join (q4, q7, q9 and
q20). Because all of them use INNER JOIN (while this optimization applies only to
outer joins), I created a copy of each one with FULL OUTER JOIN.

BEFORE optimization:

!image-2024-04-15-15-45-51-027.png!

AFTER optimization:

!image-2024-04-15-15-46-17-671.png!

As you can see, for all queries except q20_outer the result remains almost the
same (the small differences are most likely measurement error). For q20_outer,
however, performance is more than 2 times better (I repeated the test several
times). The reason for such a large difference can be seen in the flame graphs:

BEFORE optimization:

!image-2024-04-15-19-15-23-010.png!

 

AFTER optimization:

!image-2024-04-15-19-14-41-909.png!

 

Because state.update operations dominate in the before-optimization case, the
RocksDB CompactionJob is invoked more often and accounts for most of the CPU time.
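
To illustrate why dropping the counter cuts state writes, here is a toy Java model. It is not Flink code; the class and method names are hypothetical. It counts keyed-state writes for a single join key in a FULL OUTER equi-join, assuming every left record matches every right record and using in-memory lists in place of RocksDB state. The per-match increment mirrors the numOfAssociations update described in the issue.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model, not Flink code: counts keyed-state writes under one join key for a
 * FULL OUTER equi-join in which every left record matches every right record.
 */
class StateWriteModel {

    /**
     * Both sides store (record, numOfAssociations). Each arriving record is written
     * once, and every record already stored on the other side gets its counter
     * incremented, which costs one more state write per match.
     */
    static int writesWithCounters(String arrivals) {
        List<int[]> left = new ArrayList<>();  // each entry holds {numOfAssociations}
        List<int[]> right = new ArrayList<>();
        int writes = 0;
        for (char side : arrivals.toCharArray()) {
            List<int[]> own = (side == 'L') ? left : right;
            List<int[]> other = (side == 'L') ? right : left;
            own.add(new int[] {other.size()}); // store the record with its initial count
            writes++;
            for (int[] entry : other) {
                entry[0]++;                    // the matched record gains an association
                writes++;                      // ...which costs a state update
            }
        }
        return writes;
    }

    /** Without counters, each arriving record is written exactly once. */
    static int writesWithoutCounters(String arrivals) {
        return arrivals.length();
    }

    public static void main(String[] args) {
        String arrivals = "LLLRRR"; // three left records, then three right records
        System.out.println(writesWithCounters(arrivals));    // 6 inserts + 9 counter updates = 15
        System.out.println(writesWithoutCounters(arrivals)); // 6
    }
}
```

In this model every (left, right) pair costs exactly one extra write on whichever record arrived first, so the counter version performs n + L*R writes versus n. That is consistent with, though not proof of, the state.update-heavy flame graph.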

There is no such performance boost for q4, q7 and q9 because:
 * q7 translates to an Interval join
 * q4 and q9 are transformed to an INNER JOIN by FlinkFilterJoinRule (this may be
a bug; I will check later)



> Delete num of associations for streaming outer join
> ---------------------------------------------------
>
>                 Key: FLINK-34694
>                 URL: https://issues.apache.org/jira/browse/FLINK-34694
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Runtime
>            Reporter: Roman Boyko
>            Priority: Major
>         Attachments: image-2024-03-15-19-51-29-282.png, 
> image-2024-03-15-19-52-24-391.png, image-2024-04-15-15-45-51-027.png, 
> image-2024-04-15-15-46-17-671.png, image-2024-04-15-19-14-14-735.png, 
> image-2024-04-15-19-14-41-909.png, image-2024-04-15-19-15-23-010.png
>
>
> Currently, in the non-window StreamingJoinOperator, an OUTER JOIN uses the
> OuterJoinRecordStateView to store an additional field for every record: the
> number of associations. As a result, an additional Tuple2 and Integer are
> stored for every record in the outer state.
> This counter is used only to send:
>  * -D[nullPaddingRecord] when the first matching Accumulate record arrives
>  * +I[nullPaddingRecord] when the last matching Revoke record arrives
> The overhead of storing additional data and updating the counter for 
> associations can be avoided by checking the input state for these events.
>  
> The proposed solution can be found here - 
> [https://github.com/rovboyko/flink/commit/1ca2f5bdfc2d44b99d180abb6a4dda123e49d423]
>  
> According to the Nexmark q20 test (changed to OUTER JOIN), this could improve
> performance by up to 20%:
>  * Before:
> !image-2024-03-15-19-52-24-391.png!
>  * After:
> !image-2024-03-15-19-51-29-282.png!
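
The idea of deriving the null-padding changes from the other side's state, instead of a stored counter, can be sketched as below. This is a hypothetical illustration, not the Flink operator and not the linked commit: it covers only a LEFT OUTER join under a single join key, uses strings as rows, and the class, method, and field names are invented. A record on the right side is the "first" match for a left row when no right record already in state matches it, and the "last" match when none remain after removal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

/**
 * Hypothetical sketch, not Flink code: LEFT OUTER join under one join key.
 * The left (outer) side stores plain records with no numOfAssociations counter;
 * null-padding changes are derived by checking the right-side state instead.
 */
class CounterlessLeftOuterJoin {
    private final List<String> leftState = new ArrayList<>();
    private final List<String> rightState = new ArrayList<>();
    private final BiPredicate<String, String> condition;
    final List<String> output = new ArrayList<>();

    CounterlessLeftOuterJoin(BiPredicate<String, String> condition) {
        this.condition = condition;
    }

    /** Does any record currently in the right state match this left record? */
    private boolean hasMatch(String left) {
        for (String right : rightState) {
            if (condition.test(left, right)) {
                return true;
            }
        }
        return false;
    }

    void onLeftAccumulate(String left) {
        leftState.add(left);
        boolean matched = false;
        for (String right : rightState) {
            if (condition.test(left, right)) {
                output.add("+I[" + left + ", " + right + "]");
                matched = true;
            }
        }
        if (!matched) {
            output.add("+I[" + left + ", null]"); // no partner yet: null-padded row
        }
    }

    void onRightAccumulate(String right) {
        for (String left : leftState) {
            if (condition.test(left, right)) {
                // First matching right record for this left row: retract its null padding.
                if (!hasMatch(left)) {
                    output.add("-D[" + left + ", null]");
                }
                output.add("+I[" + left + ", " + right + "]");
            }
        }
        rightState.add(right); // added after the check, so the new record is not counted
    }

    void onRightRetract(String right) {
        if (!rightState.remove(right)) {
            return; // unknown record: nothing to retract
        }
        for (String left : leftState) {
            if (condition.test(left, right)) {
                output.add("-D[" + left + ", " + right + "]");
                // Last matching right record is gone: restore the null-padded row.
                if (!hasMatch(left)) {
                    output.add("+I[" + left + ", null]");
                }
            }
        }
    }

    public static void main(String[] args) {
        CounterlessLeftOuterJoin join = new CounterlessLeftOuterJoin((l, r) -> true);
        join.onLeftAccumulate("a");
        join.onRightAccumulate("x");
        join.onRightAccumulate("y");
        join.onRightRetract("x");
        join.onRightRetract("y");
        System.out.println(join.output);
    }
}
```

The trade-off in this sketch is extra reads of the other side's state in exchange for no counter writes; whether the actual proposal scans state the same way is not something the sketch establishes.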



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
