[
https://issues.apache.org/jira/browse/FLINK-6834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16040810#comment-16040810
]
ASF GitHub Bot commented on FLINK-6834:
---------------------------------------
Github user sunjincheng121 commented on a diff in the pull request:
https://github.com/apache/flink/pull/4070#discussion_r120614729
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
---
@@ -226,30 +226,69 @@ object ProjectionTranslator {
overWindows: Array[OverWindow],
tEnv: TableEnvironment): Seq[Expression] = {
- def resolveOverWindow(unresolvedCall: UnresolvedOverCall): Expression
= {
-
- val overWindow =
overWindows.find(_.alias.equals(unresolvedCall.alias))
- if (overWindow.isDefined) {
- OverCall(
- unresolvedCall.agg,
- overWindow.get.partitionBy,
- overWindow.get.orderBy,
- overWindow.get.preceding,
- overWindow.get.following)
- } else {
- unresolvedCall
- }
- }
+ exprs.map(e => replaceOverCall(e, overWindows, tEnv))
+ }
- val projectList = new ListBuffer[Expression]
- exprs.foreach {
- case Alias(u: UnresolvedOverCall, name, _) =>
- projectList += Alias(resolveOverWindow(u), name)
+ /**
+ * Find and replace UnresolvedOverCall with OverCall
+ *
+ * @param expr the expression to check
+ * @return an expression with correct resolved OverCall
+ */
+ private def replaceOverCall(
+ expr: Expression,
+ overWindows: Array[OverWindow],
+ tableEnv: TableEnvironment): Expression = {
+
+ expr match {
case u: UnresolvedOverCall =>
- projectList += resolveOverWindow(u)
- case e: Expression => projectList += e
+ val overWindow = overWindows.find(_.alias.equals(u.alias))
+ if (overWindow.isDefined) {
+ OverCall(
+ u.agg,
+ overWindow.get.partitionBy,
+ overWindow.get.orderBy,
+ overWindow.get.preceding,
+ overWindow.get.following)
+ } else {
+ u
+ }
+
+ case u: UnaryExpression =>
+ val c = replaceOverCall(u.child, overWindows, tableEnv)
--- End diff --
Ah, I meant the current test case can not execute to this branch, I suggest
add `Abs((countFun('b) over 'w))` into test case SELECT clause. i.e:
.select(
'a, 'b, 'c,
'b.sum over 'w,
"SUM:".toExpr + ('b.sum over 'w),
countFun('b) over 'w,
(countFun('b) over 'w) + 1,
**Abs((countFun('b) over 'w)), -> // cover UnaryExpression case**
plusOne(countFun('b) over 'w),
**array(countFun('c) over 'w, countFun('b) over 'w), -> // cover
ArrayConstructor case**
'b.avg over 'w,
'b.max over 'w,
'b.min over 'w,
weightAvgFun('b, 'a) over 'w)
> Over window doesn't support complex calculation
> -----------------------------------------------
>
> Key: FLINK-6834
> URL: https://issues.apache.org/jira/browse/FLINK-6834
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Reporter: Jark Wu
> Assignee: Jark Wu
> Fix For: 1.4.0
>
>
> The following example
> {code}
> val windowedTable = table
> .window(
> Over partitionBy 'c orderBy 'rowtime preceding UNBOUNDED_ROW as 'w)
> .select('c, 'b, ('a.count over 'w) + 1)
> {code}
> will throw exception:
> {code}
> org.apache.flink.table.api.ValidationException: Expression
> UnresolvedOverCall(count('a),'w) failed on input check: Over window with
> alias $alias could not be resolved.
> at
> org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:143)
> at
> org.apache.flink.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:89)
> at
> org.apache.flink.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:83)
> at
> org.apache.flink.table.plan.TreeNode.postOrderTransform(TreeNode.scala:72)
> at
> org.apache.flink.table.plan.TreeNode$$anonfun$1.apply(TreeNode.scala:46)
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)