[ 
https://issues.apache.org/jira/browse/FLINK-19822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17624953#comment-17624953
 ] 

Alexander Smirnov edited comment on FLINK-19822 at 10/27/22 8:38 AM:
---------------------------------------------------------------------

Hi [~godfreyhe]! I was wondering why such important improvement is still open 
and decided to test your realization of optimization from PR. I found that in 
case when we also support method 'satisfyTraits' by StreamPhysicalCalc, we get 
unwanted side effects related to moving Exchanges together with removing 
redundant ones (as I understand, Volcano planner decides where to put Exchange 
by his own).
Because of this we get several negative side effects: 
1. Filters can be located after shuffle (previously filters were before 
shuffle); 
2. Broken Local-Global optimization in Aggregate; 
3. Some operators work incorrectly when they are chained with Calc 
(WindowOperator, as an example). 
Maybe it was a reason why this improvement was unsolved (in your commit I found 
a test checking that Calc doesn't push distribution through itself). Am I 
right? 
Anyway I decided to find a different approach to make this optimization. I came 
up with an idea to make specific rule _StreamRemoveRedundantExchangeRule,_ that 
traverses RelNode graph from sources to sinks (previously it was in reverse 
order) and remove duplicated Exchanges. Information about input distribution is 
passed through new trait {_}InputRelDistributionTrait{_}. To support 
optimization stream RelNodes need to implement new method 
_satisfyTraitsFromImputs_ (similar to _satisfyTraits_ method). 
Can you look at my realization and provide some feedback, please? - 
[https://github.com/apache/flink/pull/21170.] I reused some code from your PR, 
hope you don't mind.


was (Author: JIRAUSER288574):
Hi [~godfreyhe]! I was wondering why such important improvement is still open 
and decided to test your realization of optimization from PR. I found that in 
case when we also support method 'satisfyTraits' by StreamPhysicalCalc, we get 
unwanted side effects related to moving Exchanges together with removing 
redundant ones (as I understand, Volcano planner decides where to put Exchange 
by his own). Because of this we get several negative side effects: 1. Filters 
can be located after shuffle (previously filters were before shuffle); 2. 
Broken Local-Global optimization in Aggregate; 3. Some operators work 
incorrectly when they are chained with Calc (WindowOperator, as an example). 
Maybe it was a reason why this improvement was unsolved (in your commit I found 
a test checking that Calc doesn't push distribution through itself). Am I 
right? Anyway I decided to find a different approach to make this optimization. 
I came up with an idea to make specific rule 
_StreamRemoveRedundantExchangeRule,_ that traverses RelNode graph from sources 
to sinks (previously it was in reverse order) and remove duplicated Exchanges. 
Information about input distribution is passed through new trait 
{_}InputRelDistributionTrait{_}. To support optimization stream RelNodes need 
to implement new method _satisfyTraitsFromImputs_ (similar to _satisfyTraits_ 
method). Can you look at my realization and provide some feedback, please? - 
[https://github.com/apache/flink/pull/21170.] I reused some code from your PR, 
hope you don't mind.

> Remove redundant shuffle for streaming
> --------------------------------------
>
>                 Key: FLINK-19822
>                 URL: https://issues.apache.org/jira/browse/FLINK-19822
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>            Reporter: godfrey he
>            Priority: Not a Priority
>              Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> auto-unassigned, pull-request-available
>
> This is similar 
> [FLINK-12575|https://issues.apache.org/jira/browse/FLINK-12575], we could 
> implement {{satisfyTraits}} method for stream nodes to remove redundant 
> shuffle. This could add more possibilities that more operators can be merged 
> into multiple input operator.
> Different batch, stream operators require the shuffle keys and the state keys 
> must be exactly the same, otherwise the state may be not correct.
> We only support a few operators in this issue, such as Join and regular 
> Aggregate. Other operators will be supported in the future.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to