[FLINK-6618] [table] Fix translation of WindowProperties in Table API. This closes #3936.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/41aa98a2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/41aa98a2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/41aa98a2 Branch: refs/heads/master Commit: 41aa98a2189ea86ba9464fb0348463af822e332d Parents: c995ebd Author: sunjincheng121 <[email protected]> Authored: Thu May 18 13:02:24 2017 +0800 Committer: Fabian Hueske <[email protected]> Committed: Thu May 18 21:22:12 2017 +0200 ---------------------------------------------------------------------- .../flink/table/plan/ProjectionTranslator.scala | 20 ++++++++++---------- .../GroupWindowStringExpressionTest.scala | 18 +++++++++++------- 2 files changed, 21 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/41aa98a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala index 802768e..69b437a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala @@ -334,20 +334,20 @@ object ProjectionTranslator { val l = replaceAggFunctionCall(b.left, tableEnv) val r = replaceAggFunctionCall(b.right, tableEnv) b.makeCopy(Array(l, r)) - // Functions calls case c @ Call(name, args) => val function = tableEnv.getFunctionCatalog.lookupFunction(name, args) - if (function.isInstanceOf[AggFunctionCall] || function.isInstanceOf[Aggregation]) { - function - } else { - val newArgs = - args.map( - (exp: Expression) => - replaceAggFunctionCall(exp, tableEnv)) - c.makeCopy(Array(name, newArgs)) + function match { + case a: AggFunctionCall => a + case a: Aggregation => a + case p: AbstractWindowProperty => p + case _ => + val newArgs = + args.map( + (exp: Expression) => + replaceAggFunctionCall(exp, tableEnv)) + c.makeCopy(Array(name, newArgs)) } - // Scala functions case sfc @ ScalarFunctionCall(clazz, args) => val newArgs: Seq[Expression] = http://git-wip-us.apache.org/repos/asf/flink/blob/41aa98a2/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala index d261e36..1cc156e 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala @@ -23,7 +23,7 @@ import org.apache.flink.table.api.java.{Slide => JSlide} import org.apache.flink.table.api.scala._ import org.apache.flink.table.functions.aggfunctions.CountAggFunction import org.apache.flink.table.utils.TableTestBase -import org.junit.{Assert, Test} +import org.junit.Test class GroupWindowStringExpressionTest extends TableTestBase { @@ -47,7 +47,9 @@ class GroupWindowStringExpressionTest extends TableTestBase { myCountFun('string), 'int.sum, weightAvgFun('long, 'int), - weightAvgFun('int, 'int) * 2) + weightAvgFun('int, 'int) * 2, + 'w.start, + 'w.end) // String / Java API val resJava = t @@ -55,11 +57,13 @@ class GroupWindowStringExpressionTest extends TableTestBase { .groupBy("w, string") .select( "string, " + - "myCountFun(string), " + - "int.sum, " + - "weightAvgFun(long, int), " + - "weightAvgFun(int, int) * 2") + "myCountFun(string), " + + "int.sum, " + + "weightAvgFun(long, int), " + + "weightAvgFun(int, int) * 2, " + + "start(w)," + + "end(w)") - Assert.assertEquals("Logical Plans do not match", resJava.logicalPlan, resJava.logicalPlan) + verifyTableEquals(resJava, resScala) } }
