[ 
https://issues.apache.org/jira/browse/FLINK-26051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chuncheng wu updated FLINK-26051:
---------------------------------
    Description: 
hello,

   i have 2 sqls. One sql has rn=1 and  the Subsequent SQL has "case when" and 
"where".it results the exception as follow. It happen in the occasion when 
logical plan turn into physical plan :

 
{code:java}
org.apache.flink.table.api.TableException: The window can only be ordered in 
ASCENDING mode.
    at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:98)
    at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:52)
    at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
    at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregateBase.translateToPlan(StreamExecOverAggregateBase.scala:42)
    at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
    at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
    at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
    at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
    at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
    at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65)
    at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65)
    at 
org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:103)
    at 
org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:42)
    at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:630)
    at org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:582)
    at 
com.meituan.grocery.data.flink.test.BugTest.testRowNumber(BugTest.java:69)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
    at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at 
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
    at 
org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60){code}
 

 

example code : 
{code:java}
import org.apache.flink.api.java.tuple.Tuple12;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream; 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
import org.apache.flink.streaming.api.functions.source.SourceFunction; 
import org.apache.flink.table.api.EnvironmentSettings; 
import org.apache.flink.table.api.ExplainDetail; 
import org.apache.flink.table.api.Table; 
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; 
import org.apache.flink.types.Row; 
import org.junit.jupiter.api.Test; 
import java.sql.Timestamp; 
import static org.apache.flink.table.api.Expressions.$; 

public class BugTest { 
  @Test public void testRowNumber() throws Exception { 

    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(); 

    EnvironmentSettings 
mySetting=EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
mySetting);

    env.setParallelism(1); 

 //source table ,name is wosOutSowingTaskDetail
    DataStream<Tuple12<String, Integer, Integer, String, Integer, Integer, 
Integer, Integer, Integer, String,   String, String>> oriStream = 
env.addSource(new CustomSourceRowNumber()); 

    Table testTable = tableEnv.fromDataStream(oriStream, $("biz_bill_no"), 
$("task_type"), $("task_mode"),  $("parent_task_no"), $("total_stage_num"), 
$("current_stage_index"), $("use_pre_task_owner"), $("poi_type"), 
$("biz_origin_bill_type"), $("sowing_task_no"), $("dt"), 
$("sowing_task_detail_id")); 

    tableEnv.createTemporaryView("wosOutSowingTaskDetail", testTable); 


//sql 0
    Table wosOutSowingTaskDetailLatest = tableEnv.sqlQuery( "SELECT 
`biz_bill_no`\n" 
       + ",task_type\n" + ",task_mode\n" 
       + ",parent_task_no\n"
       + ",total_stage_num\n"
       + ",current_stage_index\n"
       + ",use_pre_task_owner\n"
       + ",poi_type\n"
       + ",biz_origin_bill_type\n"
       + ",sowing_task_no\n"
      + " FROM (\n" 
            + " SELECT *,\n"  
            + " ROW_NUMBER() OVER(PARTITION BY dt,sowing_task_detail_id ORDER 
BY task_type desc) AS rn\n" 
            + " FROM wosOutSowingTaskDetail\n" 
            + " ) tmp\n"
     + " WHERE rn = 1"); 

    System.out.println("SQL 0 Plan: "); 

    
System.out.println(wosOutSowingTaskDetailLatest.explain(ExplainDetail.ESTIMATED_COST));
 
    
System.out.println(wosOutSowingTaskDetailLatest.explain(ExplainDetail.CHANGELOG_MODE));
 

    tableEnv.createTemporaryView("wosOutSowingTaskDetailLatest", 
wosOutSowingTaskDetailLatest); 

 
//sql 1
   Table resultTable = tableEnv.sqlQuery("SELECT\n" 
               + "biz_bill_no\n" 
               + ", CASE WHEN task_type = 21 AND task_mode = 51 THEN 
parent_task_no\n" 
               + " WHEN task_type = 21 AND task_mode = 40 AND total_stage_num 
>= 2 AND current_stage_index >= 2 AND use_pre_task_owner = 1 THEN 
parent_task_no\n" 
               + " ELSE sowing_task_no END AS parent_task_no_cw\n" 
               + ",parent_task_no, sowing_task_no, task_type, task_mode, 
total_stage_num,        current_stage_index,use_pre_task_owner \n" 
              + "FROM wosOutSowingTaskDetailLatest\n" 
             + "WHERE task_type = 21\n" + "AND task_mode IN (51, 40)\n" + "AND 
poi_type = 2\n"); 
    System.out.println("SQL 1 Plan: ");

    System.out.println(resultTable.explain(ExplainDetail.ESTIMATED_COST));

    System.out.println(resultTable.explain(ExplainDetail.CHANGELOG_MODE)); 

    DataStream<Tuple2<Boolean, Row>> resultStream = 
tableEnv.toRetractStream(resultTable, Row.class);

    env.execute(); 
  } 
