[
https://issues.apache.org/jira/browse/FLINK-34694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17837205#comment-17837205
]
Roman Boyko edited comment on FLINK-34694 at 4/15/24 12:15 PM:
---------------------------------------------------------------
Hi [~xu_shuai_] !
I prepared and executed all nexmark which uses streaming join (q4, q7, q9 and
q20). Because all of them use INNER JOIN (but this optimization works only for
outer join) I created the copy with FULL OUTER JOIN for every one.
BEFORE optimization:
!image-2024-04-15-15-45-51-027.png!
AFTER optimization:
!image-2024-04-15-15-46-17-671.png!
As you can see here - for all queries except q20_outer the result remains
almost the same (small difference most probably cause the measurement error).
But for q20_outer the performance is more than 2 times better (I repeated the
test several times). The reason of such huge difference can be found on flame
graph:
BEFORE optimization:
!image-2024-04-15-19-15-23-010.png!
AFTER optimization:
!image-2024-04-15-19-14-41-909.png!
Because of prevalence of state.update operation in before-optimization case the
rocksdb CompactionJob is invoked more often spending the most CPU time.
There is no such performance boost for q4, q7 and q9 because:
* q7 translates to Interval join
* q4 and q9 transformed to InnerJoin by FlinkFilterJoinRule (maybe this is a
bug, I will check later)
was (Author: rovboyko):
Hi [~xu_shuai_] !
I prepared and executed all nexmark which uses streaming join (q4, q7, q9 and
q20). Because all of them use INNER JOIN (but this optimization works only for
outer join) I created the copy with FULL OUTER JOIN for every one.
BEFORE optimization:
!image-2024-04-15-15-45-51-027.png!
AFTER optimization:
!image-2024-04-15-15-46-17-671.png!
As you can see here - for all queries except q20_outer the result remains
almost the same (small difference most probably cause the measurement error).
But for q20_outer the performance is more than 2 times better (I repeated the
test several times). The reason of such huge difference can be found on flame
graph:
BEFORE optimization:
!image-2024-04-15-15-53-44-308.png!
AFTER optimization:
!image-2024-04-15-15-55-27-313.png!
Because of prevalence of state.update operation in before-optimization case the
rocksdb CompactionJob is invoked more often spending the most CPU time.
There is no such performance boost for q4, q7 and q9 because:
* q7 translates to Interval join
* q4 and q9 transformed to InnerJoin by FlinkFilterJoinRule (maybe this is a
bug, I will check later)
> Delete num of associations for streaming outer join
> ---------------------------------------------------
>
> Key: FLINK-34694
> URL: https://issues.apache.org/jira/browse/FLINK-34694
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Runtime
> Reporter: Roman Boyko
> Priority: Major
> Attachments: image-2024-03-15-19-51-29-282.png,
> image-2024-03-15-19-52-24-391.png, image-2024-04-15-15-45-51-027.png,
> image-2024-04-15-15-46-17-671.png, image-2024-04-15-19-14-14-735.png,
> image-2024-04-15-19-14-41-909.png, image-2024-04-15-19-15-23-010.png
>
>
> Currently in StreamingJoinOperator (non-window) in case of OUTER JOIN the
> OuterJoinRecordStateView is used to store additional field - the number of
> associations for every record. This leads to store additional Tuple2 and
> Integer data for every record in outer state.
> This functionality is used only for sending:
> * -D[nullPaddingRecord] in case of first Accumulate record
> * +I[nullPaddingRecord] in case of last Revoke record
> The overhead of storing additional data and updating the counter for
> associations can be avoided by checking the input state for these events.
>
> The proposed solution can be found here -
> [https://github.com/rovboyko/flink/commit/1ca2f5bdfc2d44b99d180abb6a4dda123e49d423]
>
> According to the nexmark q20 test (changed to OUTER JOIN) it could increase
> the performance up to 20%:
> * Before:
> !image-2024-03-15-19-52-24-391.png!
> * After:
> !image-2024-03-15-19-51-29-282.png!
--
This message was sent by Atlassian Jira
(v8.20.10#820010)