P Rohan Kumar created FLINK-31165:
-------------------------------------

             Summary: Over Agg: The window rank function without order by error 
in top N query
                 Key: FLINK-31165
                 URL: https://issues.apache.org/jira/browse/FLINK-31165
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API
    Affects Versions: 1.16.0
            Reporter: P Rohan Kumar


 
{code:java}
// Reproduction for FLINK-31165: a Top-N query (row_number() ... where rownum <= 1)
// whose over window is ordered by SRC_NO, a column defined as a constant DATE literal.
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// Source table backed by the "datagen" connector, configured with
// rows-per-second=10 and number-of-rows=10.
val td = TableDescriptor
  .forConnector("datagen")
  .option("rows-per-second", "10")
  .option("number-of-rows", "10")
  .schema(
    Schema
      .newBuilder()
      .column("NAME", DataTypes.VARCHAR(2147483647))
      .column("ROLLNO", DataTypes.DECIMAL(5, 0))
      .column("DOB", DataTypes.DATE())
      .column("CLASS", DataTypes.DECIMAL(2, 0))
      .column("SUBJECT", DataTypes.VARCHAR(2147483647))
      .build())
  .build()

val table = tableEnv.from(td)
tableEnv.createTemporaryView("temp_table", table)

// Add SRC_NO: every row gets the same value, cast('2022-01-01' as date).
// The SQL text is kept on one line; a plain "..." literal cannot span lines in Scala.
val newTable = tableEnv.sqlQuery(
  "select temp_table.*,cast('2022-01-01' as date) SRC_NO from temp_table")
tableEnv.createTemporaryView("temp_table2", newTable)

// Top-1 row per NAME, ranked by row_number() ordered by the constant SRC_NO column.
// The literal is split with `+` only to keep source lines short; the resulting
// SQL string is identical to the single-line query in the original report.
val newTable2 = tableEnv.sqlQuery(
  "select * from (select NAME,ROLLNO," +
    "row_number() over (partition by NAME ORDER BY SRC_NO) AS rownum  " +
    "from temp_table2 a) where rownum <= 1")

// Translating the query to a changelog stream is where the reported
// ValidationException is raised (see the stack trace in this report).
tableEnv.toChangelogStream(newTable2).print()

env.execute()
 {code}
 

 

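Running the code above fails with the error below.

The over window does specify an ORDER BY column (SRC_NO), yet the planner reports that the rank function has no ORDER BY. Note that SRC_NO is a constant column (cast('2022-01-01' as date)), and the window printed in the error message contains only the partition key and no ordering key.

If I change the ORDER BY column to some other column, such as "SUBJECT", then 
the job runs fine.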

 

 
{code:java}
Exception in thread "main" java.lang.RuntimeException: Error while applying 
rule FlinkLogicalOverAggregateConverter(in:NONE,out:LOGICAL), args 
[rel#245:LogicalWindow.NONE.any.None: 
0.[NONE].[NONE](input=RelSubset#244,window#0=window(partition {0} rows between 
UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()]))]
    at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256)
    at 
org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58)
    at 
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510)
    at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312)
    at 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62)
    at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
    at 
scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187)
    at 
scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189)
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
    at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
    at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176)
    at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
    at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
    at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315)
    at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195)
    at 
org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
    at 
org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219)
    at 
org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toChangelogStream(StreamTableEnvironmentImpl.scala:160)
    at org.example.OverAggregateBug$.main(OverAggregateBug.scala:39)
    at org.example.OverAggregateBug.main(OverAggregateBug.scala)
Caused by: org.apache.flink.table.api.ValidationException: Over Agg: The window 
rank function without order by. please re-check the over window statement.
    at 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$2(FlinkLogicalOverAggregate.scala:95)
    at 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$2$adapted(FlinkLogicalOverAggregate.scala:92)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$1(FlinkLogicalOverAggregate.scala:92)
    at 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$1$adapted(FlinkLogicalOverAggregate.scala:89)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.convert(FlinkLogicalOverAggregate.scala:89)
    at 
org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:167)
    at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:229)
    ... 27 more {code}
 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to