[ 
https://issues.apache.org/jira/browse/FLINK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz updated FLINK-23623:
-------------------------------------
    Fix Version/s: 2.0.0

> Over expression should allow to order by any fields in batch mode
> -----------------------------------------------------------------
>
>                 Key: FLINK-23623
>                 URL: https://issues.apache.org/jira/browse/FLINK-23623
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / API
>    Affects Versions: 1.13.1
>            Reporter: Shengkai Fang
>            Priority: Major
>             Fix For: 2.0.0
>
>
> Please add test in the {{OverWindowStringExpressionTest}}
> {code:java}
>  @Test
>   def testRange(): Unit = {
>     val util = batchTestUtil()
> //    val t = util.addDataStream[(Long, Int, String, Int, Long)](
> //      "T1",'a, 'b, 'c, 'd, 'e, 'rowtime.rowtime)
>     util.tableEnv.executeSql(
>       """
>         |CREATE TABLE T1 (
>         |  a int,
>         |  b int,
>         |  c int,
>         |  d int,
>         |  e int,
>         |  rowtime TIMESTAMP(3),
>         |  watermark for rowtime as rowtime
>         |) WITH (
>         |  'connector' = 'values'
>         |)
>         |""".stripMargin
>     )
>     util.addTemporarySystemFunction("weightAvgFun", classOf[WeightedAvg])
>     val t = util.tableEnv.from("T1")
>     val resScala = t
>       .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE 
> as 'w)
>       .select('a, 'b.sum over 'w, call("weightAvgFun", 'a, 'b) over 'w as 
> 'myCnt)
>     
>   }
> {code}
> The exception stack as follows
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Ordering must be defined on a time attribute.
> at 
> org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.validateArguments(PlannerTypeInferenceUtilImpl.java:111)
> at 
> org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.runTypeInference(PlannerTypeInferenceUtilImpl.java:69)
> at 
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.runLegacyTypeInference(ResolveCallByArgumentsRule.java:249)
> at 
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.lambda$visit$2(ResolveCallByArgumentsRule.java:160)
> at java.util.Optional.orElseGet(Optional.java:267)
> at 
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:160)
> at 
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94)
> at 
> org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
> at 
> org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126)
> at 
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:143)
> at 
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94)
> at 
> org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
> at 
> org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126)
> at 
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:88)
> at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:269)
> at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
> at 
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:89)
> at 
> org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$2(ExpressionResolver.java:235)
> at java.util.function.Function.lambda$andThen$1(Function.java:88)
> at 
> org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:198)
> at 
> org.apache.flink.table.operations.utils.OperationTreeBuilder.projectInternal(OperationTreeBuilder.java:194)
> at 
> org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:183)
> at 
> org.apache.flink.table.api.internal.TableImpl$OverWindowedTableImpl.select(TableImpl.java:956)
> at com.rk.linkdata.flink.BatchFlinkTask.main(BatchFlinkTask.java:44)
> {code}
> The main reason why this fails is because {{OverCall}} will check whether the 
> order by key is rowtime during the {{validateInput}}. However, it doesn't 
> have rowtime in batch mode.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to