Repository: flink Updated Branches: refs/heads/master baf057a48 -> de03e0cea
http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowTest.scala new file mode 100644 index 0000000..96fd787 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowTest.scala @@ -0,0 +1,614 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala.stream.table + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table._ +import org.apache.flink.api.table.expressions.{RowtimeAttribute, WindowReference} +import org.apache.flink.api.table.plan.logical._ +import org.apache.flink.api.table.utils.TableTestBase +import org.apache.flink.api.table.utils.TableTestUtil.{streamTableNode, term, unaryNode} +import org.junit.{Ignore, Test} + +class GroupWindowTest extends TableTestBase { + + // batch windows are not supported yet + @Test(expected = classOf[ValidationException]) + def testInvalidBatchWindow(): Unit = { + val util = batchTestUtil() + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + + table + .groupBy('string) + .window(Session withGap 100.milli as 'string) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidWindowProperty(): Unit = { + val util = streamTestUtil() + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + + table + .groupBy('string) + .select('string, 'string.start) // property in non windowed table + } + + @Test(expected = classOf[TableException]) + def testInvalidRowtime1(): Unit = { + val util = streamTestUtil() + // rowtime attribute must not be a field name + util.addTable[(Long, Int, String)]('rowtime, 'long, 'int, 'string) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidRowtime2(): Unit = { + val util = streamTestUtil() + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + + table + .select('string, 'int as 'rowtime) // rowtime attribute must not be an alias + } + + @Test(expected = classOf[ValidationException]) + def testInvalidRowtime3(): Unit = { + val util = streamTestUtil() + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + + table.as('rowtime, 'myint, 'mystring) // rowtime attribute must not be an alias + } + + @Test(expected = classOf[ValidationException]) + 
def testInvalidRowtime4(): Unit = { + val util = streamTestUtil() + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + + table + .groupBy('string) + // only rowtime is a valid time attribute in a stream environment + .window(Tumble over 50.milli on 'string) + .select('string, 'int.count) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidTumblingSize(): Unit = { + val util = streamTestUtil() + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + + table + .groupBy('string) + .window(Tumble over "WRONG") // string is not a valid interval + .select('string, 'int.count) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidSlidingSize(): Unit = { + val util = streamTestUtil() + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + + table + .groupBy('string) + .window(Slide over "WRONG" every "WRONG") // string is not a valid interval + .select('string, 'int.count) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidSlidingSlide(): Unit = { + val util = streamTestUtil() + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + + table + .groupBy('string) + .window(Slide over 12.rows every 1.minute) // row and time intervals may not be mixed + .select('string, 'int.count) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidSessionGap(): Unit = { + val util = streamTestUtil() + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + + table + .groupBy('string) + .window(Session withGap 10.rows) // row interval is not valid for session windows + .select('string, 'int.count) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidWindowAlias1(): Unit = { + val util = streamTestUtil() + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + + table + .groupBy('string) + .window(Session withGap 100.milli as 1 + 1) // expression instead of a symbol + .select('string, 'int.count) + } 
+ + @Test(expected = classOf[ValidationException]) + def testInvalidWindowAlias2(): Unit = { + val util = streamTestUtil() + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + + table + .groupBy('string) + .window(Session withGap 100.milli as 'string) // field name "string" is already present + .select('string, 'int.count) + } + + @Test + def testProcessingTimeTumblingGroupWindowOverTime(): Unit = { + val util = streamTestUtil() + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + + val windowedTable = table + .groupBy('string) + .window(Tumble over 50.milli) + .select('string, 'int.count) + + val expected = unaryNode( + "DataStreamAggregate", + streamTableNode(0), + term("groupBy", "string"), + term("window", ProcessingTimeTumblingGroupWindow(None, 50.milli)), + term("select", "string", "COUNT(int) AS TMP_0") + ) + + util.verifyTable(windowedTable, expected) + } + + @Test + def testProcessingTimeTumblingGroupWindowOverCount(): Unit = { + val util = streamTestUtil() + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + + val windowedTable = table + .groupBy('string) + .window(Tumble over 2.rows) + .select('string, 'int.count) + + val expected = unaryNode( + "DataStreamAggregate", + streamTableNode(0), + term("groupBy", "string"), + term("window", ProcessingTimeTumblingGroupWindow(None, 2.rows)), + term("select", "string", "COUNT(int) AS TMP_0") + ) + + util.verifyTable(windowedTable, expected) + } + + @Test + def testEventTimeTumblingGroupWindowOverTime(): Unit = { + val util = streamTestUtil() + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + + val windowedTable = table + .groupBy('string) + .window(Tumble over 5.milli on 'rowtime) + .select('string, 'int.count) + + val expected = unaryNode( + "DataStreamAggregate", + streamTableNode(0), + term("groupBy", "string"), + term("window", EventTimeTumblingGroupWindow(None, RowtimeAttribute(), 5.milli)), + term("select", "string", 
"COUNT(int) AS TMP_0") + ) + + util.verifyTable(windowedTable, expected) + } + + @Test + @Ignore // see comments in DataStreamAggregate + def testEventTimeTumblingGroupWindowOverCount(): Unit = { + val util = streamTestUtil() + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + + val windowedTable = table + .groupBy('string) + .window(Tumble over 2.rows on 'rowtime) + .select('string, 'int.count) + + val expected = unaryNode( + "DataStreamAggregate", + streamTableNode(0), + term("groupBy", "string"), + term("window", EventTimeTumblingGroupWindow(None, RowtimeAttribute(), 2.rows)), + term("select", "string", "COUNT(int) AS TMP_0") + ) + + util.verifyTable(windowedTable, expected) + } + + @Test + def testProcessingTimeSlidingGroupWindowOverTime(): Unit = { + val util = streamTestUtil() + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + + val windowedTable = table + .groupBy('string) + .window(Slide over 50.milli every 50.milli) + .select('string, 'int.count) + + val expected = unaryNode( + "DataStreamAggregate", + streamTableNode(0), + term("groupBy", "string"), + term("window", ProcessingTimeSlidingGroupWindow(None, 50.milli, 50.milli)), + term("select", "string", "COUNT(int) AS TMP_0") + ) + + util.verifyTable(windowedTable, expected) + } + + @Test + def testProcessingTimeSlidingGroupWindowOverCount(): Unit = { + val util = streamTestUtil() + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + + val windowedTable = table + .groupBy('string) + .window(Slide over 2.rows every 1.rows) + .select('string, 'int.count) + + val expected = unaryNode( + "DataStreamAggregate", + streamTableNode(0), + term("groupBy", "string"), + term("window", ProcessingTimeSlidingGroupWindow(None, 2.rows, 1.rows)), + term("select", "string", "COUNT(int) AS TMP_0") + ) + + util.verifyTable(windowedTable, expected) + } + + @Test + def testEventTimeSlidingGroupWindowOverTime(): Unit = { + val util = streamTestUtil() + val table = 
util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .groupBy('string)
+      .window(Slide over 8.milli every 10.milli on 'rowtime)
+      .select('string, 'int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      streamTableNode(0),
+      term("groupBy", "string"),
+      term("window", EventTimeSlidingGroupWindow(None, RowtimeAttribute(), 8.milli, 10.milli)),
+      term("select", "string", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  @Ignore // see comments in DataStreamAggregate
+  def testEventTimeSlidingGroupWindowOverCount(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .groupBy('string)
+      .window(Slide over 2.rows every 1.rows on 'rowtime)
+      .select('string, 'int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      streamTableNode(0),
+      term("groupBy", "string"),
+      term("window", EventTimeSlidingGroupWindow(None, RowtimeAttribute(), 2.rows, 1.rows)),
+      term("select", "string", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testEventTimeSessionGroupWindowOverTime(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .groupBy('string)
+      .window(Session withGap 7.milli on 'rowtime)
+      .select('string, 'int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      streamTableNode(0),
+      term("groupBy", "string"),
+      term("window", EventTimeSessionGroupWindow(None, RowtimeAttribute(), 7.milli)),
+      term("select", "string", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testAllProcessingTimeTumblingGroupWindowOverTime(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    // "All" variant: the window is applied to the un-grouped table, like the other testAll* cases
+    val windowedTable = table
+      .window(Tumble over 50.milli)
+      .select('int.count)
+
+    // consequently the expected plan has no groupBy term and selects only the aggregate
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      streamTableNode(0),
+      term("window", ProcessingTimeTumblingGroupWindow(None, 50.milli)),
+      term("select", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testAllProcessingTimeTumblingGroupWindowOverCount(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .window(Tumble over 2.rows)
+      .select('int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      streamTableNode(0),
+      term("window", ProcessingTimeTumblingGroupWindow(None, 2.rows)),
+      term("select", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testAllEventTimeTumblingGroupWindowOverTime(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .window(Tumble over 5.milli on 'rowtime)
+      .select('int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      streamTableNode(0),
+      term("window", EventTimeTumblingGroupWindow(None, RowtimeAttribute(), 5.milli)),
+      term("select", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  @Ignore // see comments in DataStreamAggregate
+  def testAllEventTimeTumblingGroupWindowOverCount(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .window(Tumble over 2.rows on 'rowtime)
+      .select('int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      streamTableNode(0),
+      term("window", EventTimeTumblingGroupWindow(None, RowtimeAttribute(), 2.rows)),
+      term("select", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+
+  @Test
+  def testAllProcessingTimeSlidingGroupWindowOverTime(): Unit = {
+    val util =
streamTestUtil() + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + + val windowedTable = table + .window(Slide over 50.milli every 50.milli) + .select('int.count) + + val expected = unaryNode( + "DataStreamAggregate", + streamTableNode(0), + term("window", ProcessingTimeSlidingGroupWindow(None, 50.milli, 50.milli)), + term("select", "COUNT(int) AS TMP_0") + ) + + util.verifyTable(windowedTable, expected) + } + + @Test + def testAllProcessingTimeSlidingGroupWindowOverCount(): Unit = { + val util = streamTestUtil() + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + + val windowedTable = table + .window(Slide over 2.rows every 1.rows) + .select('int.count) + + val expected = unaryNode( + "DataStreamAggregate", + streamTableNode(0), + term("window", ProcessingTimeSlidingGroupWindow(None, 2.rows, 1.rows)), + term("select", "COUNT(int) AS TMP_0") + ) + + util.verifyTable(windowedTable, expected) + } + + @Test + def testAllEventTimeSlidingGroupWindowOverTime(): Unit = { + val util = streamTestUtil() + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + + val windowedTable = table + .window(Slide over 8.milli every 10.milli on 'rowtime) + .select('int.count) + + val expected = unaryNode( + "DataStreamAggregate", + streamTableNode(0), + term("window", EventTimeSlidingGroupWindow(None, RowtimeAttribute(), 8.milli, 10.milli)), + term("select", "COUNT(int) AS TMP_0") + ) + + util.verifyTable(windowedTable, expected) + } + + @Test + @Ignore // see comments in DataStreamAggregate + def testAllEventTimeSlidingGroupWindowOverCount(): Unit = { + val util = streamTestUtil() + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + + val windowedTable = table + .window(Slide over 2.rows every 1.rows on 'rowtime) + .select('int.count) + + val expected = unaryNode( + "DataStreamAggregate", + streamTableNode(0), + term("window", EventTimeSlidingGroupWindow(None, RowtimeAttribute(), 2.rows, 1.rows)), + 
term("select", "COUNT(int) AS TMP_0") + ) + + util.verifyTable(windowedTable, expected) + } + + @Test + def testAllEventTimeSessionGroupWindowOverTime(): Unit = { + val util = streamTestUtil() + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + + val windowedTable = table + .window(Session withGap 7.milli on 'rowtime) + .select('int.count) + + val expected = unaryNode( + "DataStreamAggregate", + streamTableNode(0), + term("window", EventTimeSessionGroupWindow(None, RowtimeAttribute(), 7.milli)), + term("select", "COUNT(int) AS TMP_0") + ) + + util.verifyTable(windowedTable, expected) + } + + @Test + def testTumbleWindowStartEnd(): Unit = { + val util = streamTestUtil() + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + + val windowedTable = table + .groupBy('string) + .window(Tumble over 5.milli on 'rowtime as 'w) + .select('string, 'int.count, 'w.start, 'w.end) + + val expected = unaryNode( + "DataStreamAggregate", + streamTableNode(0), + term("groupBy", "string"), + term("window", + EventTimeTumblingGroupWindow( + Some(WindowReference("w")), + RowtimeAttribute(), + 5.milli)), + term("select", + "string", + "COUNT(int) AS TMP_0", + "start(WindowReference(w)) AS TMP_1", + "end(WindowReference(w)) AS TMP_2") + ) + + util.verifyTable(windowedTable, expected) + } + + @Test + def testSlideWindowStartEnd(): Unit = { + val util = streamTestUtil() + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + + val windowedTable = table + .groupBy('string) + .window(Slide over 10.milli every 5.milli on 'rowtime as 'w) + .select('string, 'int.count, 'w.start, 'w.end) + + val expected = unaryNode( + "DataStreamAggregate", + streamTableNode(0), + term("groupBy", "string"), + term("window", + EventTimeSlidingGroupWindow( + Some(WindowReference("w")), + RowtimeAttribute(), + 10.milli, + 5.milli)), + term("select", + "string", + "COUNT(int) AS TMP_0", + "start(WindowReference(w)) AS TMP_1", + "end(WindowReference(w)) AS 
TMP_2") + ) + + util.verifyTable(windowedTable, expected) + } + + @Test + def testSessionWindowStartWithTwoEnd(): Unit = { + val util = streamTestUtil() + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + + val windowedTable = table + .groupBy('string) + .window(Session withGap 3.milli on 'rowtime as 'w) + .select('w.end, 'string, 'int.count, 'w.start, 'w.end) + + val expected = unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamAggregate", + streamTableNode(0), + term("groupBy", "string"), + term("window", + EventTimeSessionGroupWindow( + Some(WindowReference("w")), + RowtimeAttribute(), + 3.milli)), + term("select", + "string", + "COUNT(int) AS TMP_1", + "end(WindowReference(w)) AS TMP_0", + "start(WindowReference(w)) AS TMP_2", + "end(WindowReference(w)) AS TMP_3") + ), + term("select", "TMP_0", "string", "TMP_1", "TMP_2", "TMP_3") + ) + + util.verifyTable(windowedTable, expected) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala index 8ce1472..6d1a62e 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala @@ -35,14 +35,6 @@ class UnsupportedOpsTest extends StreamingMultipleProgramsTestBase { } @Test(expected = classOf[ValidationException]) - def testGroupBy(): Unit = { - val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - 
StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
-      .groupBy('_1)
-  }
-
-  @Test(expected = classOf[ValidationException])
   def testDistinct(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
index fd43ed4..ce693ff 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
@@ -27,7 +27,7 @@ import org.apache.flink.api.table.expressions.Expression
 import org.apache.flink.api.table.{Table, TableEnvironment}
 import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream}
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
-import org.junit.Assert
+import org.junit.Assert.assertEquals
 import org.mockito.Mockito.{mock, when}
 
 /**
@@ -46,6 +46,20 @@ class TableTestBase {
 }
 
 abstract class TableTestUtil {
+
+  // Number of names handed out by the name-less addTable overload so far.
+  private var counter = 0
+
+  /**
+    * Delegates to the named overload using a generated name ("Table1", "Table2", ...).
+    */
+  def addTable[T: TypeInformation](fields: Expression*): Table = {
+    // Increment in its own statement: `counter += 1` has type Unit, so interpolating the
+    // assignment itself renders "()" and every table would be named "Table()".
+    counter += 1
+    addTable[T](s"Table$counter", fields: _*)
+  }
+
   def addTable[T: TypeInformation](name: String, fields: Expression*): Table
   def verifySql(query: String, expected: String): Unit
   def verifyTable(resultTable: Table, expected: String): Unit
@@ -58,18 +72,18 @@ object TableTestUtil {
 
   def unaryNode(node: String, input: String, term: String*): String = {
     s"""$node(${term.mkString(", ")})
-       | $input
+       |$input
        |""".stripMargin
   }
 
   def binaryNode(node: String, left: String, right: String, term: String*): String = {
     s"""$node(${term.mkString(", ")})
-       | $left
-       | $right
+       |$left
+       |$right
        |""".stripMargin
   }
 
-  def term(term: String, value: String*): String = {
+  def term(term: AnyRef, value: AnyRef*): String = {
     s"$term=[${value.mkString(", ")}]"
   }
 
@@ -110,7 +124,9 @@ case class BatchTableTestUtil() extends TableTestUtil {
     val relNode = resultTable.getRelNode
     val optimized = tEnv.optimize(relNode)
     val actual = RelOptUtil.toString(optimized)
-    Assert.assertEquals(expected, actual)
+    assertEquals(
+      expected.split("\n").map(_.trim).mkString("\n"),
+      actual.split("\n").map(_.trim).mkString("\n"))
   }
 }
 
@@ -143,6 +159,8 @@ case class StreamTableTestUtil() extends TableTestUtil {
     val relNode = resultTable.getRelNode
     val optimized = tEnv.optimize(relNode)
     val actual = RelOptUtil.toString(optimized)
-    Assert.assertEquals(expected, actual)
+    assertEquals(
+      expected.split("\n").map(_.trim).mkString("\n"),
+      actual.split("\n").map(_.trim).mkString("\n"))
   }
 }
