[
https://issues.apache.org/jira/browse/FLINK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dawid Wysakowicz updated FLINK-23623:
-------------------------------------
Fix Version/s: 2.0.0
> Over expression should allow to order by any fields in batch mode
> -----------------------------------------------------------------
>
> Key: FLINK-23623
> URL: https://issues.apache.org/jira/browse/FLINK-23623
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / API
> Affects Versions: 1.13.1
> Reporter: Shengkai Fang
> Priority: Major
> Fix For: 2.0.0
>
>
> Please add the following test to {{OverWindowStringExpressionTest}}:
> {code:java}
> @Test
> def testRange(): Unit = {
>   val util = batchTestUtil()
>   // val t = util.addDataStream[(Long, Int, String, Int, Long)](
>   //   "T1",'a, 'b, 'c, 'd, 'e, 'rowtime.rowtime)
>   util.tableEnv.executeSql(
>     """
>       |CREATE TABLE T1 (
>       |  a int,
>       |  b int,
>       |  c int,
>       |  d int,
>       |  e int,
>       |  rowtime TIMESTAMP(3),
>       |  watermark for rowtime as rowtime
>       |) WITH (
>       |  'connector' = 'values'
>       |)
>       |""".stripMargin
>   )
>   util.addTemporarySystemFunction("weightAvgFun", classOf[WeightedAvg])
>   val t = util.tableEnv.from("T1")
>   val resScala = t
>     .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)
>     .select('a, 'b.sum over 'w, call("weightAvgFun", 'a, 'b) over 'w as 'myCnt)
> }
> {code}
> The exception stack trace is as follows:
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: Ordering must be defined on a time attribute.
>   at org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.validateArguments(PlannerTypeInferenceUtilImpl.java:111)
>   at org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.runTypeInference(PlannerTypeInferenceUtilImpl.java:69)
>   at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.runLegacyTypeInference(ResolveCallByArgumentsRule.java:249)
>   at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.lambda$visit$2(ResolveCallByArgumentsRule.java:160)
>   at java.util.Optional.orElseGet(Optional.java:267)
>   at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:160)
>   at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94)
>   at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
>   at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126)
>   at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:143)
>   at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:94)
>   at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
>   at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:126)
>   at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:88)
>   at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:269)
>   at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>   at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>   at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
>   at org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:89)
>   at org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$2(ExpressionResolver.java:235)
>   at java.util.function.Function.lambda$andThen$1(Function.java:88)
>   at org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:198)
>   at org.apache.flink.table.operations.utils.OperationTreeBuilder.projectInternal(OperationTreeBuilder.java:194)
>   at org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:183)
>   at org.apache.flink.table.api.internal.TableImpl$OverWindowedTableImpl.select(TableImpl.java:956)
>   at com.rk.linkdata.flink.BatchFlinkTask.main(BatchFlinkTask.java:44)
> {code}
> This fails because {{OverCall}} checks, during {{validateInput}}, whether the
> order-by key is a rowtime attribute. In batch mode, however, there is no
> rowtime attribute, so the check fails.
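
The behaviour requested above could be sketched roughly as follows. This is only an illustration of the intended rule, not Flink's actual validation code: the class `OverOrderByCheck`, the enums `Mode` and `FieldKind`, and the method `validateOrderBy` are hypothetical names invented for this sketch. Only the error message is taken from the stack trace above. The assumption is that the time-attribute requirement should stay in place for streaming and be skipped in batch mode.

```java
import java.util.Optional;

public class OverOrderByCheck {

    /** Hypothetical stand-in for the execution mode; not a Flink class. */
    public enum Mode { STREAMING, BATCH }

    /** Hypothetical stand-in for the kind of the ORDER BY field; not a Flink class. */
    public enum FieldKind { ROWTIME, PROCTIME, REGULAR }

    /**
     * Returns an error message if the ORDER BY field is not acceptable,
     * or an empty Optional if the over window is valid.
     */
    public static Optional<String> validateOrderBy(Mode mode, FieldKind orderBy) {
        boolean isTimeAttribute =
                orderBy == FieldKind.ROWTIME || orderBy == FieldKind.PROCTIME;

        // Assumption: only streaming needs a time attribute to order the window.
        if (mode == Mode.STREAMING && !isTimeAttribute) {
            return Optional.of("Ordering must be defined on a time attribute.");
        }

        // Batch input is bounded, so ordering by any field is accepted here.
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Streaming with a regular column: rejected.
        System.out.println(validateOrderBy(Mode.STREAMING, FieldKind.REGULAR).isPresent()); // prints true
        // Batch with a regular column: accepted.
        System.out.println(validateOrderBy(Mode.BATCH, FieldKind.REGULAR).isPresent()); // prints false
    }
}
```

In the real planner the mode would presumably have to be made available to the validation step; how that is wired is outside the scope of this sketch.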