[
https://issues.apache.org/jira/browse/FLINK-19822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17624953#comment-17624953
]
Alexander Smirnov commented on FLINK-19822:
-------------------------------------------
Hi [~godfreyhe]! I was wondering why such an important improvement is still
open, so I decided to test the implementation of the optimization from your PR.
I found that when StreamPhysicalCalc also supports the 'satisfyTraits' method,
Exchanges are moved as well as removed (as I understand it, the Volcano planner
decides on its own where to put an Exchange). This caused several negative side
effects:
1. Filters could end up after the shuffle (previously they were before it).
2. The Local-Global optimization in Aggregate is broken.
3. Some operators work incorrectly when they are chained with Calc
(WindowOperator, for example).

Maybe this is why the improvement was left unresolved (in your commit I found
a test checking that Calc doesn't push distribution through itself). Am I
right?

Anyway, I decided to look for a different approach to this optimization. I came
up with the idea of a dedicated rule, _StreamRemoveRedundantExchangeRule_, that
traverses the RelNode graph from sources to sinks (previously it was in reverse
order) and removes duplicated Exchanges. Information about the input
distribution is passed through a new trait, _InputRelDistributionTrait_. To
support the optimization, stream RelNodes need to implement a new method,
_satisfyTraitsFromImputs_ (similar to the _satisfyTraits_ method).

Could you please look at my implementation and give some feedback?
[https://github.com/apache/flink/pull/21170]
I reused some code from your PR; I hope you don't mind.
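
To make the source-to-sink idea concrete, here is a minimal toy sketch in Python. It is not the code from the PR and uses no Flink or Calcite API: the `Node` class, the `preserves_distribution` flag (a stand-in for what a method like _satisfyTraitsFromImputs_ might decide) and the function names are all hypothetical. Each node learns the hash distribution of its input, and an Exchange whose keys already match that distribution is dropped.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class Node:
    """Toy plan node (hypothetical; not a Flink or Calcite class)."""
    kind: str                                 # e.g. "Source", "Exchange", "Calc"
    inputs: List["Node"] = field(default_factory=list)
    keys: Optional[Tuple[str, ...]] = None    # hash keys, only set for an Exchange
    preserves_distribution: bool = True       # assumed: node keeps its input distribution


def remove_redundant_exchanges(node):
    """Rewrite inputs first (sources before sinks).

    Returns (rewritten node, hash keys its output is distributed by, or None).
    """
    new_inputs, input_dists = [], []
    for child in node.inputs:
        rewritten, dist = remove_redundant_exchanges(child)
        new_inputs.append(rewritten)
        input_dists.append(dist)
    node = Node(node.kind, new_inputs, node.keys, node.preserves_distribution)

    if node.kind == "Exchange":
        if node.keys is not None and input_dists[0] == node.keys:
            # The input is already distributed by these keys: drop the Exchange.
            return new_inputs[0], node.keys
        return node, node.keys
    if len(input_dists) == 1 and node.preserves_distribution:
        return node, input_dists[0]
    # Sources, multi-input nodes and non-preserving nodes: distribution unknown.
    return node, None


def linear_kinds(node):
    """List node kinds from source to sink for a single-input chain."""
    out = []
    while True:
        out.append(node.kind)
        if not node.inputs:
            break
        node = node.inputs[0]
    return out[::-1]


# Source -> Exchange(a) -> Calc -> Exchange(a) -> Aggregate
plan = Node("Aggregate", [
    Node("Exchange", [
        Node("Calc", [Node("Exchange", [Node("Source")], keys=("a",))])
    ], keys=("a",))
])
rewritten, _ = remove_redundant_exchanges(plan)
print(linear_kinds(rewritten))  # ['Source', 'Exchange', 'Calc', 'Aggregate']
```

Because the distribution only flows forward from the inputs, no Exchange is ever moved in this sketch; it is either kept where it is or removed.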
> Remove redundant shuffle for streaming
> --------------------------------------
>
> Key: FLINK-19822
> URL: https://issues.apache.org/jira/browse/FLINK-19822
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Reporter: godfrey he
> Priority: Not a Priority
> Labels: auto-deprioritized-major, auto-deprioritized-minor,
> auto-unassigned, pull-request-available
>
> This is similar to
> [FLINK-12575|https://issues.apache.org/jira/browse/FLINK-12575]: we could
> implement the {{satisfyTraits}} method for stream nodes to remove redundant
> shuffles. This would also create more opportunities to merge operators
> into a multiple-input operator.
> Unlike batch operators, stream operators require that the shuffle keys and the
> state keys be exactly the same; otherwise the state may be incorrect.
> We only support a few operators in this issue, such as Join and regular
> Aggregate. Other operators will be supported in the future.
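
The difference between the batch and stream requirements described above can be sketched as two small predicates. This is an illustration only: the function names are hypothetical, the "subset of keys" rule for batch is an assumption about what a batch operator could accept, and treating "exactly the same" as ordered equality is one possible reading of the description.

```python
def batch_satisfies(provided_keys, required_keys):
    """Assumed batch rule: hashing on a non-empty subset of the required keys
    still puts all rows with equal required keys on the same task."""
    return bool(provided_keys) and set(provided_keys) <= set(required_keys)


def stream_satisfies(provided_keys, required_keys):
    """Stream rule from the description: the shuffle keys must be exactly the
    state keys (compared here as ordered tuples, which is an assumption)."""
    return bool(provided_keys) and tuple(provided_keys) == tuple(required_keys)


# Input hashed on (a), operator keyed on (a, b):
print(batch_satisfies(("a",), ("a", "b")))   # True
print(stream_satisfies(("a",), ("a", "b")))  # False
```

Under these assumptions, an existing shuffle on a subset of the keys can be reused in batch, while a stream operator still needs its own Exchange.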
--
This message was sent by Atlassian Jira
(v8.20.10#820010)