static class CustomSourceRowNumber implements SourceFunction<Tuple12<String, 
Integer, Integer, String, Integer, Integer, Integer, Integer, Integer, String, 
String, String>> { 

    private boolean isRuning = true; 

    @Override public void run(SourceContext<Tuple12<String, Integer, Integer, 
String, Integer, Integer, Integer, Integer, Integer, String, String, String>> 
sourceContext) throws Exception { 

       while (isRuning) { 

         
sourceContext.collect(Tuple12.of("xxx",21,51,"yyy",1,1,0,2,110,"zzz","aaa","bbb"));
 
         
sourceContext.collect(Tuple12.of("xxx",21,40,"yyy",2,2,1,2,110,"zzz","aaa","bbb"));
  
         Thread.sleep(Integer.MAX_VALUE); } } @Override public void cancel() { 
isRuning = false; 
       } 
  } 
}
{code}
 

 

the logical plan in  System.out.println is
{code:java}
SQL 0 Plan: 
== Abstract Syntax Tree ==
LogicalProject(biz_bill_no=[$0], task_type=[$1], task_mode=[$2], 
parent_task_no=[$3], total_stage_num=[$4], current_stage_index=[$5], 
use_pre_task_owner=[$6], poi_type=[$7], biz_origin_bill_type=[$8], 
sowing_task_no=[$9])
+- LogicalFilter(condition=[=($12, 1)])
   +- LogicalProject(biz_bill_no=[$0], task_type=[$1], task_mode=[$2], 
parent_task_no=[$3], total_stage_num=[$4], current_stage_index=[$5], 
use_pre_task_owner=[$6], poi_type=[$7], biz_origin_bill_type=[$8], 
sowing_task_no=[$9], dt=[$10], sowing_task_detail_id=[$11], rn=[ROW_NUMBER() 
OVER (PARTITION BY $10, $11 ORDER BY $1 DESC NULLS LAST)])
      +- LogicalTableScan(table=[[default_catalog, default_database, 
wosOutSowingTaskDetail]])

== Optimized Logical Plan ==
Calc(select=[biz_bill_no, task_type, task_mode, parent_task_no, 
total_stage_num, current_stage_index, use_pre_task_owner, poi_type, 
biz_origin_bill_type, sowing_task_no], changelogMode=[I,UA,D])
+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
rankRange=[rankStart=1, rankEnd=1], partitionBy=[dt, sowing_task_detail_id], 
orderBy=[task_type DESC], select=[biz_bill_no, task_type, task_mode, 
parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, 
poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id], 
changelogMode=[I,UA,D])
   +- Exchange(distribution=[hash[dt, sowing_task_detail_id]], 
changelogMode=[I])
      +- DataStreamScan(table=[[default_catalog, default_database, 
wosOutSowingTaskDetail]], fields=[biz_bill_no, task_type, task_mode, 
parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, 
poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id], 
changelogMode=[I])


== Physical Execution Plan ==
Stage 1 : Data Source
    content : Source: Custom Source
    Stage 7 : Operator
        content : 
SourceConversion(table=[default_catalog.default_database.wosOutSowingTaskDetail],
 fields=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, 
current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, 
sowing_task_no, dt, sowing_task_detail_id])
        ship_strategy : FORWARD
        Stage 9 : Operator
            content : Rank(strategy=[AppendFastStrategy], 
rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[dt, 
sowing_task_detail_id], orderBy=[task_type DESC], select=[biz_bill_no, 
task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, 
use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, 
sowing_task_detail_id])
            ship_strategy : HASH
            Stage 10 : Operator
                content : Calc(select=[biz_bill_no, task_type, task_mode, 
parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, 
poi_type, biz_origin_bill_type, sowing_task_no])
                ship_strategy : FORWARD
SQL 1 Plan: 
org.apache.flink.table.api.TableException: The window can only be ordered in 
ASCENDING mode. {code}

  was:
hello,

   i have 2 sqls. One sql has rn=1 and  the Subsequent SQL has "case when" and 
