[ 
https://issues.apache.org/jira/browse/FLINK-26051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17640037#comment-17640037
 ] 

Krzysztof Chmielewski commented on FLINK-26051:
-----------------------------------------------

Hi :) [~qingyue]
Do you have any update on this one? :)

> one sql has row_number =1 and the subsequent SQL has "case when" and "where" 
> statement result Exception : The window can only be ordered in ASCENDING mode
> ----------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-26051
>                 URL: https://issues.apache.org/jira/browse/FLINK-26051
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.12.2, 1.14.4
>            Reporter: chuncheng wu
>            Assignee: Jane Chan
>            Priority: Major
>         Attachments: image-2022-02-10-20-13-14-424.png, 
> image-2022-02-11-11-18-20-594.png, image-2022-06-17-21-28-54-886.png
>
>
> hello,
>    i have 2 sqls. One  sql (sql0) is "select xx from ( ROW_NUMBER statment) 
> where rn=1" and  the other one (sql1) is   "s{color:#505f79}elect ${fields} 
> from result where ${filter_conditions}{color}"  . The fields quoted in sql1 
> has one "case when" field .The two sql can work well seperately.but if they 
> combine  it results the exception as follow . It happen in the occasion when 
> logical plan turn into physical plan :
>  
> {code:java}
> org.apache.flink.table.api.TableException: The window can only be ordered in 
> ASCENDING mode.
>     at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:98)
>     at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:52)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>     at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregateBase.translateToPlan(StreamExecOverAggregateBase.scala:42)
>     at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
>     at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>     at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:103)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:42)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:630)
>     at 
> org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:582)
>     at 
> com.meituan.grocery.data.flink.test.BugTest.testRowNumber(BugTest.java:69)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
>     at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>     at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>     at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
>     at 
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60){code}
> In the stacktrace above  , rownumber() 's  physical rel which  is 
> StreamExecRank In nomal change to StreamExecOverAggregate . The 
> StreamExecOverAggregate rel has a  window= ROWS BETWEEN UNBOUNDED PRECEDING 
> AND CURRENT ROW which i never add .Oddly,if you remove the "case when" field 
> or the  "where" statement in sql1 ,the program  will work well, the exception 
> disappear. In the same time,  rownumber() 's  physical rel change back to 
> StreamExecRank. Its confusing me a lot
>  
> example code : 
> {code:java}
>  @Test
>  public void testRowNumber() throws Exception {
>         StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>         EnvironmentSettings mySetting = EnvironmentSettings
>                 .newInstance()
>                 .useBlinkPlanner()
>                 .inStreamingMode()
>                 .build();
>         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>         env.setParallelism(1);
>         Configuration configuration = tableEnv.getConfig().getConfiguration();
>         DataStream<Tuple12<String, Integer, Integer, String, Integer, 
> Integer, Integer, Integer,
>                 Integer, String, String, String>> oriStream = 
> env.addSource(new CustomSourceRowNumber());
>         Table testTable = tableEnv.fromDataStream(
>                 oriStream,
>                 $("biz_bill_no"),
>                 $("task_type"),
>                 $("task_mode"),
>                 $("parent_task_no"),
>                 $("total_stage_num"),
>                 $("current_stage_index"),
>                 $("use_pre_task_owner"),
>                 $("poi_type"),
>                 $("biz_origin_bill_type"),
>                 $("sowing_task_no"),
>                 $("dt"),
>                 $("sowing_task_detail_id"));
>         tableEnv.createTemporaryView("wosOutSowingTaskDetail", testTable);  
>         //sql0 ,select xx from ( ROW_NUMBER stament) where rn=1
>         Table wosOutSowingTaskDetailLatest = tableEnv.sqlQuery(
>                 "SELECT `biz_bill_no`\n" +
>                         ",task_type\n" +
>                         ",task_mode\n" +
>                         ",parent_task_no\n" +
>                         ",total_stage_num\n" +
>                         ",current_stage_index\n" +
>                         ",use_pre_task_owner\n" +
>                         ",poi_type\n" +
>                         ",biz_origin_bill_type\n" +
>                         ",sowing_task_no\n" +
>                         " FROM (\n" +
>                         "    SELECT *,\n" +
>                         "        ROW_NUMBER() OVER(PARTITION BY 
> dt,sowing_task_detail_id ORDER BY task_type desc) AS rn\n"
>                         +
>                         "     FROM wosOutSowingTaskDetail\n" +
>                         "    ) tmp\n" +
>                         " WHERE rn = 1");
>         tableEnv.createTemporaryView("wosOutSowingTaskDetailLatest", 
> wosOutSowingTaskDetailLatest);  
>          //sql , select ${fields} from result where ${filter_conditions}
>          //oddly,if we remove the "CASE WHEN" field  ```"CASE WHEN task_mode 
> = 51 THEN parent_task_no\n" 
>          //+     " WHEN task_mode = 40 AND total_stage_num >= 2 AND 
> current_stage_index >= 2 AND 
>          //use_pre_task_owner = 1 THEN parent_task_no\n" 
>          //+     " ELSE sowing_task_no END AS parent_task_no_cw\n"```,the 
> program will work well without it.         
>         Table resultTable = tableEnv.sqlQuery("SELECT\n" +
>                 "biz_bill_no\n" +
>                 ", CASE WHEN task_mode = 51 THEN parent_task_no\n" +
>                 "     WHEN task_mode = 40 AND total_stage_num >= 2 AND 
> current_stage_index >= 2 AND use_pre_task_owner = 1 THEN parent_task_no\n" +
>                 "     ELSE sowing_task_no END AS parent_task_no_cw\n" +
>                 ",parent_task_no"
>                 + ",sowing_task_no, "
>                 + "task_type, task_mode, "
>                 + "total_stage_num, "
>                 + "current_stage_index,"
>                 + "use_pre_task_owner \n" +
>                 "FROM wosOutSowingTaskDetailLatest\n" +
>                 "WHERE task_type = 21\n" +
>                 "AND task_mode IN (51, 40)\n" +
>                 "AND poi_type = 2\n" +
>                 "AND biz_origin_bill_type not in (111,112,113,114)");    
>           
>                System.out.println(resultTable.explain());
>     }   
>  class CustomSourceRowNumber implements SourceFunction<Tuple12<String, 
> Integer, Integer, String, Integer, Integer,
>             Integer, Integer,
>             Integer, String, String, String>> {
>         private boolean isRuning = true;        @Override
>         public void run(
>                 SourceContext<Tuple12<String, Integer, Integer, String, 
> Integer, Integer, Integer, Integer,
>                         Integer, String, String, String>> sourceContext) 
> throws Exception {
>             while (isRuning) {                
> sourceContext.collect(Tuple12.of("xxx",
>                         21,
>                         51,
>                         "yyy",
>                         1,
>                         1,
>                         0,
>                         2,
>                         110,
>                         "zzz",
>                         "aaa",
>                         "bbb"));
>                 sourceContext.collect(Tuple12.of("xxx",
>                         21,
>                         40,
>                         "yyy",
>                         2,
>                         2,
>                         1,
>                         2,
>                         110,
>                         "zzz",
>                         "aaa",
>                         "bbb"));
>                 Thread.sleep(Integer.MAX_VALUE);
>             }
>         }        @Override
>         public void cancel() {
>             isRuning = false;
>         }
>     } {code}
> System.out.println
> {code:java}
> SQL 0 Plan: 
> == Abstract Syntax Tree ==
> LogicalProject(biz_bill_no=[$0], task_type=[$1], task_mode=[$2], 
> parent_task_no=[$3], total_stage_num=[$4], current_stage_index=[$5], 
> use_pre_task_owner=[$6], poi_type=[$7], biz_origin_bill_type=[$8], 
> sowing_task_no=[$9])
> +- LogicalFilter(condition=[=($12, 1)])
>    +- LogicalProject(biz_bill_no=[$0], task_type=[$1], task_mode=[$2], 
> parent_task_no=[$3], total_stage_num=[$4], current_stage_index=[$5], 
> use_pre_task_owner=[$6], poi_type=[$7], biz_origin_bill_type=[$8], 
> sowing_task_no=[$9], dt=[$10], sowing_task_detail_id=[$11], rn=[ROW_NUMBER() 
> OVER (PARTITION BY $10, $11 ORDER BY $1 DESC NULLS LAST)])
>       +- LogicalTableScan(table=[[default_catalog, default_database, 
> wosOutSowingTaskDetail]])
> == Optimized Logical Plan ==
> Calc(select=[biz_bill_no, task_type, task_mode, parent_task_no, 
> total_stage_num, current_stage_index, use_pre_task_owner, poi_type, 
> biz_origin_bill_type, sowing_task_no], changelogMode=[I,UA,D])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=1], partitionBy=[dt, sowing_task_detail_id], 
> orderBy=[task_type DESC], select=[biz_bill_no, task_type, task_mode, 
> parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, 
> poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id], 
> changelogMode=[I,UA,D])
>    +- Exchange(distribution=[hash[dt, sowing_task_detail_id]], 
> changelogMode=[I])
>       +- DataStreamScan(table=[[default_catalog, default_database, 
> wosOutSowingTaskDetail]], fields=[biz_bill_no, task_type, task_mode, 
> parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, 
> poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id], 
> changelogMode=[I])
> == Physical Execution Plan ==
> Stage 1 : Data Source
>     content : Source: Custom Source   
> Stage 7 : Operator
>         content : 
> SourceConversion(table=[default_catalog.default_database.wosOutSowingTaskDetail],
>  fields=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, 
> current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, 
> sowing_task_no, dt, sowing_task_detail_id])
>         ship_strategy : FORWARD        
> Stage 9 : Operator
>             content : Rank(strategy=[AppendFastStrategy], 
> rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[dt, 
> sowing_task_detail_id], orderBy=[task_type DESC], select=[biz_bill_no, 
> task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, 
> use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, 
> sowing_task_detail_id])
>             ship_strategy : HASH            
> Stage 10 : Operator
>                 content : Calc(select=[biz_bill_no, task_type, task_mode, 
> parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, 
> poi_type, biz_origin_bill_type, sowing_task_no])
>                 ship_strategy : FORWARD
> SQL 1 Plan: 
> org.apache.flink.table.api.TableException: The window can only be ordered in 
> ASCENDING mode.{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to