[FLINK-6618] [table] Fix translation of WindowProperties in Table API.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6583fb4c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6583fb4c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6583fb4c Branch: refs/heads/release-1.3 Commit: 6583fb4c1f85d5c4f57a81ec9a6294915ea165ac Parents: 45923ff Author: sunjincheng121 <sunjincheng...@gmail.com> Authored: Thu May 18 13:02:24 2017 +0800 Committer: Fabian Hueske <fhue...@apache.org> Committed: Thu May 18 22:06:00 2017 +0200 ---------------------------------------------------------------------- .../flink/table/plan/ProjectionTranslator.scala | 20 ++++++++++---------- .../GroupWindowStringExpressionTest.scala | 18 +++++++++++------- 2 files changed, 21 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6583fb4c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala index 802768e..69b437a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala @@ -334,20 +334,20 @@ object ProjectionTranslator { val l = replaceAggFunctionCall(b.left, tableEnv) val r = replaceAggFunctionCall(b.right, tableEnv) b.makeCopy(Array(l, r)) - // Functions calls case c @ Call(name, args) => val function = tableEnv.getFunctionCatalog.lookupFunction(name, args) - if (function.isInstanceOf[AggFunctionCall] || function.isInstanceOf[Aggregation]) { - function - } else { - val newArgs = - args.map( - (exp: Expression) => - replaceAggFunctionCall(exp, tableEnv)) - c.makeCopy(Array(name, newArgs)) + function match { + case a: AggFunctionCall => a + case a: Aggregation => a + case p: AbstractWindowProperty => p + case _ => + val newArgs = + args.map( + (exp: Expression) => + replaceAggFunctionCall(exp, tableEnv)) + c.makeCopy(Array(name, newArgs)) } - // Scala functions case sfc @ ScalarFunctionCall(clazz, args) => val newArgs: Seq[Expression] = http://git-wip-us.apache.org/repos/asf/flink/blob/6583fb4c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala index d261e36..1cc156e 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala @@ -23,7 +23,7 @@ import org.apache.flink.table.api.java.{Slide => JSlide} import org.apache.flink.table.api.scala._ import org.apache.flink.table.functions.aggfunctions.CountAggFunction import org.apache.flink.table.utils.TableTestBase -import org.junit.{Assert, Test} +import org.junit.Test class GroupWindowStringExpressionTest extends TableTestBase { @@ -47,7 +47,9 @@ class GroupWindowStringExpressionTest extends TableTestBase { myCountFun('string), 'int.sum, weightAvgFun('long, 'int), - weightAvgFun('int, 'int) * 2) + weightAvgFun('int, 'int) * 2, + 'w.start, + 'w.end) // String / Java API val resJava = t @@ -55,11 +57,13 @@ class GroupWindowStringExpressionTest extends TableTestBase { .groupBy("w, string") .select( "string, " + - "myCountFun(string), " + - "int.sum, " + - "weightAvgFun(long, int), " + - "weightAvgFun(int, int) * 2") + "myCountFun(string), " + + "int.sum, " + + "weightAvgFun(long, int), " + + "weightAvgFun(int, int) * 2, " + + "start(w)," + + "end(w)") - Assert.assertEquals("Logical Plans do not match", resJava.logicalPlan, resJava.logicalPlan) + verifyTableEquals(resJava, resScala) } }