"where".it results the exception as follow. It happen in the occasion when 
logical plan turn into physical plan :

 
{code:java}
org.apache.flink.table.api.TableException: The window can only be ordered in 
ASCENDING mode.
    at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:98)
    at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:52)
    at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
    at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregateBase.translateToPlan(StreamExecOverAggregateBase.scala:42)
    at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
    at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
    at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
    at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
    at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
    at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65)
    at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65)
    at 
org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:103)
    at 
org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:42)
    at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:630)
    at org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:582)
    at 
com.meituan.grocery.data.flink.test.BugTest.testRowNumber(BugTest.java:69)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
    at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at 
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
    at 
org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60){code}
 

 

example code : 
{code:java}
import org.apache.flink.api.java.tuple.Tuple12;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream; 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
import org.apache.flink.streaming.api.functions.source.SourceFunction; 
import org.apache.flink.table.api.EnvironmentSettings; 
import org.apache.flink.table.api.ExplainDetail; 
import org.apache.flink.table.api.Table; 
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; 
import org.apache.flink.types.Row; 
import org.junit.jupiter.api.Test; 
import java.sql.Timestamp; 
import static org.apache.flink.table.api.Expressions.$; 
public class BugTest { 
@Test public void testRowNumber() throws Exception { 
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(); 
EnvironmentSettings 
mySetting=EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, mySetting);
env.setParallelism(1); 
DataStream<Tuple12<String, Integer, Integer, String, Integer, Integer, Integer, 
Integer, Integer, String, String, String>> oriStream = env.addSource(new 
CustomSourceRowNumber()); 
Table testTable = tableEnv.fromDataStream(oriStream, $("biz_bill_no"), 
$("task_type"), $("task_mode"), $("parent_task_no"), $("total_stage_num"), 
$("current_stage_index"), $("use_pre_task_owner"), $("poi_type"), 
$("biz_origin_bill_type"), $("sowing_task_no"), $("dt"), 
$("sowing_task_detail_id")); 
tableEnv.createTemporaryView("wosOutSowingTaskDetail", testTable); 
//sql 0
Table wosOutSowingTaskDetailLatest = tableEnv.sqlQuery( "SELECT 
`biz_bill_no`\n" 
       + ",task_type\n" + ",task_mode\n" 
       + ",parent_task_no\n"
       + ",total_stage_num\n"
       + ",current_stage_index\n"
       + ",use_pre_task_owner\n"
       + ",poi_type\n"
       + ",biz_origin_bill_type\n"
       + ",sowing_task_no\n"
      + " FROM (\n" 
            + " SELECT *,\n"  
            + " ROW_NUMBER() OVER(PARTITION BY dt,sowing_task_detail_id ORDER 
BY task_type desc) AS rn\n" 
            + " FROM wosOutSowingTaskDetail\n" 
            + " ) tmp\n"
     + " WHERE rn = 1"); 
System.out.println("SQL 0 Plan: "); 
System.out.println(wosOutSowingTaskDetailLatest.explain(ExplainDetail.ESTIMATED_COST));
 
System.out.println(wosOutSowingTaskDetailLatest.explain(ExplainDetail.CHANGELOG_MODE));
 
tableEnv.createTemporaryView("wosOutSowingTaskDetailLatest", 
wosOutSowingTaskDetailLatest); 
DataStream<Tuple2<Boolean, Row>> retractStream = 
tableEnv.toRetractStream(wosOutSowingTaskDetailLatest, Row.class); 
//sql 1
Table resultTable = tableEnv.sqlQuery("SELECT\n" 
               + "biz_bill_no\n" 
               + ", CASE WHEN task_type = 21 AND task_mode = 51 THEN 
parent_task_no\n" 
               + " WHEN task_type = 21 AND task_mode = 40 AND total_stage_num 
>= 2 AND current_stage_index >= 2 AND use_pre_task_owner = 1 THEN 
parent_task_no\n" 
               + " ELSE sowing_task_no END AS parent_task_no_cw\n" 
               + ",parent_task_no, sowing_task_no, task_type, task_mode, 
total_stage_num,        current_stage_index,use_pre_task_owner \n" 
              + "FROM wosOutSowingTaskDetailLatest\n" 
             + "WHERE task_type = 21\n" + "AND task_mode IN (51, 40)\n" + "AND 
poi_type = 2\n"); 
System.out.println("SQL 1 Plan: "); 
System.out.println(resultTable.explain(ExplainDetail.ESTIMATED_COST));
System.out.println(resultTable.explain(ExplainDetail.CHANGELOG_MODE)); 
DataStream<Tuple2<Boolean, Row>> resultStream = 
tableEnv.toRetractStream(resultTable, Row.class);
env.execute(); 
} 
static class CustomSourceRowNumber implements SourceFunction<Tuple12<String, 
Integer, Integer, String, Integer, Integer, Integer, Integer, Integer, String, 
String, String>> { 
private boolean isRuning = true; 
@Override public void run(SourceContext<Tuple12<String, Integer, Integer, 
String, Integer, Integer, Integer, Integer, Integer, String, String, String>> 
sourceContext) throws Exception { 
while (isRuning) { 
sourceContext.collect(Tuple12.of("xxx",21,51,"yyy",1,1,0,2,110,"zzz","aaa","bbb"));
 
sourceContext.collect(Tuple12.of("xxx",21,40,"yyy",2,2,1,2,110,"zzz","aaa","bbb"));
 Thread.sleep(Integer.MAX_VALUE); } } @Override public void cancel() { isRuning 
= false; 
} 
} 
}
{code}
 

 

