[ 
https://issues.apache.org/jira/browse/FLINK-26051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chuncheng wu updated FLINK-26051:
---------------------------------
    Description: 
hello,

   i have 2 sqls. One  sql (No. sql0) is "select xx from ( ROW_NUMBER stament) 
where rn=1" and  the other one (No. sql1) is   "s{color:#505f79}elect ${fields} 
from result where ${filter_conditions}{color}"  . The fields  has one "case 
when" field .The two sql can work well seperately.but if they combine  it 
results the exception as follow . It happen in the occasion when logical plan 
turn into physical plan :

 
{code:java}
org.apache.flink.table.api.TableException: The window can only be ordered in 
ASCENDING mode.
    at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:98)
    at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:52)
    at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
    at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregateBase.translateToPlan(StreamExecOverAggregateBase.scala:42)
    at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
    at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
    at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
    at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
    at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
    at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65)
    at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65)
    at 
org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:103)
    at 
org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:42)
    at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:630)
    at org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:582)
    at 
com.meituan.grocery.data.flink.test.BugTest.testRowNumber(BugTest.java:69)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
    at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at 
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
    at 
org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60){code}
In the stacktrace above  , rownumber() 's  physical rel which  is 
StreamExecRank In nomal has became StreamExecOverAggregate . The 
StreamExecOverAggregate rel has a  window= ROWS BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW which i never add .Oddly,if you remove the "case when" statement in 
sql1 or  remove the  "where" statement in sql1 ,the program  will work well, 
the exception disappear. In the same time,  rownumber() 's  physical rel came 
back to StreamExecRank. Its confusing

 

example code : 
{code:java}
import org.apache.flink.api.java.tuple.Tuple12;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream; 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
import org.apache.flink.streaming.api.functions.source.SourceFunction; 
import org.apache.flink.table.api.EnvironmentSettings; 
import org.apache.flink.table.api.ExplainDetail; 
import org.apache.flink.table.api.Table; 
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; 
import org.apache.flink.types.Row; 
import org.junit.jupiter.api.Test; 
import java.sql.Timestamp; 
import static org.apache.flink.table.api.Expressions.$; 

public class BugTest { 
  @Test public void testRowNumber() throws Exception { 

    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(); 

    EnvironmentSettings 
mySetting=EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
mySetting);

    env.setParallelism(1); 

 //source table ,name is wosOutSowingTaskDetail
    DataStream<Tuple12<String, Integer, Integer, String, Integer, Integer, 
Integer, Integer, Integer, String,   String, String>> oriStream = 
env.addSource(new CustomSourceRowNumber()); 

    Table testTable = tableEnv.fromDataStream(oriStream, $("biz_bill_no"), 
$("task_type"), $("task_mode"),  $("parent_task_no"), $("total_stage_num"), 
$("current_stage_index"), $("use_pre_task_owner"), $("poi_type"), 
$("biz_origin_bill_type"), $("sowing_task_no"), $("dt"), 
$("sowing_task_detail_id")); 

    tableEnv.createTemporaryView("wosOutSowingTaskDetail", testTable); 


//sql 0,select xx from ( ROW_NUMBER stament) where rn=1
    Table wosOutSowingTaskDetailLatest = tableEnv.sqlQuery( "SELECT 
`biz_bill_no`\n" 
       + ",task_type\n" + ",task_mode\n" 
       + ",parent_task_no\n"
       + ",total_stage_num\n"
       + ",current_stage_index\n"
       + ",use_pre_task_owner\n"
       + ",poi_type\n"
       + ",biz_origin_bill_type\n"
       + ",sowing_task_no\n"
      + " FROM (\n" 
            + " SELECT *,\n"  
            + " ROW_NUMBER() OVER(PARTITION BY dt,sowing_task_detail_id ORDER 
BY task_type desc) AS rn\n" 
            + " FROM wosOutSowingTaskDetail\n" 
            + " ) tmp\n"
     + " WHERE rn = 1"); 

    System.out.println("SQL 0 Plan: "); 

    
