[
https://issues.apache.org/jira/browse/FLINK-19822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17624953#comment-17624953
]
Alexander Smirnov edited comment on FLINK-19822 at 10/27/22 8:38 AM:
---------------------------------------------------------------------
Hi [~godfreyhe]! I was wondering why such an important improvement is still
open, so I decided to test the optimization implemented in your PR. I found
that when StreamPhysicalCalc also supports the 'satisfyTraits' method, removing
redundant Exchanges comes with an unwanted side effect: the remaining Exchanges
get moved (as I understand it, the Volcano planner decides on its own where to
place an Exchange).
This has several negative consequences:
1. Filters can end up after the shuffle (previously they were placed before
it);
2. The Local-Global optimization in Aggregate is broken;
3. Some operators work incorrectly when they are chained with Calc
(WindowOperator, for example).
Maybe this is why the improvement was never completed (in your commit I found
a test checking that Calc doesn't push distribution through itself). Am I
right?
Anyway, I decided to try a different approach to this optimization. I came up
with the idea of a dedicated rule, _StreamRemoveRedundantExchangeRule_, that
traverses the RelNode graph from sources to sinks (previously the traversal
went in the reverse order) and removes duplicated Exchanges. Information about
the input distribution is passed through a new trait,
{_}InputRelDistributionTrait{_}. To support the optimization, stream RelNodes
need to implement a new method, _satisfyTraitsFromImputs_ (similar to the
_satisfyTraits_ method).
Could you look at my implementation and give some feedback, please?
[https://github.com/apache/flink/pull/21170] I reused some code from your PR;
I hope you don't mind.
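
For readers following the thread, the source-to-sink idea can be sketched roughly as below. This is only a toy illustration, not the code from the PR: the class RedundantExchangeSketch, the Node type, the keepsInputDistribution flag, and the linear plan are all hypothetical stand-ins, and the real rule works on Calcite RelNodes and traits. The sketch tracks the hash keys delivered by the input and drops an Exchange only when its keys are exactly the same, which follows the issue's requirement that shuffle keys and state keys match exactly for stream operators.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RedundantExchangeSketch {

    /** Hypothetical stand-in for a physical plan node; not a Flink or Calcite class. */
    static final class Node {
        final String name;
        final List<String> hashKeys;          // non-null only for an Exchange
        final boolean keepsInputDistribution; // assumption: operator forwards its input distribution

        Node(String name, List<String> hashKeys, boolean keepsInputDistribution) {
            this.name = name;
            this.hashKeys = hashKeys;
            this.keepsInputDistribution = keepsInputDistribution;
        }

        static Node exchange(String... keys) {
            return new Node("Exchange" + Arrays.asList(keys), Arrays.asList(keys), false);
        }

        static Node operator(String name, boolean keepsInputDistribution) {
            return new Node(name, null, keepsInputDistribution);
        }

        boolean isExchange() {
            return hashKeys != null;
        }
    }

    /** Walks a linear plan from source to sink and returns the names of the nodes that are kept. */
    static List<String> removeRedundantExchanges(List<Node> sourceToSink) {
        List<String> kept = new ArrayList<>();
        List<String> inputKeys = null; // distribution provided by the input; null means unknown
        for (Node node : sourceToSink) {
            if (node.isExchange()) {
                if (node.hashKeys.equals(inputKeys)) {
                    continue; // input is already hashed on exactly these keys: drop the Exchange
                }
                inputKeys = node.hashKeys;
            } else if (!node.keepsInputDistribution) {
                inputKeys = null; // this operator does not forward the distribution
            }
            kept.add(node.name);
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Node> plan = Arrays.asList(
                Node.operator("Source", false),
                Node.exchange("a"),
                Node.operator("Aggregate", true),
                Node.exchange("a"),       // same keys as the input distribution
                Node.operator("Join", true),
                Node.exchange("a", "b"),  // different keys, so it has to stay
                Node.operator("Sink", true));
        System.out.println(removeRedundantExchanges(plan));
    }
}
```

In this toy plan only the second Exchange on "a" is dropped. No Exchange is moved, so a filter that sat before a shuffle stays before it.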
> Remove redundant shuffle for streaming
> --------------------------------------
>
> Key: FLINK-19822
> URL: https://issues.apache.org/jira/browse/FLINK-19822
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Reporter: godfrey he
> Priority: Not a Priority
> Labels: auto-deprioritized-major, auto-deprioritized-minor,
> auto-unassigned, pull-request-available
>
> This is similar to
> [FLINK-12575|https://issues.apache.org/jira/browse/FLINK-12575]: we could
> implement the {{satisfyTraits}} method for stream nodes to remove redundant
> shuffles. This would also allow more operators to be merged into a multiple
> input operator.
> Unlike batch, stream operators require the shuffle keys and the state keys
> to be exactly the same; otherwise the state may be incorrect.
> We only support a few operators in this issue, such as Join and regular
> Aggregate. Other operators will be supported in the future.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)