the logical plan in  System.out.println is
{code:java}
SQL 0 Plan: 
== Abstract Syntax Tree ==
LogicalProject(biz_bill_no=[$0], task_type=[$1], task_mode=[$2], 
parent_task_no=[$3], total_stage_num=[$4], current_stage_index=[$5], 
use_pre_task_owner=[$6], poi_type=[$7], biz_origin_bill_type=[$8], 
sowing_task_no=[$9])
+- LogicalFilter(condition=[=($12, 1)])
   +- LogicalProject(biz_bill_no=[$0], task_type=[$1], task_mode=[$2], 
parent_task_no=[$3], total_stage_num=[$4], current_stage_index=[$5], 
use_pre_task_owner=[$6], poi_type=[$7], biz_origin_bill_type=[$8], 
sowing_task_no=[$9], dt=[$10], sowing_task_detail_id=[$11], rn=[ROW_NUMBER() 
OVER (PARTITION BY $10, $11 ORDER BY $1 DESC NULLS LAST)])
      +- LogicalTableScan(table=[[default_catalog, default_database, 
wosOutSowingTaskDetail]])

== Optimized Logical Plan ==
Calc(select=[biz_bill_no, task_type, task_mode, parent_task_no, 
total_stage_num, current_stage_index, use_pre_task_owner, poi_type, 
biz_origin_bill_type, sowing_task_no], changelogMode=[I,UA,D])
+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
rankRange=[rankStart=1, rankEnd=1], partitionBy=[dt, sowing_task_detail_id], 
orderBy=[task_type DESC], select=[biz_bill_no, task_type, task_mode, 
parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, 
poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id], 
changelogMode=[I,UA,D])
   +- Exchange(distribution=[hash[dt, sowing_task_detail_id]], 
changelogMode=[I])
      +- DataStreamScan(table=[[default_catalog, default_database, 
wosOutSowingTaskDetail]], fields=[biz_bill_no, task_type, task_mode, 
parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, 
poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id], 
changelogMode=[I])


== Physical Execution Plan ==
Stage 1 : Data Source
    content : Source: Custom Source
    Stage 7 : Operator
        content : 
SourceConversion(table=[default_catalog.default_database.wosOutSowingTaskDetail],
 fields=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, 
current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, 
sowing_task_no, dt, sowing_task_detail_id])
        ship_strategy : FORWARD
        Stage 9 : Operator
            content : Rank(strategy=[AppendFastStrategy], 
rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[dt, 
sowing_task_detail_id], orderBy=[task_type DESC], select=[biz_bill_no, 
task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, 
use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, 
sowing_task_detail_id])
            ship_strategy : HASH
            Stage 10 : Operator
                content : Calc(select=[biz_bill_no, task_type, task_mode, 
parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, 
poi_type, biz_origin_bill_type, sowing_task_no])
                ship_strategy : FORWARD
SQL 1 Plan: 
org.apache.flink.table.api.TableException: The window can only be ordered in 
ASCENDING mode. {code}