System.out.println(wosOutSowingTaskDetailLatest.explain(ExplainDetail.ESTIMATED_COST));
 
    
System.out.println(wosOutSowingTaskDetailLatest.explain(ExplainDetail.CHANGELOG_MODE));
 

    tableEnv.createTemporaryView("wosOutSowingTaskDetailLatest", 
wosOutSowingTaskDetailLatest); 

 
//sql 1:select from sql0'result table ,field has a CASE WHEN
    Table resultTable = tableEnv.sqlQuery("SELECT\n" 
               + "biz_bill_no\n" 
               + ", CASE WHEN task_type = 21 AND task_mode = 51 THEN 
parent_task_no\n" 
               + " WHEN task_type = 21 AND task_mode = 40 AND total_stage_num 
>= 2 AND current_stage_index >= 2 AND use_pre_task_owner = 1 THEN 
parent_task_no\n" 
               + " ELSE sowing_task_no END AS parent_task_no_cw\n" 
               + ",parent_task_no, sowing_task_no, task_type, task_mode, 
total_stage_num,    current_stage_index,use_pre_task_owner \n" 
              + "FROM wosOutSowingTaskDetailLatest\n" 
               + "WHERE task_type = 21\n" 
               + "AND task_mode IN (51, 40)\n" 
               + "AND poi_type = 2\n"); 
    System.out.println("SQL 1 Plan: ");

    System.out.println(resultTable.explain(ExplainDetail.ESTIMATED_COST));

    System.out.println(resultTable.explain(ExplainDetail.CHANGELOG_MODE)); 

    DataStream<Tuple2<Boolean, Row>> resultStream = 
tableEnv.toRetractStream(resultTable, Row.class);

    env.execute(); 
  } 
//user define source 
static class CustomSourceRowNumber implements SourceFunction<Tuple12<String, 
Integer, Integer, String, Integer, Integer, Integer, Integer, Integer, String, 
String, String>> { 

    private boolean isRuning = true; 

    @Override public void run(SourceContext<Tuple12<String, Integer, Integer, 
String, Integer, Integer, Integer, Integer, Integer, String, String, String>> 
sourceContext) throws Exception { 

       while (isRuning) { 

         
sourceContext.collect(Tuple12.of("xxx",21,51,"yyy",1,1,0,2,110,"zzz","aaa","bbb"));
 
         
sourceContext.collect(Tuple12.of("xxx",21,40,"yyy",2,2,1,2,110,"zzz","aaa","bbb"));
  
         Thread.sleep(Integer.MAX_VALUE); } } @Override public void cancel() { 
isRuning = false; 
       } 
  } 
}
{code}
 
{code:java}
SQL 0 Plan: 
== Abstract Syntax Tree ==
LogicalProject(biz_bill_no=[$0], task_type=[$1], task_mode=[$2], 
parent_task_no=[$3], total_stage_num=[$4], current_stage_index=[$5], 
use_pre_task_owner=[$6], poi_type=[$7], biz_origin_bill_type=[$8], 
sowing_task_no=[$9])
+- LogicalFilter(condition=[=($12, 1)])
   +- LogicalProject(biz_bill_no=[$0], task_type=[$1], task_mode=[$2], 
parent_task_no=[$3], total_stage_num=[$4], current_stage_index=[$5], 
use_pre_task_owner=[$6], poi_type=[$7], biz_origin_bill_type=[$8], 
sowing_task_no=[$9], dt=[$10], sowing_task_detail_id=[$11], rn=[ROW_NUMBER() 
OVER (PARTITION BY $10, $11 ORDER BY $1 DESC NULLS LAST)])
      +- LogicalTableScan(table=[[default_catalog, default_database, 
wosOutSowingTaskDetail]])

== Optimized Logical Plan ==
Calc(select=[biz_bill_no, task_type, task_mode, parent_task_no, 
total_stage_num, current_stage_index, use_pre_task_owner, poi_type, 
biz_origin_bill_type, sowing_task_no], changelogMode=[I,UA,D])
+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
rankRange=[rankStart=1, rankEnd=1], partitionBy=[dt, sowing_task_detail_id], 
orderBy=[task_type DESC], select=[biz_bill_no, task_type, task_mode, 
parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, 
poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id], 
changelogMode=[I,UA,D])
   +- Exchange(distribution=[hash[dt, sowing_task_detail_id]], 
changelogMode=[I])
      +- DataStreamScan(table=[[default_catalog, default_database, 
wosOutSowingTaskDetail]], fields=[biz_bill_no, task_type, task_mode, 
parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, 
poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id], 
changelogMode=[I])


