zhuzhurk commented on code in PR #21672:
URL: https://github.com/apache/flink/pull/21672#discussion_r1089772628
##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SingleRowJoinTest.xml:
##########
@@ -237,20 +237,18 @@ LogicalProject(EXPR$0=[*($0, 0.1:DECIMAL(2, 1))])
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[a2, EXPR$1])
-+- NestedLoopJoin(joinType=[InnerJoin], where=[(EXPR$1 > $f0)], select=[a2,
EXPR$1, $f0], build=[right], singleRowJoin=[true])
- :- Exchange(distribution=[any])
- : +- HashAggregate(isMerge=[true], groupBy=[a2], select=[a2,
Final_SUM(sum$0) AS EXPR$1])
- : +- Exchange(distribution=[hash[a2]])
- : +- LocalHashAggregate(groupBy=[a2], select=[a2, Partial_SUM(a1) AS
sum$0])
- : +- LegacyTableSourceScan(table=[[default_catalog,
default_database, A, source: [TestTableSource(a1, a2)]]], fields=[a1,
a2])(reuse_id=[1])
- +- Exchange(distribution=[broadcast])
- +- HashAggregate(isMerge=[false], select=[SINGLE_VALUE(EXPR$0) AS $f0])
- +- Calc(select=[($f0 * 0.1) AS EXPR$0])
- +- HashAggregate(isMerge=[true], select=[Final_SUM(sum$0) AS $f0])
- +- Exchange(distribution=[single])
- +- LocalHashAggregate(select=[Partial_SUM(a1) AS sum$0])
- +- Calc(select=[a1])
- +- Reused(reference_id=[1])
++- MultipleInput(readOrder=[0,1],
members=[\nNestedLoopJoin(joinType=[InnerJoin], where=[(EXPR$1 > $f0)],
select=[a2, EXPR$1, $f0], build=[right], singleRowJoin=[true])\n:-
HashAggregate(isMerge=[true], groupBy=[a2], select=[a2, Final_SUM(sum$0) AS
EXPR$1])\n: +- [#2] Exchange(distribution=[hash[a2]])\n+- [#1]
Exchange(distribution=[broadcast])\n])
+ :- Exchange(distribution=[broadcast])
+ : +- HashAggregate(isMerge=[false], select=[SINGLE_VALUE(EXPR$0) AS $f0])
+ : +- Calc(select=[($f0 * 0.1) AS EXPR$0])
+ : +- HashAggregate(isMerge=[true], select=[Final_SUM(sum$0) AS $f0])
+ : +- Exchange(distribution=[single])
+ : +- LocalHashAggregate(select=[Partial_SUM(a1) AS sum$0])
+ : +- Calc(select=[a1])
+ : +- LegacyTableSourceScan(table=[[default_catalog,
default_database, A, source: [TestTableSource(a1, a2)]]], fields=[a1,
a2])(reuse_id=[1])
+ +- Exchange(distribution=[hash[a2]])
+ +- LocalHashAggregate(groupBy=[a2], select=[a2, Partial_SUM(a1) AS
sum$0])
+ +- Reused(reference_id=[1])
Review Comment:
This plan has changed substantially, and I'm not yet sure whether the change
is expected.
##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeSemiAntiJoinTest.xml:
##########
@@ -1976,29 +1978,26 @@ Calc(select=[b])
+- SortMergeJoin(joinType=[LeftAntiJoin], where=[((d IS NULL OR ($f3 = d)) AND
(c = f))], select=[b, c, $f3])
:- Exchange(distribution=[hash[c]])
: +- Calc(select=[b, c, CASE(((c0 = 0) OR (i0 IS NULL AND (ck >= c0) AND a
IS NOT NULL)), 1, ((c1 = 0) OR (i IS NULL AND (ck0 >= c1) AND a IS NOT NULL)),
2, 3) AS $f3])
- : +- MultipleInput(readOrder=[1,0,0],
members=[\nSortMergeJoin(joinType=[LeftOuterJoin], where=[(a = EXPR$0)],
select=[a, b, c, c0, ck, i0, c1, ck0, EXPR$0, i])\n:-
NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a, b, c, c0, ck, i0,
c1, ck0], build=[right], singleRowJoin=[true])\n: :- [#1]
Exchange(distribution=[any])\n: +- [#2] Exchange(distribution=[broadcast])\n+-
Calc(select=[EXPR$0, true AS i])\n +- HashAggregate(isMerge=[true],
groupBy=[EXPR$0], select=[EXPR$0])\n +- [#3]
Exchange(distribution=[hash[EXPR$0]])\n])
- : :- Exchange(distribution=[any])
- : : +- Calc(select=[a, b, c, c0, ck, i0])
- : : +- MultipleInput(readOrder=[1,0,0],
members=[\nSortMergeJoin(joinType=[LeftOuterJoin], where=[(a = i)], select=[a,
b, c, c0, ck, i, i0])\n:- NestedLoopJoin(joinType=[InnerJoin], where=[true],
select=[a, b, c, c0, ck], build=[right], singleRowJoin=[true])\n: :- [#1]
Exchange(distribution=[hash[a]])\n: +- [#2]
Exchange(distribution=[broadcast])\n+- Calc(select=[i, true AS i0])\n +-
HashAggregate(isMerge=[true], groupBy=[i], select=[i])\n +- [#3]
Exchange(distribution=[hash[i]])\n])
- : : :- Exchange(distribution=[hash[a]])
- : : : +- LegacyTableSourceScan(table=[[default_catalog,
default_database, l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
- : : :- Exchange(distribution=[broadcast])
- : : : +- HashAggregate(isMerge=[true],
select=[Final_COUNT(count1$0) AS c, Final_COUNT(count$1) AS ck])
- : : : +- Exchange(distribution=[single])
- : : : +- LocalHashAggregate(select=[Partial_COUNT(*)
AS count1$0, Partial_COUNT(i) AS count$1])
- : : : +- Calc(select=[i])(reuse_id=[1])
- : : : +-
LegacyTableSourceScan(table=[[default_catalog, default_database, t, source:
[TestTableSource(i, j, k)]]], fields=[i, j, k])(reuse_id=[2])
- : : +- Exchange(distribution=[hash[i]])
- : : +- LocalHashAggregate(groupBy=[i], select=[i])
- : : +- Reused(reference_id=[1])
+ : +- MultipleInput(readOrder=[0,0,1,0,1],
members=[\nSortMergeJoin(joinType=[LeftOuterJoin], where=[(a = EXPR$0)],
select=[a, b, c, c0, ck, i0, c1, ck0, EXPR$0, i])\n:-
NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a, b, c, c0, ck, i0,
c1, ck0], build=[right], singleRowJoin=[true])\n: :- Calc(select=[a, b, c, c0,
ck, i0])\n: : +- SortMergeJoin(joinType=[LeftOuterJoin], where=[(a = i)],
select=[a, b, c, c0, ck, i, i0])\n: : :-
NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a, b, c, c0, ck],
build=[right], singleRowJoin=[true])\n: : : :- [#3]
Exchange(distribution=[hash[a]])\n: : : +- [#4]
Exchange(distribution=[broadcast])\n: : +- Calc(select=[i, true AS i0])\n:
: +- HashAggregate(isMerge=[true], groupBy=[i], select=[i])\n: :
+- [#5] Exchange(distribution=[hash[i]])\n: +- [#1]
Exchange(distribution=[broadcast])\n+- Calc(select=[EXPR$0, true AS i])\n +-
HashAggregate(isMerge=[true], groupBy=[EXPR$0], select
=[EXPR$0])\n +- [#2] Exchange(distribution=[hash[EXPR$0]])\n])
: :- Exchange(distribution=[broadcast])
: : +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0)
AS c, Final_COUNT(count$1) AS ck])
: : +- Exchange(distribution=[single])
: : +- LocalHashAggregate(select=[Partial_COUNT(*) AS
count1$0, Partial_COUNT(EXPR$0) AS count$1])
- : : +- Calc(select=[CAST(j AS INTEGER) AS
EXPR$0])(reuse_id=[3])
+ : : +- Calc(select=[CAST(j AS INTEGER) AS
EXPR$0])(reuse_id=[1])
+ : : +- LegacyTableSourceScan(table=[[default_catalog,
default_database, t, source: [TestTableSource(i, j, k)]]], fields=[i, j,
k])(reuse_id=[2])
+ : :- Exchange(distribution=[hash[EXPR$0]])
+ : : +- LocalHashAggregate(groupBy=[EXPR$0], select=[EXPR$0])
+ : : +- Reused(reference_id=[1])
+ : :- Exchange(distribution=[hash[a]])
+ : : +- LegacyTableSourceScan(table=[[default_catalog,
default_database, l, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+ : :- Exchange(distribution=[broadcast])
+ : : +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0)
AS c, Final_COUNT(count$1) AS ck])
+ : : +- Exchange(distribution=[single])
+ : : +- LocalHashAggregate(select=[Partial_COUNT(*) AS
count1$0, Partial_COUNT(i) AS count$1])
+ : : +- Calc(select=[i])(reuse_id=[3])
: : +- Reused(reference_id=[2])
- : +- Exchange(distribution=[hash[EXPR$0]])
- : +- LocalHashAggregate(groupBy=[EXPR$0], select=[EXPR$0])
+ : +- Exchange(distribution=[hash[i]])
+ : +- LocalHashAggregate(groupBy=[i], select=[i])
Review Comment:
This plan seems to be quite different from the previous one.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]