Repository: flink Updated Branches: refs/heads/master 7fe0eb477 -> 438276de8
http://git-wip-us.apache.org/repos/asf/flink/blob/438276de/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala index 627b25b..5ba3e34 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala @@ -51,7 +51,7 @@ abstract class AggFunctionTestBase[T] { // test aggregate functions with partial merge def testAggregateWithMerge(): Unit = { - if (ifMethodExitInFunction("merge", aggregator)) { + if (ifMethodExistInFunction("merge", aggregator)) { // iterate over input sets for ((vals, expected) <- inputValueSets.zip(expectedResults)) { //equally split the vals sequence into two sequences http://git-wip-us.apache.org/repos/asf/flink/blob/438276de/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala index f13f350..071f0ee 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala @@ -18,6 +18,8 @@ package org.apache.flink.table.runtime.dataset +import java.math.BigDecimal + import org.apache.flink.api.scala._ import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.api.scala._ @@ -37,20 +39,22 @@ class DataSetWindowAggregateITCase(configMode: TableConfigMode) extends TableProgramsClusterTestBase(configMode) { val data = List( - (1L, 1, "Hi"), - (2L, 2, "Hallo"), - (3L, 2, "Hello"), - (6L, 3, "Hello"), - (4L, 5, "Hello"), - (16L, 4, "Hello world"), - (8L, 3, "Hello world")) + (1L, 1, 1d, 1f, new BigDecimal("1"), "Hi"), + (2L, 2, 2d, 2f, new BigDecimal("2"), "Hallo"), + (3L, 2, 2d, 2f, new BigDecimal("2"), "Hello"), + (6L, 3, 3d, 3f, new BigDecimal("3"), "Hello"), + (4L, 5, 5d, 5f, new BigDecimal("5"), "Hello"), + (16L, 4, 4d, 4f, new BigDecimal("4"), "Hello world"), + (8L, 3, 3d, 3f, new BigDecimal("3"), "Hello world")) @Test(expected = classOf[UnsupportedOperationException]) def testAllEventTimeTumblingWindowOverCount(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) - val table = env.fromCollection(data).toTable(tEnv, 'long, 'int, 'string) + val table = env + .fromCollection(data) + .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string) // Count tumbling non-grouping window on event-time are currently not supported table @@ -65,14 +69,20 @@ class DataSetWindowAggregateITCase(configMode: TableConfigMode) val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) - val table = env.fromCollection(data).toTable(tEnv, 'long, 'int, 'string) + val table = env + .fromCollection(data) + .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string) val windowedTable = table .window(Tumble over 2.rows on 'long as 'w) .groupBy('w, 'string) - .select('string, 'int.sum) + .select('string, 'int.sum, 'int.count, 'int.max, 'int.min, 'int.avg, + 'double.sum, 'double.count, 'double.max, 'double.min, 'double.avg, + 'float.sum, 'float.count, 'float.max, 'float.min, 'float.avg, + 'bigdec.sum, 'bigdec.count, 'bigdec.max, 'bigdec.min, 'bigdec.avg) - val expected = "Hello,7\n" + "Hello world,7\n" + val expected = "Hello,7,2,5,2,3,7.0,2,5.0,2.0,3.5,7.0,2,5.0,2.0,3.5,7,2,5,2,3.5\n" + + "Hello world,7,2,4,3,3,7.0,2,4.0,3.0,3.5,7.0,2,4.0,3.0,3.5,7,2,4,3,3.5\n" val results = windowedTable.toDataSet[Row].collect() TestBaseUtils.compareResultAsText(results.asJava, expected) } @@ -82,7 +92,9 @@ class DataSetWindowAggregateITCase(configMode: TableConfigMode) val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) - val table = env.fromCollection(data).toTable(tEnv, 'long, 'int, 'string) + val table = env + .fromCollection(data) + .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string) val windowedTable = table .window(Tumble over 5.milli on 'long as 'w) @@ -105,7 +117,9 @@ class DataSetWindowAggregateITCase(configMode: TableConfigMode) val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) - val table = env.fromCollection(data).toTable(tEnv, 'long, 'int, 'string) + val table = env + .fromCollection(data) + .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string) val windowedTable = table .window(Tumble over 5.milli on 'long as 'w) @@ -125,7 +139,9 @@ class DataSetWindowAggregateITCase(configMode: TableConfigMode) val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) - val table = env.fromCollection(data).toTable(tEnv, 'long, 'int, 'string) + val table = env + .fromCollection(data) + .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string) val windowedTable = table .window(Session withGap 7.milli on 'long as 'w) .groupBy('string, 'w) @@ -146,7 +162,9 @@ class DataSetWindowAggregateITCase(configMode: TableConfigMode) // Non-grouping Session window on event-time are currently not supported val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) - val table = env.fromCollection(data).toTable(tEnv, 'long, 'int, 'string) + val table = env + .fromCollection(data) + .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string) val windowedTable =table .window(Session withGap 7.milli on 'long as 'w) .groupBy('w) @@ -158,7 +176,9 @@ class DataSetWindowAggregateITCase(configMode: TableConfigMode) val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) - val table = env.fromCollection(data).toTable(tEnv, 'long, 'int, 'string) + val table = env + .fromCollection(data) + .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string) table .window(Tumble over 5.milli on 'long as 'w) .groupBy('w, 'string)