== Physical Execution Plan ==
Stage 1 : Data Source
    content : Source: Custom Source
    Stage 7 : Operator
        content : 
SourceConversion(table=[default_catalog.default_database.wosOutSowingTaskDetail],
 fields=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, 
current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, 
sowing_task_no, dt, sowing_task_detail_id])
        ship_strategy : FORWARD
        Stage 9 : Operator
            content : Rank(strategy=[AppendFastStrategy], 
rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[dt, 
sowing_task_detail_id], orderBy=[task_type DESC], select=[biz_bill_no, 
task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, 
use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, 
sowing_task_detail_id])
            ship_strategy : HASH
            Stage 10 : Operator
                content : Calc(select=[biz_bill_no, task_type, task_mode, 
parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, 
poi_type, biz_origin_bill_type, sowing_task_no])
                ship_strategy : FORWARD
SQL 1 Plan: 
org.apache.flink.table.api.TableException: The window can only be ordered in 
ASCENDING mode. {code}

  was:
hello,

   i have 2 sqls. One sql has rn=1 and  the Subsequent SQL has "case when" and 
"where".it results the exception as follow. It happen in the occasion when 
logical plan turn into physical plan :

 
{code:java}
org.apache.flink.table.api.TableException: The window can only be ordered in 
ASCENDING mode.
    at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:98)
    at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:52)
    at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
    at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregateBase.translateToPlan(StreamExecOverAggregateBase.scala:42)
    at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
    at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
    at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
    at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
    at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
    at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65)
    at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65)
    at 
org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:103)
    at 
org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:42)
    at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:630)
    at org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:582)
    at 
com.meituan.grocery.data.flink.test.BugTest.testRowNumber(BugTest.java:69)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
    at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at 
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
    at 
org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60){code}
 

 

example code : 
{code:java}
import org.apache.flink.api.java.tuple.Tuple12;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream; 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
import org.apache.flink.streaming.api.functions.source.SourceFunction; 
import org.apache.flink.table.api.EnvironmentSettings; 
import org.apache.flink.table.api.ExplainDetail; 
import org.apache.flink.table.api.Table; 
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; 
import org.apache.flink.types.Row; 
import org.junit.jupiter.api.Test; 
import java.sql.Timestamp; 
import static org.apache.flink.table.api.Expressions.$; 

public class BugTest { 
  @Test public void testRowNumber() throws Exception { 

    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(); 

    EnvironmentSettings 
mySetting=EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
mySetting);

    env.setParallelism(1); 

 //source table ,name is wosOutSowingTaskDetail
    DataStream<Tuple12<String, Integer, Integer, String, Integer, Integer, 
Integer, Integer, Integer, String,   String, String>> oriStream = 
env.addSource(new CustomSourceRowNumber()); 

    Table testTable = tableEnv.fromDataStream(oriStream, $("biz_bill_no"), 
$("task_type"), $("task_mode"),  $("parent_task_no"), $("total_stage_num"), 
$("current_stage_index"), $("use_pre_task_owner"), $("poi_type"), 
$("biz_origin_bill_type"), $("sowing_task_no"), $("dt"), 
$("sowing_task_detail_id")); 

    tableEnv.createTemporaryView("wosOutSowingTaskDetail", testTable); 


//sql 0
    Table wosOutSowingTaskDetailLatest = tableEnv.sqlQuery( "SELECT 
`biz_bill_no`\n" 
       + ",task_type\n" + ",task_mode\n" 
       + ",parent_task_no\n"
       + ",total_stage_num\n"
       + ",current_stage_index\n"
       + ",use_pre_task_owner\n"
       + ",poi_type\n"
       + ",biz_origin_bill_type\n"
       + ",sowing_task_no\n"
      + " FROM (\n" 
            + " SELECT *,\n"  
            + " ROW_NUMBER() OVER(PARTITION BY dt,sowing_task_detail_id ORDER 
BY task_type desc) AS rn\n" 
            + " FROM wosOutSowingTaskDetail\n" 
            + " ) tmp\n"
     + " WHERE rn = 1"); 

    System.out.println("SQL 0 Plan: "); 

    
