Junrui Li created FLINK-30821:
---------------------------------

             Summary: The optimized exec plan generated by 
OverAggregateTest#testDiffPartitionKeysWithDiffOrderKeys2 in the case of 
all-blocking is not as expected
                 Key: FLINK-30821
                 URL: https://issues.apache.org/jira/browse/FLINK-30821
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.16.0, 1.17.0
            Reporter: Junrui Li
             Fix For: 1.17.0, 1.16.2


The optimized exec plan generated by 
OverAggregateTest#testDiffPartitionKeysWithDiffOrderKeys2 in the case of 
all-blocking is that
{code:java}
Calc(select=[CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS EXPR$0, w1$o0 AS EXPR$1, 
w2$o0 AS EXPR$2, w0$o2 AS EXPR$3, CAST((CASE((w3$o0 > 0), w3$o1, null:INTEGER) 
/ w3$o0) AS INTEGER) AS EXPR$4])
+- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[MAX(a) AS w0$o0 
RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o2, 
w1$o0, w0$o1, w3$o0, w3$o1, w2$o0, w0$o0])
   +- Exchange(distribution=[forward])
      +- Sort(orderBy=[c ASC, a ASC])
         +- Exchange(distribution=[hash[c]])
            +- OverAggregate(partitionBy=[b], orderBy=[c ASC], 
window#0=[COUNT(a) AS w3$o0, $SUM0(a) AS w3$o1 RANG BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW], window#1=[RANK(*) AS w2$o0 RANG BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW], select=[a, b, c, w0$o2, w1$o0, w0$o1, w3$o0, w3$o1, w2$o0])
               +- Exchange(distribution=[forward])
                  +- Sort(orderBy=[b ASC, c ASC])
                     +- Exchange(distribution=[hash[b]])
                        +- OverAggregate(orderBy=[c ASC, a ASC], 
window#0=[MIN(a) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], 
select=[a, b, c, w0$o2, w1$o0, w0$o1])
                           +- Exchange(distribution=[forward])
                              +- Sort(orderBy=[c ASC, a ASC])
                                 +- Exchange(distribution=[forward])
                                    +- OverAggregate(orderBy=[b ASC], 
window#0=[COUNT(a) AS w0$o2, $SUM0(a) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW], select=[a, b, c, w0$o2, w1$o0])
                                       +- Sort(orderBy=[b ASC])
                                          +- Exchange(distribution=[single])
                                             +- 
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 {code}
However, the expected plan is that
{code:java}
Calc(select=[CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS EXPR$0, w1$o0 AS EXPR$1, 
w2$o0 AS EXPR$2, w0$o2 AS EXPR$3, CAST((CASE((w3$o0 > 0), w3$o1, null:INTEGER) 
/ w3$o0) AS INTEGER) AS EXPR$4])
+- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[MAX(a) AS w0$o0 
RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o2, 
w1$o0, w0$o1, w3$o0, w3$o1, w2$o0, w0$o0])
   +- Exchange(distribution=[forward])
      +- Sort(orderBy=[c ASC, a ASC])
         +- Exchange(distribution=[hash[c]])
            +- OverAggregate(partitionBy=[b], orderBy=[c ASC], 
window#0=[COUNT(a) AS w3$o0, $SUM0(a) AS w3$o1 RANG BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW], window#1=[RANK(*) AS w2$o0 RANG BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW], select=[a, b, c, w0$o2, w1$o0, w0$o1, w3$o0, w3$o1, w2$o0])
               +- Exchange(distribution=[forward])
                  +- Sort(orderBy=[b ASC, c ASC])
                     +- Exchange(distribution=[hash[b]])
                        +- OverAggregate(orderBy=[c ASC, a ASC], 
window#0=[MIN(a) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], 
select=[a, b, c, w0$o2, w1$o0, w0$o1])
                           +- Exchange(distribution=[forward])
                              +- Sort(orderBy=[c ASC, a ASC])
                                 +- Exchange(distribution=[forward])
                                    +- OverAggregate(orderBy=[b ASC], 
window#0=[COUNT(a) AS w0$o2, $SUM0(a) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW], select=[a, b, c, w0$o2, w1$o0])
                                       +- Exchange (distribution=[forward])
                                 +- Sort(orderBy=[b ASC])
                                   +- Exchange(distribution=[single])
                                     +- 
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to