[ https://issues.apache.org/jira/browse/FLINK-26051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
zhangbinzaifendou updated FLINK-26051:
--------------------------------------
Attachment: image-2022-02-10-20-13-14-424.png
> one SQL with ROW_NUMBER() rn = 1, combined with a subsequent SQL containing "case when" and "where",
> results in the exception: The window can only be ordered in ASCENDING mode
> ----------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-26051
> URL: https://issues.apache.org/jira/browse/FLINK-26051
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.12.2
> Reporter: chuncheng wu
> Priority: Major
> Attachments: image-2022-02-10-20-13-14-424.png
>
>
> Hello,
> I have two SQL statements. One (sql0) is "select xx from (ROW_NUMBER statement)
> where rn = 1" and the other (sql1) is "select ${fields} from result where
> ${filter_conditions}". The fields selected in sql1 include one "case when"
> field. The two statements work well separately, but when they are combined
> the exception below is thrown. It happens when the logical plan is translated
> into the physical plan:
>
> {code:java}
> org.apache.flink.table.api.TableException: The window can only be ordered in ASCENDING mode.
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:98)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:52)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregateBase.translateToPlan(StreamExecOverAggregateBase.scala:42)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>     at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
>     at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65)
>     at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:103)
>     at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:42)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:630)
>     at org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:582)
>     at com.meituan.grocery.data.flink.test.BugTest.testRowNumber(BugTest.java:69)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>     at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
>     at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60){code}
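> For reference, inlining sql1 on top of sql0 gives roughly the single statement
> below. This is my sketch using the table and column names from the example
> code further down, not code taken from the report:
> {code:java}
> // sql0 and sql1 composed into one query; the temporary view in between is
> // purely logical, so the planner optimizes something of this shape as a whole.
> Table combined = tableEnv.sqlQuery(
>         "SELECT biz_bill_no,\n"
>       + "  CASE WHEN task_type = 21 THEN parent_task_no ELSE null END AS parent_task_no_cw\n"
>       + "FROM (\n"
>       + "  SELECT biz_bill_no, task_type, task_mode, parent_task_no FROM (\n"
>       + "    SELECT *, ROW_NUMBER() OVER (PARTITION BY sowing_task_detail_id ORDER BY task_type DESC) AS rn\n"
>       + "    FROM wosOutSowingTaskDetail\n"
>       + "  ) tmp WHERE rn = 1\n"
>       + ") latest\n"
>       + "WHERE task_type = 21 AND task_mode IN (51, 40)");
> {code}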
> In the stack trace above, row_number()'s physical rel, which is normally
> StreamExecRank, changed to StreamExecOverAggregate. The StreamExecOverAggregate
> rel has a window = ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW which I
> never added. Oddly, if you remove the "case when" field or the "where"
> statement in sql1, the program works well and the exception disappears. At the
> same time, row_number()'s physical rel changes back to StreamExecRank. It is
> confusing me a lot.
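> A quick probe (my sketch, not from the report) makes the fallback visible:
> since the exception is thrown while explain() translates the plan, catching it
> distinguishes the two physical rels. It assumes the resultTable from the
> example code below.
> {code:java}
> // Sketch: explain() triggers plan translation, so the failing case can be
> // detected by catching the TableException it raises.
> try {
>     System.out.println(resultTable.explain(ExplainDetail.ESTIMATED_COST));
>     System.out.println("planner kept StreamExecRank for ROW_NUMBER()");
> } catch (org.apache.flink.table.api.TableException e) {
>     // "The window can only be ordered in ASCENDING mode." indicates the
>     // ROW_NUMBER() call was planned as StreamExecOverAggregate instead.
>     System.out.println("fell back to over-aggregate: " + e.getMessage());
> }
> {code}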
>
> Example code:
> {code:java}
> import org.apache.flink.api.java.tuple.Tuple12;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.ExplainDetail;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.types.Row;
> import org.junit.jupiter.api.Test;
>
> import static org.apache.flink.table.api.Expressions.$;
>
> public class BugTest {
>
>     @Test
>     public void testRowNumber() throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         EnvironmentSettings mySetting = EnvironmentSettings.newInstance()
>                 .useBlinkPlanner()
>                 .inStreamingMode()
>                 .build();
>         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, mySetting);
>         env.setParallelism(1);
>
>         // source table, name is wosOutSowingTaskDetail
>         DataStream<Tuple12<String, Integer, Integer, String, Integer, Integer, Integer, Integer, Integer, String, String, String>> oriStream =
>                 env.addSource(new CustomSourceRowNumber());
>         Table testTable = tableEnv.fromDataStream(oriStream,
>                 $("biz_bill_no"), $("task_type"), $("task_mode"), $("parent_task_no"),
>                 $("total_stage_num"), $("current_stage_index"), $("use_pre_task_owner"),
>                 $("poi_type"), $("biz_origin_bill_type"), $("sowing_task_no"),
>                 $("dt"), $("sowing_task_detail_id"));
>         tableEnv.createTemporaryView("wosOutSowingTaskDetail", testTable);
>
>         // sql0: select xx from (ROW_NUMBER statement) where rn = 1
>         Table wosOutSowingTaskDetailLatest = tableEnv.sqlQuery("SELECT `biz_bill_no`\n"
>                 + ",task_type\n"
>                 + ",task_mode\n"
>                 + ",parent_task_no\n"
>                 + " FROM (\n"
>                 + "   SELECT *,\n"
>                 + "   ROW_NUMBER() OVER(PARTITION BY sowing_task_detail_id ORDER BY task_type DESC) AS rn\n"
>                 + "   FROM wosOutSowingTaskDetail\n"
>                 + " ) tmp\n"
>                 + " WHERE rn = 1");
>         System.out.println("SQL 0 Plan: ");
>         System.out.println(wosOutSowingTaskDetailLatest.explain(ExplainDetail.ESTIMATED_COST));
>         System.out.println(wosOutSowingTaskDetailLatest.explain(ExplainDetail.CHANGELOG_MODE));
>         tableEnv.createTemporaryView("wosOutSowingTaskDetailLatest", wosOutSowingTaskDetailLatest);
>
>         // sql1: select from sql0's result table; the field list has a CASE WHEN field
>         Table resultTable = tableEnv.sqlQuery("SELECT\n"
>                 + "biz_bill_no,\n"
>                 + "CASE WHEN task_type = 21 THEN parent_task_no\n"
>                 + "  ELSE null END AS parent_task_no_cw\n"
>                 + "FROM wosOutSowingTaskDetailLatest\n"
>                 + "WHERE task_type = 21\n"
>                 + "AND task_mode IN (51, 40)\n");
>         // the same sql without the CASE WHEN field works well:
>         // Table resultTable = tableEnv.sqlQuery("SELECT\n"
>         //         + "biz_bill_no\n"
>         //         + "FROM wosOutSowingTaskDetailLatest\n"
>         //         + "WHERE task_type = 21\n"
>         //         + "AND task_mode IN (51, 40)\n");
>         System.out.println("SQL 1 Plan: ");
>         System.out.println(resultTable.explain(ExplainDetail.ESTIMATED_COST));
>         System.out.println(resultTable.explain(ExplainDetail.CHANGELOG_MODE));
>
>         DataStream<Tuple2<Boolean, Row>> resultStream = tableEnv.toRetractStream(resultTable, Row.class);
>         env.execute();
>     }
>
>     // user-defined source, just for this test
>     static class CustomSourceRowNumber implements SourceFunction<Tuple12<String, Integer, Integer, String, Integer, Integer, Integer, Integer, Integer, String, String, String>> {
>
>         private boolean isRunning = true;
>
>         @Override
>         public void run(SourceContext<Tuple12<String, Integer, Integer, String, Integer, Integer, Integer, Integer, Integer, String, String, String>> sourceContext) throws Exception {
>             while (isRunning) {
>                 sourceContext.collect(Tuple12.of("xxx", 21, 51, "yyy", 1, 1, 0, 2, 110, "zzz", "aaa", "bbb"));
>                 sourceContext.collect(Tuple12.of("xxx", 21, 40, "yyy", 2, 2, 1, 2, 110, "zzz", "aaa", "bbb"));
>                 Thread.sleep(Integer.MAX_VALUE);
>             }
>         }
>
>         @Override
>         public void cancel() {
>             isRunning = false;
>         }
>     }
> }
> {code}
> Console output (System.out.println):
> {code:java}
> SQL 0 Plan: 
> == Abstract Syntax Tree ==
> LogicalProject(biz_bill_no=[$0], task_type=[$1], task_mode=[$2], parent_task_no=[$3], total_stage_num=[$4], current_stage_index=[$5], use_pre_task_owner=[$6], poi_type=[$7], biz_origin_bill_type=[$8], sowing_task_no=[$9])
> +- LogicalFilter(condition=[=($12, 1)])
>    +- LogicalProject(biz_bill_no=[$0], task_type=[$1], task_mode=[$2], parent_task_no=[$3], total_stage_num=[$4], current_stage_index=[$5], use_pre_task_owner=[$6], poi_type=[$7], biz_origin_bill_type=[$8], sowing_task_no=[$9], dt=[$10], sowing_task_detail_id=[$11], rn=[ROW_NUMBER() OVER (PARTITION BY $10, $11 ORDER BY $1 DESC NULLS LAST)])
>       +- LogicalTableScan(table=[[default_catalog, default_database, wosOutSowingTaskDetail]])
>
> == Optimized Logical Plan ==
> Calc(select=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no], changelogMode=[I,UA,D])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[dt, sowing_task_detail_id], orderBy=[task_type DESC], select=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id], changelogMode=[I,UA,D])
>    +- Exchange(distribution=[hash[dt, sowing_task_detail_id]], changelogMode=[I])
>       +- DataStreamScan(table=[[default_catalog, default_database, wosOutSowingTaskDetail]], fields=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id], changelogMode=[I])
>
> == Physical Execution Plan ==
> Stage 1 : Data Source
>     content : Source: Custom Source
>
> Stage 7 : Operator
>     content : SourceConversion(table=[default_catalog.default_database.wosOutSowingTaskDetail], fields=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id])
>     ship_strategy : FORWARD
>
> Stage 9 : Operator
>     content : Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[dt, sowing_task_detail_id], orderBy=[task_type DESC], select=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id])
>     ship_strategy : HASH
>
> Stage 10 : Operator
>     content : Calc(select=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no])
>     ship_strategy : FORWARD
>
> SQL 1 Plan: 
> org.apache.flink.table.api.TableException: The window can only be ordered in ASCENDING mode. {code}
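> A possible workaround sketch (my assumption, not a verified fix): convert
> sql0's result to a retract stream and register that stream as a new table.
> The DataStream conversion is a hard plan break, so sql1's Calc can no longer
> be merged into the ROW_NUMBER() plan. Note that keeping only accumulate
> messages drops retractions, which changes update semantics downstream; names
> and types follow the example above, plus an import of
> org.apache.flink.api.common.typeinfo.Types.
> {code:java}
> // Workaround sketch: materialize sql0 behind a DataStream boundary so that
> // sql1 is optimized independently of the ROW_NUMBER() query.
> DataStream<Row> latestRows = tableEnv
>         .toRetractStream(wosOutSowingTaskDetailLatest, Row.class)
>         .filter(t -> t.f0)   // keep accumulate messages only (drops retractions)
>         .map(t -> t.f1)
>         .returns(Types.ROW_NAMED(
>                 new String[] {"biz_bill_no", "task_type", "task_mode", "parent_task_no"},
>                 Types.STRING, Types.INT, Types.INT, Types.STRING));
> tableEnv.createTemporaryView("wosOutSowingTaskDetailLatestBarrier",
>         tableEnv.fromDataStream(latestRows));
> // sql1 then selects FROM wosOutSowingTaskDetailLatestBarrier instead.
> {code}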
--
This message was sent by Atlassian Jira
(v8.20.1#820001)