System.out.println(wosOutSowingTaskDetailLatest.explain(ExplainDetail.ESTIMATED_COST));
 
    
System.out.println(wosOutSowingTaskDetailLatest.explain(ExplainDetail.CHANGELOG_MODE));
 

    tableEnv.createTemporaryView("wosOutSowingTaskDetailLatest", 
wosOutSowingTaskDetailLatest); 

 
//sql 1:select from sql0'result table ,field has a CASE WHEN
    Table resultTable = tableEnv.sqlQuery("SELECT\n" 
               + "biz_bill_no\n" 
               + ", CASE WHEN task_type = 21 AND task_mode = 51 THEN 
parent_task_no\n" 
               + " WHEN task_type = 21 AND task_mode = 40 AND total_stage_num 
>= 2 AND current_stage_index >= 2 AND use_pre_task_owner = 1 THEN 
parent_task_no\n" 
               + " ELSE sowing_task_no END AS parent_task_no_cw\n" 
               + ",parent_task_no, sowing_task_no, task_type, task_mode, 
total_stage_num,    current_stage_index,use_pre_task_owner \n" 
              + "FROM wosOutSowingTaskDetailLatest\n" 
               + "WHERE task_type = 21\n" 
               + "AND task_mode IN (51, 40)\n" 
               + "AND poi_type = 2\n"); 
    System.out.println("SQL 1 Plan: ");

    System.out.println(resultTable.explain(ExplainDetail.ESTIMATED_COST));

    System.out.println(resultTable.explain(ExplainDetail.CHANGELOG_MODE)); 

    DataStream<Tuple2<Boolean, Row>> resultStream = 
tableEnv.toRetractStream(resultTable, Row.class);

    env.execute(); 
  } 
//user define source 
static class CustomSourceRowNumber implements SourceFunction<Tuple12<String, 
Integer, Integer, String, Integer, Integer, Integer, Integer, Integer, String, 
String, String>> { 

    private boolean isRuning = true; 

    @Override public void run(SourceContext<Tuple12<String, Integer, Integer, 
String, Integer, Integer, Integer, Integer, Integer, String, String, String>> 
sourceContext) throws Exception { 

       while (isRuning) { 

         
sourceContext.collect(Tuple12.of("xxx",21,51,"yyy",1,1,0,2,110,"zzz","aaa","bbb"));
 
         
sourceContext.collect(Tuple12.of("xxx",21,40,"yyy",2,2,1,2,110,"zzz","aaa","bbb"));
  
         Thread.sleep(Integer.MAX_VALUE); } } @Override public void cancel() { 
isRuning = false; 
       } 
  } 
}
{code}
Oddly,if you remove the "case when" statement in sql1 or  remove the  "where" 
statement in sql1 ,the program  will work well, the exception disappear. So I 
check the physical plan of the sql in the situation. why  rownumber() 's  
physical rel is StreamExecOverAggregate ? In nomal is StreamExecRank. actually, 
if you remove the "case when" statement in sql1 or  remove the  "where" 
statement in sql1,  rownumber() 's  physical rel came back to StreamExecRank.
{code:java}
SQL 0 Plan: 
== Abstract Syntax Tree ==
LogicalProject(biz_bill_no=[$0], task_type=[$1], task_mode=[$2], 
parent_task_no=[$3], total_stage_num=[$4], current_stage_index=[$5], 
use_pre_task_owner=[$6], poi_type=[$7], biz_origin_bill_type=[$8], 
sowing_task_no=[$9])
+- LogicalFilter(condition=[=($12, 1)])
   +- LogicalProject(biz_bill_no=[$0], task_type=[$1], task_mode=[$2], 
parent_task_no=[$3], total_stage_num=[$4], current_stage_index=[$5], 
use_pre_task_owner=[$6], poi_type=[$7], biz_origin_bill_type=[$8], 
sowing_task_no=[$9], dt=[$10], sowing_task_detail_id=[$11], rn=[ROW_NUMBER() 
OVER (PARTITION BY $10, $11 ORDER BY $1 DESC NULLS LAST)])
      +- LogicalTableScan(table=[[default_catalog, default_database, 
wosOutSowingTaskDetail]])

