Repository: flink
Updated Branches:
  refs/heads/master baf057a48 -> de03e0cea


http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowTest.scala
new file mode 100644
index 0000000..96fd787
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowTest.scala
@@ -0,0 +1,614 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table._
+import org.apache.flink.api.table.expressions.{RowtimeAttribute, WindowReference}
+import org.apache.flink.api.table.plan.logical._
+import org.apache.flink.api.table.utils.TableTestBase
+import org.apache.flink.api.table.utils.TableTestUtil.{streamTableNode, term, unaryNode}
+import org.junit.{Ignore, Test}
+
+class GroupWindowTest extends TableTestBase {
+
+  // batch windows are not supported yet
+  @Test(expected = classOf[ValidationException])
+  def testInvalidBatchWindow(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    table
+      .groupBy('string)
+      // use an alias that does not clash with a field name, so that the only
+      // possible cause of the ValidationException is the batch environment
+      // (an alias equal to a field name is covered by testInvalidWindowAlias2)
+      .window(Session withGap 100.milli as 'w)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidWindowProperty(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    table
+      .groupBy('string)
+      .select('string, 'string.start) // property in non windowed table
+  }
+
+  @Test(expected = classOf[TableException])
+  def testInvalidRowtime1(): Unit = {
+    val util = streamTestUtil()
+    // rowtime attribute must not be a field name
+    util.addTable[(Long, Int, String)]('rowtime, 'long, 'int, 'string)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidRowtime2(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    table
+      .select('string, 'int as 'rowtime) // rowtime attribute must not be an alias
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidRowtime3(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    table.as('rowtime, 'myint, 'mystring) // rowtime attribute must not be an alias
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidRowtime4(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    table
+      .groupBy('string)
+      // only rowtime is a valid time attribute in a stream environment
+      .window(Tumble over 50.milli on 'string)
+      .select('string, 'int.count)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidTumblingSize(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    table
+      .groupBy('string)
+      .window(Tumble over "WRONG") // string is not a valid interval
+      .select('string, 'int.count)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidSlidingSize(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    table
+      .groupBy('string)
+      .window(Slide over "WRONG" every "WRONG") // string is not a valid interval
+      .select('string, 'int.count)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidSlidingSlide(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    table
+      .groupBy('string)
+      .window(Slide over 12.rows every 1.minute) // row and time intervals may not be mixed
+      .select('string, 'int.count)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidSessionGap(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    table
+      .groupBy('string)
+      .window(Session withGap 10.rows) // row interval is not valid for session windows
+      .select('string, 'int.count)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidWindowAlias1(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    table
+      .groupBy('string)
+      .window(Session withGap 100.milli as 1 + 1) // expression instead of a symbol
+      .select('string, 'int.count)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidWindowAlias2(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    table
+      .groupBy('string)
+      .window(Session withGap 100.milli as 'string) // field name "string" is already present
+      .select('string, 'int.count)
+  }
+
+  @Test
+  def testProcessingTimeTumblingGroupWindowOverTime(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .groupBy('string)
+      .window(Tumble over 50.milli)
+      .select('string, 'int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      streamTableNode(0),
+      term("groupBy", "string"),
+      term("window", ProcessingTimeTumblingGroupWindow(None, 50.milli)),
+      term("select", "string", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testProcessingTimeTumblingGroupWindowOverCount(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .groupBy('string)
+      .window(Tumble over 2.rows)
+      .select('string, 'int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      streamTableNode(0),
+      term("groupBy", "string"),
+      term("window", ProcessingTimeTumblingGroupWindow(None, 2.rows)),
+      term("select", "string", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testEventTimeTumblingGroupWindowOverTime(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .groupBy('string)
+      .window(Tumble over 5.milli on 'rowtime)
+      .select('string, 'int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      streamTableNode(0),
+      term("groupBy", "string"),
+      term("window", EventTimeTumblingGroupWindow(None, RowtimeAttribute(), 5.milli)),
+      term("select", "string", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  @Ignore // see comments in DataStreamAggregate
+  def testEventTimeTumblingGroupWindowOverCount(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .groupBy('string)
+      .window(Tumble over 2.rows on 'rowtime)
+      .select('string, 'int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      streamTableNode(0),
+      term("groupBy", "string"),
+      term("window", EventTimeTumblingGroupWindow(None, RowtimeAttribute(), 2.rows)),
+      term("select", "string", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testProcessingTimeSlidingGroupWindowOverTime(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .groupBy('string)
+      .window(Slide over 50.milli every 50.milli)
+      .select('string, 'int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      streamTableNode(0),
+      term("groupBy", "string"),
+      term("window", ProcessingTimeSlidingGroupWindow(None, 50.milli, 50.milli)),
+      term("select", "string", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testProcessingTimeSlidingGroupWindowOverCount(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .groupBy('string)
+      .window(Slide over 2.rows every 1.rows)
+      .select('string, 'int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      streamTableNode(0),
+      term("groupBy", "string"),
+      term("window", ProcessingTimeSlidingGroupWindow(None, 2.rows, 1.rows)),
+      term("select", "string", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testEventTimeSlidingGroupWindowOverTime(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .groupBy('string)
+      .window(Slide over 8.milli every 10.milli on 'rowtime)
+      .select('string, 'int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      streamTableNode(0),
+      term("groupBy", "string"),
+      term("window", EventTimeSlidingGroupWindow(None, RowtimeAttribute(), 8.milli, 10.milli)),
+      term("select", "string", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  @Ignore // see comments in DataStreamAggregate
+  def testEventTimeSlidingGroupWindowOverCount(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .groupBy('string)
+      .window(Slide over 2.rows every 1.rows on 'rowtime)
+      .select('string, 'int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      streamTableNode(0),
+      term("groupBy", "string"),
+      term("window", EventTimeSlidingGroupWindow(None, RowtimeAttribute(), 2.rows, 1.rows)),
+      term("select", "string", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testEventTimeSessionGroupWindowOverTime(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .groupBy('string)
+      .window(Session withGap 7.milli on 'rowtime)
+      .select('string, 'int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      streamTableNode(0),
+      term("groupBy", "string"),
+      term("window", EventTimeSessionGroupWindow(None, RowtimeAttribute(), 7.milli)),
+      term("select", "string", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testAllProcessingTimeTumblingGroupWindowOverTime(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    // "All" variant: the window is applied to the whole stream, without groupBy
+    // (the grouped case is testProcessingTimeTumblingGroupWindowOverTime)
+    val windowedTable = table
+      .window(Tumble over 50.milli)
+      .select('int.count)
+
+    // consequently the plan carries no "groupBy" term and selects only the count
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      streamTableNode(0),
+      term("window", ProcessingTimeTumblingGroupWindow(None, 50.milli)),
+      term("select", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testAllProcessingTimeTumblingGroupWindowOverCount(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .window(Tumble over 2.rows)
+      .select('int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      streamTableNode(0),
+      term("window", ProcessingTimeTumblingGroupWindow(None, 2.rows)),
+      term("select", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testAllEventTimeTumblingGroupWindowOverTime(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .window(Tumble over 5.milli on 'rowtime)
+      .select('int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      streamTableNode(0),
+      term("window", EventTimeTumblingGroupWindow(None, RowtimeAttribute(), 5.milli)),
+      term("select", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  @Ignore // see comments in DataStreamAggregate
+  def testAllEventTimeTumblingGroupWindowOverCount(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .window(Tumble over 2.rows on 'rowtime)
+      .select('int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      streamTableNode(0),
+      term("window", EventTimeTumblingGroupWindow(None, RowtimeAttribute(), 2.rows)),
+      term("select", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+
+  @Test
+  def testAllProcessingTimeSlidingGroupWindowOverTime(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .window(Slide over 50.milli every 50.milli)
+      .select('int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      streamTableNode(0),
+      term("window", ProcessingTimeSlidingGroupWindow(None, 50.milli, 50.milli)),
+      term("select", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testAllProcessingTimeSlidingGroupWindowOverCount(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .window(Slide over 2.rows every 1.rows)
+      .select('int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      streamTableNode(0),
+      term("window", ProcessingTimeSlidingGroupWindow(None, 2.rows, 1.rows)),
+      term("select", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testAllEventTimeSlidingGroupWindowOverTime(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .window(Slide over 8.milli every 10.milli on 'rowtime)
+      .select('int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      streamTableNode(0),
+      term("window", EventTimeSlidingGroupWindow(None, RowtimeAttribute(), 8.milli, 10.milli)),
+      term("select", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  @Ignore // see comments in DataStreamAggregate
+  def testAllEventTimeSlidingGroupWindowOverCount(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .window(Slide over 2.rows every 1.rows on 'rowtime)
+      .select('int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      streamTableNode(0),
+      term("window", EventTimeSlidingGroupWindow(None, RowtimeAttribute(), 2.rows, 1.rows)),
+      term("select", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testAllEventTimeSessionGroupWindowOverTime(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .window(Session withGap 7.milli on 'rowtime)
+      .select('int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      streamTableNode(0),
+      term("window", EventTimeSessionGroupWindow(None, RowtimeAttribute(), 7.milli)),
+      term("select", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testTumbleWindowStartEnd(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .groupBy('string)
+      .window(Tumble over 5.milli on 'rowtime as 'w)
+      .select('string, 'int.count, 'w.start, 'w.end)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      streamTableNode(0),
+      term("groupBy", "string"),
+      term("window",
+        EventTimeTumblingGroupWindow(
+          Some(WindowReference("w")),
+          RowtimeAttribute(),
+          5.milli)),
+      term("select",
+        "string",
+        "COUNT(int) AS TMP_0",
+        "start(WindowReference(w)) AS TMP_1",
+        "end(WindowReference(w)) AS TMP_2")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testSlideWindowStartEnd(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .groupBy('string)
+      .window(Slide over 10.milli every 5.milli on 'rowtime as 'w)
+      .select('string, 'int.count, 'w.start, 'w.end)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      streamTableNode(0),
+      term("groupBy", "string"),
+      term("window",
+        EventTimeSlidingGroupWindow(
+          Some(WindowReference("w")),
+          RowtimeAttribute(),
+          10.milli,
+          5.milli)),
+      term("select",
+        "string",
+        "COUNT(int) AS TMP_0",
+        "start(WindowReference(w)) AS TMP_1",
+        "end(WindowReference(w)) AS TMP_2")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testSessionWindowStartWithTwoEnd(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .groupBy('string)
+      .window(Session withGap 3.milli on 'rowtime as 'w)
+      .select('w.end, 'string, 'int.count, 'w.start, 'w.end)
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamAggregate",
+        streamTableNode(0),
+        term("groupBy", "string"),
+        term("window",
+          EventTimeSessionGroupWindow(
+            Some(WindowReference("w")),
+            RowtimeAttribute(),
+            3.milli)),
+        term("select",
+          "string",
+          "COUNT(int) AS TMP_1",
+          "end(WindowReference(w)) AS TMP_0",
+          "start(WindowReference(w)) AS TMP_2",
+          "end(WindowReference(w)) AS TMP_3")
+      ),
+      term("select", "TMP_0", "string", "TMP_1", "TMP_2", "TMP_3")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
index 8ce1472..6d1a62e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
@@ -35,14 +35,6 @@ class UnsupportedOpsTest extends StreamingMultipleProgramsTestBase {
   }
 
   @Test(expected = classOf[ValidationException])
-  def testGroupBy(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
-      .groupBy('_1)
-  }
-
-  @Test(expected = classOf[ValidationException])
   def testDistinct(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)

http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
index fd43ed4..ce693ff 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
@@ -27,7 +27,7 @@ import org.apache.flink.api.table.expressions.Expression
 import org.apache.flink.api.table.{Table, TableEnvironment}
 import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream}
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
-import org.junit.Assert
+import org.junit.Assert.assertEquals
 import org.mockito.Mockito.{mock, when}
 
 /**
@@ -46,6 +46,13 @@ class TableTestBase {
 }
 
 abstract class TableTestUtil {
+
+  private var counter = 0
+
+  /**
+    * Registers a table under a generated name ("Table1", "Table2", ...) that is
+    * distinct for every call on this util instance, and delegates to the named
+    * `addTable` overload.
+    *
+    * @param fields field expressions passed through to the named overload
+    * @tparam T record type of the table; requires implicit [[TypeInformation]]
+    * @return the table returned by the named overload
+    */
+  def addTable[T: TypeInformation](fields: Expression*): Table = {
+    // Increment first: `counter += 1` is an assignment of type Unit, so
+    // interpolating it directly would name every table "Table()".
+    counter += 1
+    addTable[T](s"Table$counter", fields: _*)
+  }
+
   def addTable[T: TypeInformation](name: String, fields: Expression*): Table
   def verifySql(query: String, expected: String): Unit
   def verifyTable(resultTable: Table, expected: String): Unit
@@ -58,18 +65,18 @@ object TableTestUtil {
 
   def unaryNode(node: String, input: String, term: String*): String = {
     s"""$node(${term.mkString(", ")})
-       |  $input
+       |$input
        |""".stripMargin
   }
 
   def binaryNode(node: String, left: String, right: String, term: String*): String = {
     s"""$node(${term.mkString(", ")})
-       |  $left
-       |  $right
+       |$left
+       |$right
        |""".stripMargin
   }
 
-  def term(term: String, value: String*): String = {
+  def term(term: AnyRef, value: AnyRef*): String = {
     s"$term=[${value.mkString(", ")}]"
   }
 
@@ -110,7 +117,9 @@ case class BatchTableTestUtil() extends TableTestUtil {
     val relNode = resultTable.getRelNode
     val optimized = tEnv.optimize(relNode)
     val actual = RelOptUtil.toString(optimized)
-    Assert.assertEquals(expected, actual)
+    assertEquals(
+      expected.split("\n").map(_.trim).mkString("\n"),
+      actual.split("\n").map(_.trim).mkString("\n"))
   }
 }
 
@@ -143,6 +152,8 @@ case class StreamTableTestUtil() extends TableTestUtil {
     val relNode = resultTable.getRelNode
     val optimized = tEnv.optimize(relNode)
     val actual = RelOptUtil.toString(optimized)
-    Assert.assertEquals(expected, actual)
+    assertEquals(
+      expected.split("\n").map(_.trim).mkString("\n"),
+      actual.split("\n").map(_.trim).mkString("\n"))
   }
 }

Reply via email to