[
https://issues.apache.org/jira/browse/FLINK-6834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16040814#comment-16040814
]
ASF GitHub Bot commented on FLINK-6834:
---------------------------------------
Github user sunjincheng121 commented on a diff in the pull request:
https://github.com/apache/flink/pull/4070#discussion_r120614930
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
---
@@ -226,30 +226,69 @@ object ProjectionTranslator {
overWindows: Array[OverWindow],
tEnv: TableEnvironment): Seq[Expression] = {
- def resolveOverWindow(unresolvedCall: UnresolvedOverCall): Expression
= {
-
- val overWindow =
overWindows.find(_.alias.equals(unresolvedCall.alias))
- if (overWindow.isDefined) {
- OverCall(
- unresolvedCall.agg,
- overWindow.get.partitionBy,
- overWindow.get.orderBy,
- overWindow.get.preceding,
- overWindow.get.following)
- } else {
- unresolvedCall
- }
- }
+ exprs.map(e => replaceOverCall(e, overWindows, tEnv))
+ }
- val projectList = new ListBuffer[Expression]
- exprs.foreach {
- case Alias(u: UnresolvedOverCall, name, _) =>
- projectList += Alias(resolveOverWindow(u), name)
+ /**
+ * Find and replace UnresolvedOverCall with OverCall
+ *
+ * @param expr the expression to check
+ * @return an expression with correct resolved OverCall
+ */
+ private def replaceOverCall(
+ expr: Expression,
+ overWindows: Array[OverWindow],
+ tableEnv: TableEnvironment): Expression = {
+
+ expr match {
case u: UnresolvedOverCall =>
- projectList += resolveOverWindow(u)
- case e: Expression => projectList += e
+ val overWindow = overWindows.find(_.alias.equals(u.alias))
+ if (overWindow.isDefined) {
+ OverCall(
+ u.agg,
+ overWindow.get.partitionBy,
+ overWindow.get.orderBy,
+ overWindow.get.preceding,
+ overWindow.get.following)
+ } else {
+ u
+ }
+
+ case u: UnaryExpression =>
+ val c = replaceOverCall(u.child, overWindows, tableEnv)
+ u.makeCopy(Array(c))
+
+ case b: BinaryExpression =>
+ val l = replaceOverCall(b.left, overWindows, tableEnv)
+ val r = replaceOverCall(b.right, overWindows, tableEnv)
+ b.makeCopy(Array(l, r))
+
+ // Functions calls
+ case c @ Call(name, args: Seq[Expression]) =>
+ val newArgs =
+ args.map(
+ (exp: Expression) =>
+ replaceOverCall(exp, overWindows, tableEnv))
+ c.makeCopy(Array(name, newArgs))
+
+ // Scala functions
+ case sfc @ ScalarFunctionCall(clazz, args: Seq[Expression]) =>
+ val newArgs: Seq[Expression] =
+ args.map(
+ (exp: Expression) =>
+ replaceOverCall(exp, overWindows, tableEnv))
+ sfc.makeCopy(Array(clazz, newArgs))
+
+ // Array constructor
+ case c @ ArrayConstructor(args) =>
--- End diff --
Thanks for the explanation @fhueske :)
> Over window doesn't support complex calculation
> -----------------------------------------------
>
> Key: FLINK-6834
> URL: https://issues.apache.org/jira/browse/FLINK-6834
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Reporter: Jark Wu
> Assignee: Jark Wu
> Fix For: 1.4.0
>
>
> The following example
> {code}
> val windowedTable = table
> .window(
> Over partitionBy 'c orderBy 'rowtime preceding UNBOUNDED_ROW as 'w)
> .select('c, 'b, ('a.count over 'w) + 1)
> {code}
> will throw an exception:
> {code}
> org.apache.flink.table.api.ValidationException: Expression
> UnresolvedOverCall(count('a),'w) failed on input check: Over window with
> alias $alias could not be resolved.
> at
> org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:143)
> at
> org.apache.flink.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:89)
> at
> org.apache.flink.table.plan.logical.LogicalNode$$anonfun$validate$1.applyOrElse(LogicalNode.scala:83)
> at
> org.apache.flink.table.plan.TreeNode.postOrderTransform(TreeNode.scala:72)
> at
> org.apache.flink.table.plan.TreeNode$$anonfun$1.apply(TreeNode.scala:46)
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)