[FLINK-6618] [table] Fix translation of WindowProperties in Table API.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6583fb4c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6583fb4c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6583fb4c

Branch: refs/heads/release-1.3
Commit: 6583fb4c1f85d5c4f57a81ec9a6294915ea165ac
Parents: 45923ff
Author: sunjincheng121 <sunjincheng...@gmail.com>
Authored: Thu May 18 13:02:24 2017 +0800
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Thu May 18 22:06:00 2017 +0200

----------------------------------------------------------------------
 .../flink/table/plan/ProjectionTranslator.scala | 20 ++++++++++----------
 .../GroupWindowStringExpressionTest.scala       | 18 +++++++++++-------
 2 files changed, 21 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6583fb4c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
index 802768e..69b437a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
@@ -334,20 +334,20 @@ object ProjectionTranslator {
         val l = replaceAggFunctionCall(b.left, tableEnv)
         val r = replaceAggFunctionCall(b.right, tableEnv)
         b.makeCopy(Array(l, r))
-
       // Functions calls
       case c @ Call(name, args) =>
         val function = tableEnv.getFunctionCatalog.lookupFunction(name, args)
-        if (function.isInstanceOf[AggFunctionCall] || function.isInstanceOf[Aggregation]) {
-          function
-        } else {
-          val newArgs =
-            args.map(
-            (exp: Expression) =>
-              replaceAggFunctionCall(exp, tableEnv))
-          c.makeCopy(Array(name, newArgs))
+        function match {
+          case a: AggFunctionCall => a
+          case a: Aggregation => a
+          case p: AbstractWindowProperty => p
+          case _ =>
+            val newArgs =
+              args.map(
+                (exp: Expression) =>
+                  replaceAggFunctionCall(exp, tableEnv))
+            c.makeCopy(Array(name, newArgs))
         }
-
       // Scala functions
       case sfc @ ScalarFunctionCall(clazz, args) =>
         val newArgs: Seq[Expression] =

http://git-wip-us.apache.org/repos/asf/flink/blob/6583fb4c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala
index d261e36..1cc156e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala
@@ -23,7 +23,7 @@ import org.apache.flink.table.api.java.{Slide => JSlide}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.functions.aggfunctions.CountAggFunction
 import org.apache.flink.table.utils.TableTestBase
-import org.junit.{Assert, Test}
+import org.junit.Test
 
 
 class GroupWindowStringExpressionTest extends TableTestBase {
@@ -47,7 +47,9 @@ class GroupWindowStringExpressionTest extends TableTestBase {
         myCountFun('string),
         'int.sum,
         weightAvgFun('long, 'int),
-        weightAvgFun('int, 'int) * 2)
+        weightAvgFun('int, 'int) * 2,
+        'w.start,
+        'w.end)
 
     // String / Java API
     val resJava = t
@@ -55,11 +57,13 @@ class GroupWindowStringExpressionTest extends TableTestBase {
       .groupBy("w, string")
       .select(
         "string, " +
-          "myCountFun(string), " +
-          "int.sum, " +
-          "weightAvgFun(long, int), " +
-          "weightAvgFun(int, int) * 2")
+        "myCountFun(string), " +
+        "int.sum, " +
+        "weightAvgFun(long, int), " +
+        "weightAvgFun(int, int) * 2, " +
+        "start(w)," +
+        "end(w)")
 
-    Assert.assertEquals("Logical Plans do not match", resJava.logicalPlan, resJava.logicalPlan)
+    verifyTableEquals(resJava, resScala)
   }
 }

Reply via email to