== Optimized Logical Plan ==
Calc(select=[biz_bill_no, task_type, task_mode, parent_task_no, 
total_stage_num, current_stage_index, use_pre_task_owner, poi_type, 
biz_origin_bill_type, sowing_task_no], changelogMode=[I,UA,D])
+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
rankRange=[rankStart=1, rankEnd=1], partitionBy=[dt, sowing_task_detail_id], 
orderBy=[task_type DESC], select=[biz_bill_no, task_type, task_mode, 
parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, 
poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id], 
changelogMode=[I,UA,D])
   +- Exchange(distribution=[hash[dt, sowing_task_detail_id]], 
changelogMode=[I])
      +- DataStreamScan(table=[[default_catalog, default_database, 
wosOutSowingTaskDetail]], fields=[biz_bill_no, task_type, task_mode, 
parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, 
poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id], 
changelogMode=[I])


== Physical Execution Plan ==
Stage 1 : Data Source
    content : Source: Custom Source
    Stage 7 : Operator
        content : 
SourceConversion(table=[default_catalog.default_database.wosOutSowingTaskDetail],
 fields=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, 
current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, 
sowing_task_no, dt, sowing_task_detail_id])
        ship_strategy : FORWARD
        Stage 9 : Operator
            content : Rank(strategy=[AppendFastStrategy], 
rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[dt, 
sowing_task_detail_id], orderBy=[task_type DESC], select=[biz_bill_no, 
task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, 
use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, 
sowing_task_detail_id])
            ship_strategy : HASH
            Stage 10 : Operator
                content : Calc(select=[biz_bill_no, task_type, task_mode, 
parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, 
poi_type, biz_origin_bill_type, sowing_task_no])
                ship_strategy : FORWARD
SQL 1 Plan: 
org.apache.flink.table.api.TableException: The window can only be ordered in 
ASCENDING mode. {code}