> row_number =1 and Subsequent SQL has "case when" and "where" statement : The 
> window can only be ordered in ASCENDING mode
> -------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-26051
>                 URL: https://issues.apache.org/jira/browse/FLINK-26051
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.12.2
>            Reporter: chuncheng wu
>            Priority: Major
>
> hello,
>    i have 2 sqls. One sql has rn=1 and  the Subsequent SQL has "case when" 
> and "where".it results the exception as follow. It happen in the occasion 
> when logical plan turn into physical plan :
>  
> {code:java}
> org.apache.flink.table.api.TableException: The window can only be ordered in 
> ASCENDING mode.
>     at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:98)
>     at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:52)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>     at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregateBase.translateToPlan(StreamExecOverAggregateBase.scala:42)
>     at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
>     at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>     at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:103)
>     at 
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:42)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:630)
>     at 
> org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:582)
>     at 
> com.meituan.grocery.data.flink.test.BugTest.testRowNumber(BugTest.java:69)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
>     at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>     at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>     at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
>     at 
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60){code}
>  
>  
> example code : 
> {code:java}
> import org.apache.flink.api.java.tuple.Tuple12;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.datastream.DataStream; 
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
> import org.apache.flink.streaming.api.functions.source.SourceFunction; 
> import org.apache.flink.table.api.EnvironmentSettings; 
> import org.apache.flink.table.api.ExplainDetail; 
> import org.apache.flink.table.api.Table; 
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; 
> import org.apache.flink.types.Row; 
> import org.junit.jupiter.api.Test; 
> import java.sql.Timestamp; 
> import static org.apache.flink.table.api.Expressions.$; 
> public class BugTest { 
>   @Test public void testRowNumber() throws Exception { 
>     StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment(); 
>     EnvironmentSettings 
> mySetting=EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>     StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
> mySetting);
>     env.setParallelism(1); 
>  //source table ,name is wosOutSowingTaskDetail
>     DataStream<Tuple12<String, Integer, Integer, String, Integer, Integer, 
> Integer, Integer, Integer, String,   String, String>> oriStream = 
> env.addSource(new CustomSourceRowNumber()); 
>     Table testTable = tableEnv.fromDataStream(oriStream, $("biz_bill_no"), 
> $("task_type"), $("task_mode"),  $("parent_task_no"), $("total_stage_num"), 
> $("current_stage_index"), $("use_pre_task_owner"), $("poi_type"), 
> $("biz_origin_bill_type"), $("sowing_task_no"), $("dt"), 
> $("sowing_task_detail_id")); 
>     tableEnv.createTemporaryView("wosOutSowingTaskDetail", testTable); 
> //sql 0
>     Table wosOutSowingTaskDetailLatest = tableEnv.sqlQuery( "SELECT 
> `biz_bill_no`\n" 
>        + ",task_type\n" + ",task_mode\n" 
>        + ",parent_task_no\n"
>        + ",total_stage_num\n"
>        + ",current_stage_index\n"
>        + ",use_pre_task_owner\n"
>        + ",poi_type\n"
>        + ",biz_origin_bill_type\n"
>        + ",sowing_task_no\n"
>       + " FROM (\n" 
>             + " SELECT *,\n"  
>             + " ROW_NUMBER() OVER(PARTITION BY dt,sowing_task_detail_id ORDER 
> BY task_type desc) AS rn\n" 
>             + " FROM wosOutSowingTaskDetail\n" 
>             + " ) tmp\n"
>      + " WHERE rn = 1"); 
>     System.out.println("SQL 0 Plan: "); 
>     
> System.out.println(wosOutSowingTaskDetailLatest.explain(ExplainDetail.ESTIMATED_COST));
>  
>     
> System.out.println(wosOutSowingTaskDetailLatest.explain(ExplainDetail.CHANGELOG_MODE));
>  
>     tableEnv.createTemporaryView("wosOutSowingTaskDetailLatest", 
> wosOutSowingTaskDetailLatest); 
>  
> //sql 1
>    Table resultTable = tableEnv.sqlQuery("SELECT\n" 
>                + "biz_bill_no\n" 
>                + ", CASE WHEN task_type = 21 AND task_mode = 51 THEN 
> parent_task_no\n" 
>                + " WHEN task_type = 21 AND task_mode = 40 AND total_stage_num 
> >= 2 AND current_stage_index >= 2 AND use_pre_task_owner = 1 THEN 
> parent_task_no\n" 
>                + " ELSE sowing_task_no END AS parent_task_no_cw\n" 
>                + ",parent_task_no, sowing_task_no, task_type, task_mode, 
> total_stage_num,        current_stage_index,use_pre_task_owner \n" 
>               + "FROM wosOutSowingTaskDetailLatest\n" 
>              + "WHERE task_type = 21\n" + "AND task_mode IN (51, 40)\n" + 
> "AND poi_type = 2\n"); 
>     System.out.println("SQL 1 Plan: ");
>     System.out.println(resultTable.explain(ExplainDetail.ESTIMATED_COST));
>     System.out.println(resultTable.explain(ExplainDetail.CHANGELOG_MODE)); 
>     DataStream<Tuple2<Boolean, Row>> resultStream = 
> tableEnv.toRetractStream(resultTable, Row.class);
>     env.execute(); 
>   } 
> static class CustomSourceRowNumber implements SourceFunction<Tuple12<String, 
> Integer, Integer, String, Integer, Integer, Integer, Integer, Integer, 
> String, String, String>> { 
>     private boolean isRuning = true; 
>     @Override public void run(SourceContext<Tuple12<String, Integer, Integer, 
> String, Integer, Integer, Integer, Integer, Integer, String, String, String>> 
> sourceContext) throws Exception { 
>        while (isRuning) { 
>          
> sourceContext.collect(Tuple12.of("xxx",21,51,"yyy",1,1,0,2,110,"zzz","aaa","bbb"));
>  
>          
> sourceContext.collect(Tuple12.of("xxx",21,40,"yyy",2,2,1,2,110,"zzz","aaa","bbb"));
>   
>          Thread.sleep(Integer.MAX_VALUE); } } @Override public void cancel() 
> { isRuning = false; 
>        } 
>   } 
> }
> {code}
>  
>  
> the logical plan in  System.out.println is
> {code:java}
> SQL 0 Plan: 
> == Abstract Syntax Tree ==
> LogicalProject(biz_bill_no=[$0], task_type=[$1], task_mode=[$2], 
> parent_task_no=[$3], total_stage_num=[$4], current_stage_index=[$5], 
> use_pre_task_owner=[$6], poi_type=[$7], biz_origin_bill_type=[$8], 
> sowing_task_no=[$9])
> +- LogicalFilter(condition=[=($12, 1)])
>    +- LogicalProject(biz_bill_no=[$0], task_type=[$1], task_mode=[$2], 
> parent_task_no=[$3], total_stage_num=[$4], current_stage_index=[$5], 
> use_pre_task_owner=[$6], poi_type=[$7], biz_origin_bill_type=[$8], 
> sowing_task_no=[$9], dt=[$10], sowing_task_detail_id=[$11], rn=[ROW_NUMBER() 
> OVER (PARTITION BY $10, $11 ORDER BY $1 DESC NULLS LAST)])
>       +- LogicalTableScan(table=[[default_catalog, default_database, 
> wosOutSowingTaskDetail]])
> == Optimized Logical Plan ==
> Calc(select=[biz_bill_no, task_type, task_mode, parent_task_no, 
> total_stage_num, current_stage_index, use_pre_task_owner, poi_type, 
> biz_origin_bill_type, sowing_task_no], changelogMode=[I,UA,D])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=1], partitionBy=[dt, sowing_task_detail_id], 
> orderBy=[task_type DESC], select=[biz_bill_no, task_type, task_mode, 
> parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, 
> poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id], 
> changelogMode=[I,UA,D])
>    +- Exchange(distribution=[hash[dt, sowing_task_detail_id]], 
> changelogMode=[I])
>       +- DataStreamScan(table=[[default_catalog, default_database, 
> wosOutSowingTaskDetail]], fields=[biz_bill_no, task_type, task_mode, 
> parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, 
> poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id], 
> changelogMode=[I])
> == Physical Execution Plan ==
> Stage 1 : Data Source
>     content : Source: Custom Source
>     Stage 7 : Operator
>         content : 
> SourceConversion(table=[default_catalog.default_database.wosOutSowingTaskDetail],
>  fields=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, 
> current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, 
> sowing_task_no, dt, sowing_task_detail_id])
>         ship_strategy : FORWARD
>         Stage 9 : Operator
>             content : Rank(strategy=[AppendFastStrategy], 
> rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[dt, 
> sowing_task_detail_id], orderBy=[task_type DESC], select=[biz_bill_no, 
> task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, 
> use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, 
> sowing_task_detail_id])
>             ship_strategy : HASH
>             Stage 10 : Operator
>                 content : Calc(select=[biz_bill_no, task_type, task_mode, 
> parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, 
> poi_type, biz_origin_bill_type, sowing_task_no])
>                 ship_strategy : FORWARD
> SQL 1 Plan: 
> org.apache.flink.table.api.TableException: The window can only be ordered in 
> ASCENDING mode. {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to