Shengkai Fang created FLINK-23623:
-------------------------------------

             Summary: Over expression should allow to order by any fields in batch mode
                 Key: FLINK-23623
                 URL: https://issues.apache.org/jira/browse/FLINK-23623
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / API
    Affects Versions: 1.13.1
            Reporter: Shengkai Fang

To reproduce, add the following test to {{OverWindowStringExpressionTest}}:

{code:java}
@Test
def testRange(): Unit = {
  val util = batchTestUtil()
  // val t = util.addDataStream[(Long, Int, String, Int, Long)](
  //   "T1",'a, 'b, 'c, 'd, 'e, 'rowtime.rowtime)
  util.tableEnv.executeSql(
    """
      |CREATE TABLE T1 (
      |  a int,
      |  b int,
      |  c int,
      |  d int,
      |  e int,
      |  rowtime TIMESTAMP(3),
      |  watermark for rowtime as rowtime
      |) WITH (
      |  'connector' = 'values'
      |)
      |""".stripMargin
  )

  util.addTemporarySystemFunction("weightAvgFun", classOf[WeightedAvg])

  val t = util.tableEnv.from("T1")
  // Order the over window by 'rowtime; validation fails here in batch mode.
  val resScala = t
    .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)
    .select('a, 'b.sum over 'w, call("weightAvgFun", 'a, 'b) over 'w as 'myCnt)
}
{code}

The exception stack trace is as follows:

{code:java}
Exception in thread "main" org.apache.flink.table.api.ValidationException: Ordering must be defined on a time attribute.
	at org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.validateArguments(PlannerTypeInferenceUtilImpl.java:111)
	at org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.runTypeInference(PlannerTypeInferenceUtilImpl.java:69)
	at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.runLegacyTypeInference(ResolveCallByArgumentsRule.java:249)
	at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.lambda$visit$2(ResolveCallByArgumentsRule.java:160)
	at java.util.Optional.orElseGet(Optional.java:267)
	at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:160)
	at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94)
	at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
	at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126)
	at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:143)
	at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94)
	at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
	at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126)
	at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:88)
	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:269)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
	at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:89)
	at org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$2(ExpressionResolver.java:235)
	at java.util.function.Function.lambda$andThen$1(Function.java:88)
	at org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:198)
	at org.apache.flink.table.operations.utils.OperationTreeBuilder.projectInternal(OperationTreeBuilder.java:194)
	at org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:183)
	at org.apache.flink.table.api.internal.TableImpl$OverWindowedTableImpl.select(TableImpl.java:956)
	at com.rk.linkdata.flink.BatchFlinkTask.main(BatchFlinkTask.java:44)
{code}

This fails because {{OverCall}} checks in {{validateInput}} that the order-by key is a rowtime attribute. In batch mode, however, there is no rowtime attribute, so the check always rejects the query.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
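
For illustration only, the requested behavior could be modeled as a mode-aware check. This is a minimal, self-contained sketch and not Flink's actual code: {{OverOrderByCheck}}, {{Mode}}, {{Field}} and {{validateOrderBy}} are hypothetical names invented here, and the real check in {{OverCall}} may be structured quite differently. The sketch assumes the time-attribute requirement only needs to hold in streaming mode.

```java
import java.util.Optional;

/** Hypothetical model of a mode-aware order-by check; not part of Flink. */
public class OverOrderByCheck {

    /** Hypothetical stand-in for the planner's execution mode. */
    enum Mode { STREAMING, BATCH }

    /** Hypothetical stand-in for an order-by field; not a Flink type. */
    static final class Field {
        final String name;
        final boolean isTimeAttribute;

        Field(String name, boolean isTimeAttribute) {
            this.name = name;
            this.isTimeAttribute = isTimeAttribute;
        }
    }

    /**
     * Returns an error message if the order-by key is rejected, or an empty
     * Optional if it is accepted. Assumption: only streaming mode requires
     * the order-by key to be a time attribute.
     */
    static Optional<String> validateOrderBy(Field orderBy, Mode mode) {
        if (mode == Mode.STREAMING && !orderBy.isTimeAttribute) {
            return Optional.of("Ordering must be defined on a time attribute.");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // In batch mode the rowtime column is a plain TIMESTAMP(3),
        // so it is modeled here as a non-time-attribute field.
        Field rowtime = new Field("rowtime", false);
        System.out.println("streaming: " + validateOrderBy(rowtime, Mode.STREAMING));
        System.out.println("batch:     " + validateOrderBy(rowtime, Mode.BATCH));
    }
}
```

Under this assumption, the test above would pass validation in batch mode, while streaming queries would still be required to order by a time attribute.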