> row_number =1 and Subsequent SQL has "case when" and "where" statement : The 
> window can only be ordered in ASCENDING mode
> -------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-26051
>                 URL: https://issues.apache.org/jira/browse/FLINK-26051
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.12.2
>            Reporter: chuncheng wu
>            Priority: Major
>
> hello,
>    i have 2 sqls. One  sql (No. sql0) is "select xx from ( ROW_NUMBER 
> stament) where rn=1" and  the other one (No. sql1) is   
> "s{color:#505f79}elect ${fields} from result where 
> ${filter_conditions}{color}"  . The fields  has one "case when" field .The 
> two sql can work well seperately.but if they combine  it results the 
> exception as follow . It happen in the occasion when logical plan turn into 
> physical plan :
>  
> {code:java}
> org.apache.flink.table.api.TableException: The window can only be ordered in 
> ASCENDING mode.
>     at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:98)
>     at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:52)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>     at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregateBase.translateToPlan(StreamExecOverAggregateBase.scala:42)
>     at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
>     at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>     at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:103)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:42)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:630)
>     at 
> org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:582)
>     at 
> com.meituan.grocery.data.flink.test.BugTest.testRowNumber(BugTest.java:69)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
>     at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>     at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>     at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
>     at 
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60){code}
> In the stacktrace above  , rownumber() 's  physical rel which  is 
> StreamExecRank In nomal has became StreamExecOverAggregate . The 
> StreamExecOverAggregate rel has a  window= ROWS BETWEEN UNBOUNDED PRECEDING 
> AND CURRENT ROW which i never add .Oddly,if you remove the "case when" 
> statement in sql1 or  remove the  "where" statement in sql1 ,the program  
> will work well, the exception disappear. In the same time,  rownumber() 's  
> physical rel came back to StreamExecRank. Its confusing
>  
> example code : 
> {code:java}
> import org.apache.flink.api.java.tuple.Tuple12;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.datastream.DataStream; 
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
> import org.apache.flink.streaming.api.functions.source.SourceFunction; 
> import org.apache.flink.table.api.EnvironmentSettings; 
> import org.apache.flink.table.api.ExplainDetail; 
> import org.apache.flink.table.api.Table; 
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; 
> import org.apache.flink.types.Row; 
> import org.junit.jupiter.api.Test; 
> import java.sql.Timestamp; 
> import static org.apache.flink.table.api.Expressions.$; 
> public class BugTest { 
>   @Test public void testRowNumber() throws Exception { 
>     StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment(); 
>     EnvironmentSettings 
> mySetting=EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>     StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
> mySetting);
>     env.setParallelism(1); 
>  //source table ,name is wosOutSowingTaskDetail
>     DataStream<Tuple12<String, Integer, Integer, String, Integer, Integer, 
> Integer, Integer, Integer, String,   String, String>> oriStream = 
> env.addSource(new CustomSourceRowNumber()); 
>     Table testTable = tableEnv.fromDataStream(oriStream, $("biz_bill_no"), 
> $("task_type"), $("task_mode"),  $("parent_task_no"), $("total_stage_num"), 
> $("current_stage_index"), $("use_pre_task_owner"), $("poi_type"), 
> $("biz_origin_bill_type"), $("sowing_task_no"), $("dt"), 
> $("sowing_task_detail_id")); 
>     tableEnv.createTemporaryView("wosOutSowingTaskDetail", testTable); 
> //sql 0,select xx from ( ROW_NUMBER stament) where rn=1
>     Table wosOutSowingTaskDetailLatest = tableEnv.sqlQuery( "SELECT 
> `biz_bill_no`\n" 
>        + ",task_type\n" + ",task_mode\n" 
>        + ",parent_task_no\n"
>        + ",total_stage_num\n"
>        + ",current_stage_index\n"
>        + ",use_pre_task_owner\n"
>        + ",poi_type\n"
>        + ",biz_origin_bill_type\n"
>        + ",sowing_task_no\n"
>       + " FROM (\n" 
>             + " SELECT *,\n"  
>             + " ROW_NUMBER() OVER(PARTITION BY dt,sowing_task_detail_id ORDER 
> BY task_type desc) AS rn\n" 
>             + " FROM wosOutSowingTaskDetail\n" 
>             + " ) tmp\n"
>      + " WHERE rn = 1"); 
>     System.out.println("SQL 0 Plan: "); 
>     
> System.out.println(wosOutSowingTaskDetailLatest.explain(ExplainDetail.ESTIMATED_COST));
>  
>     
> System.out.println(wosOutSowingTaskDetailLatest.explain(ExplainDetail.CHANGELOG_MODE));
>  
>     tableEnv.createTemporaryView("wosOutSowingTaskDetailLatest", 
> wosOutSowingTaskDetailLatest); 
>  
> //sql 1:select from sql0'result table ,field has a CASE WHEN
>     Table resultTable = tableEnv.sqlQuery("SELECT\n" 
>                + "biz_bill_no\n" 
>                + ", CASE WHEN task_type = 21 AND task_mode = 51 THEN 
> parent_task_no\n" 
>                + " WHEN task_type = 21 AND task_mode = 40 AND total_stage_num 
> >= 2 AND current_stage_index >= 2 AND use_pre_task_owner = 1 THEN 
> parent_task_no\n" 
>                + " ELSE sowing_task_no END AS parent_task_no_cw\n" 
>                + ",parent_task_no, sowing_task_no, task_type, task_mode, 
> total_stage_num,    current_stage_index,use_pre_task_owner \n" 
>               + "FROM wosOutSowingTaskDetailLatest\n" 
>                + "WHERE task_type = 21\n" 
>                + "AND task_mode IN (51, 40)\n" 
>                + "AND poi_type = 2\n"); 
>     System.out.println("SQL 1 Plan: ");
>     System.out.println(resultTable.explain(ExplainDetail.ESTIMATED_COST));
>     System.out.println(resultTable.explain(ExplainDetail.CHANGELOG_MODE)); 
>     DataStream<Tuple2<Boolean, Row>> resultStream = 
> tableEnv.toRetractStream(resultTable, Row.class);
>     env.execute(); 
>   } 
> //user define source 
> static class CustomSourceRowNumber implements SourceFunction<Tuple12<String, 
> Integer, Integer, String, Integer, Integer, Integer, Integer, Integer, 
> String, String, String>> { 
>     private boolean isRuning = true; 
>     @Override public void run(SourceContext<Tuple12<String, Integer, Integer, 
> String, Integer, Integer, Integer, Integer, Integer, String, String, String>> 
> sourceContext) throws Exception { 
>        while (isRuning) { 
>          
> sourceContext.collect(Tuple12.of("xxx",21,51,"yyy",1,1,0,2,110,"zzz","aaa","bbb"));
>  
>          
> sourceContext.collect(Tuple12.of("xxx",21,40,"yyy",2,2,1,2,110,"zzz","aaa","bbb"));
>   
>          Thread.sleep(Integer.MAX_VALUE); } } @Override public void cancel() 
> { isRuning = false; 
>        } 
>   } 
> }
> {code}
>  
> {code:java}
> SQL 0 Plan: 
> == Abstract Syntax Tree ==
> LogicalProject(biz_bill_no=[$0], task_type=[$1], task_mode=[$2], 
> parent_task_no=[$3], total_stage_num=[$4], current_stage_index=[$5], 
> use_pre_task_owner=[$6], poi_type=[$7], biz_origin_bill_type=[$8], 
> sowing_task_no=[$9])
> +- LogicalFilter(condition=[=($12, 1)])
>    +- LogicalProject(biz_bill_no=[$0], task_type=[$1], task_mode=[$2], 
> parent_task_no=[$3], total_stage_num=[$4], current_stage_index=[$5], 
> use_pre_task_owner=[$6], poi_type=[$7], biz_origin_bill_type=[$8], 
> sowing_task_no=[$9], dt=[$10], sowing_task_detail_id=[$11], rn=[ROW_NUMBER() 
> OVER (PARTITION BY $10, $11 ORDER BY $1 DESC NULLS LAST)])
>       +- LogicalTableScan(table=[[default_catalog, default_database, 
> wosOutSowingTaskDetail]])
> == Optimized Logical Plan ==
> Calc(select=[biz_bill_no, task_type, task_mode, parent_task_no, 
> total_stage_num, current_stage_index, use_pre_task_owner, poi_type, 
> biz_origin_bill_type, sowing_task_no], changelogMode=[I,UA,D])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=1], partitionBy=[dt, sowing_task_detail_id], 
> orderBy=[task_type DESC], select=[biz_bill_no, task_type, task_mode, 
> parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, 
> poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id], 
> changelogMode=[I,UA,D])
>    +- Exchange(distribution=[hash[dt, sowing_task_detail_id]], 
> changelogMode=[I])
>       +- DataStreamScan(table=[[default_catalog, default_database, 
> wosOutSowingTaskDetail]], fields=[biz_bill_no, task_type, task_mode, 
> parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, 
> poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id], 
> changelogMode=[I])
> == Physical Execution Plan ==
> Stage 1 : Data Source
>     content : Source: Custom Source
>     Stage 7 : Operator
>         content : 
> SourceConversion(table=[default_catalog.default_database.wosOutSowingTaskDetail],
>  fields=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, 
> current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, 
> sowing_task_no, dt, sowing_task_detail_id])
>         ship_strategy : FORWARD
>         Stage 9 : Operator
>             content : Rank(strategy=[AppendFastStrategy], 
> rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[dt, 
> sowing_task_detail_id], orderBy=[task_type DESC], select=[biz_bill_no, 
> task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, 
> use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, 
> sowing_task_detail_id])
>             ship_strategy : HASH
>             Stage 10 : Operator
>                 content : Calc(select=[biz_bill_no, task_type, task_mode, 
> parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, 
> poi_type, biz_origin_bill_type, sowing_task_no])
>                 ship_strategy : FORWARD
> SQL 1 Plan: 
> org.apache.flink.table.api.TableException: The window can only be ordered in 
> ASCENDING mode. {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to