Junrui Li created FLINK-30821:
---------------------------------
Summary: The optimized exec plan generated by
OverAggregateTest#testDiffPartitionKeysWithDiffOrderKeys2 in the case of
all-blocking is not as expected
Key: FLINK-30821
URL: https://issues.apache.org/jira/browse/FLINK-30821
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.16.0, 1.17.0
Reporter: Junrui Li
Fix For: 1.17.0, 1.16.2
The optimized exec plan generated by
OverAggregateTest#testDiffPartitionKeysWithDiffOrderKeys2 in the case of
all-blocking is that
{code:java}
Calc(select=[CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS EXPR$0, w1$o0 AS EXPR$1,
w2$o0 AS EXPR$2, w0$o2 AS EXPR$3, CAST((CASE((w3$o0 > 0), w3$o1, null:INTEGER)
/ w3$o0) AS INTEGER) AS EXPR$4])
+- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[MAX(a) AS w0$o0
RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o2,
w1$o0, w0$o1, w3$o0, w3$o1, w2$o0, w0$o0])
+- Exchange(distribution=[forward])
+- Sort(orderBy=[c ASC, a ASC])
+- Exchange(distribution=[hash[c]])
+- OverAggregate(partitionBy=[b], orderBy=[c ASC],
window#0=[COUNT(a) AS w3$o0, $SUM0(a) AS w3$o1 RANG BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW], window#1=[RANK(*) AS w2$o0 RANG BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW], select=[a, b, c, w0$o2, w1$o0, w0$o1, w3$o0, w3$o1, w2$o0])
+- Exchange(distribution=[forward])
+- Sort(orderBy=[b ASC, c ASC])
+- Exchange(distribution=[hash[b]])
+- OverAggregate(orderBy=[c ASC, a ASC],
window#0=[MIN(a) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW],
select=[a, b, c, w0$o2, w1$o0, w0$o1])
+- Exchange(distribution=[forward])
+- Sort(orderBy=[c ASC, a ASC])
+- Exchange(distribution=[forward])
+- OverAggregate(orderBy=[b ASC],
window#0=[COUNT(a) AS w0$o2, $SUM0(a) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW], select=[a, b, c, w0$o2, w1$o0])
+- Sort(orderBy=[b ASC])
+- Exchange(distribution=[single])
+-
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
{code}
However, the expected plan is that
{code:java}
Calc(select=[CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS EXPR$0, w1$o0 AS EXPR$1,
w2$o0 AS EXPR$2, w0$o2 AS EXPR$3, CAST((CASE((w3$o0 > 0), w3$o1, null:INTEGER)
/ w3$o0) AS INTEGER) AS EXPR$4])
+- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[MAX(a) AS w0$o0
RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o2,
w1$o0, w0$o1, w3$o0, w3$o1, w2$o0, w0$o0])
+- Exchange(distribution=[forward])
+- Sort(orderBy=[c ASC, a ASC])
+- Exchange(distribution=[hash[c]])
+- OverAggregate(partitionBy=[b], orderBy=[c ASC],
window#0=[COUNT(a) AS w3$o0, $SUM0(a) AS w3$o1 RANG BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW], window#1=[RANK(*) AS w2$o0 RANG BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW], select=[a, b, c, w0$o2, w1$o0, w0$o1, w3$o0, w3$o1, w2$o0])
+- Exchange(distribution=[forward])
+- Sort(orderBy=[b ASC, c ASC])
+- Exchange(distribution=[hash[b]])
+- OverAggregate(orderBy=[c ASC, a ASC],
window#0=[MIN(a) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW],
select=[a, b, c, w0$o2, w1$o0, w0$o1])
+- Exchange(distribution=[forward])
+- Sort(orderBy=[c ASC, a ASC])
+- Exchange(distribution=[forward])
+- OverAggregate(orderBy=[b ASC],
window#0=[COUNT(a) AS w0$o2, $SUM0(a) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW], select=[a, b, c, w0$o2, w1$o0])
+- Exchange (distribution=[forward])
+- Sort(orderBy=[b ASC])
+- Exchange(distribution=[single])
+-
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)