http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala index 6321e09..4b88bc3 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala @@ -131,7 +131,9 @@ class TableSourceTest extends TableTestBase { val expected = unaryNode( "DataSetCalc", - batchSourceTableNode(tableName, Array("first")), + s"BatchTableSourceScan(table=[[$tableName]], " + + s"fields=[], " + + s"source=[CsvTableSource(read fields: first)])", term("select", "1 AS _c0") ) @@ -153,9 +155,7 @@ class TableSourceTest extends TableTestBase { val expected = unaryNode( "DataSetCalc", - batchSourceTableNode( - tableName, - Array("name", "id", "amount", "price")), + "BatchTableSourceScan(table=[[filterableTable]], fields=[price, id, amount])", term("select", "price", "id", "amount"), term("where", "<(*(price, 2), 32)") ) @@ -180,7 +180,7 @@ class TableSourceTest extends TableTestBase { "DataSetCalc", batchFilterableSourceTableNode( tableName, - Array("name", "id", "amount", "price"), + Array("price", "name", "amount"), "'amount > 2"), term("select", "price", "LOWER(name) AS _c1", "amount"), term("where", "<(*(price, 2), 32)") @@ -201,14 +201,10 @@ class TableSourceTest extends TableTestBase { .select('price, 'id, 'amount) .where("amount > 2 && amount < 32") - val expected = unaryNode( - "DataSetCalc", - batchFilterableSourceTableNode( - tableName, - Array("name", "id", "amount", "price"), - "'amount > 2 && 'amount < 32"), - term("select", "price", "id", "amount") - ) + val expected = batchFilterableSourceTableNode( + tableName, + Array("price", "id", "amount"), + "'amount > 2 && 'amount < 32") util.verifyTable(result, expected) } @@ -229,7 +225,7 @@ class TableSourceTest extends TableTestBase { "DataSetCalc", batchFilterableSourceTableNode( tableName, - Array("name", "id", "amount", "price"), + Array("price", "id", "amount"), "'amount > 2"), term("select", "price", "id", "amount"), term("where", "OR(<(amount, 32), >(CAST(amount), 10))") @@ -256,7 +252,7 @@ class TableSourceTest extends TableTestBase { "DataSetCalc", batchFilterableSourceTableNode( tableName, - Array("name", "id", "amount", "price"), + Array("price", "id", "amount"), "'amount > 2"), term("select", "price", "id", "amount"), term("where", s"<(${Func0.getClass.getSimpleName}(amount), 32)") @@ -339,7 +335,7 @@ class TableSourceTest extends TableTestBase { "DataStreamCalc", streamFilterableSourceTableNode( tableName, - Array("name", "id", "amount", "price"), + Array("price", "id", "amount"), "'amount > 2"), term("select", "price", "id", "amount"), term("where", "<(*(price, 2), 32)") @@ -392,11 +388,15 @@ class TableSourceTest extends TableTestBase { } def batchSourceTableNode(sourceName: String, fields: Array[String]): String = { - s"BatchTableSourceScan(table=[[$sourceName]], fields=[${fields.mkString(", ")}])" + s"BatchTableSourceScan(table=[[$sourceName]], " + + s"fields=[${fields.mkString(", ")}], " + + s"source=[CsvTableSource(read fields: ${fields.mkString(", ")})])" } def streamSourceTableNode(sourceName: String, fields: Array[String] ): String = { - s"StreamTableSourceScan(table=[[$sourceName]], fields=[${fields.mkString(", ")}])" + s"StreamTableSourceScan(table=[[$sourceName]], " + + s"fields=[${fields.mkString(", ")}], " + + s"source=[CsvTableSource(read fields: ${fields.mkString(", ")})])" } def batchFilterableSourceTableNode(
http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala index be073bd..c53f5ac 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala @@ -20,78 +20,74 @@ package org.apache.flink.table.api.stream.table import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.RowTypeInfo -import org.apache.flink.streaming.api.datastream.DataStream -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment -import org.apache.flink.table.api.Types +import org.apache.flink.table.api.{TableSchema, Types} import org.apache.flink.table.api.scala._ -import org.apache.flink.table.sources._ import org.apache.flink.table.utils.TableTestUtil._ -import org.apache.flink.table.utils.TableTestBase +import org.apache.flink.table.utils.{TableTestBase, TestNestedProjectableTableSource, TestProjectableTableSource, TestTableSourceWithTime} import org.apache.flink.types.Row -import org.junit.{Assert, Test} +import org.junit.Test class TableSourceTest extends TableTestBase { @Test def testTableSourceWithLongRowTimeField(): Unit = { - val tableSource = new TestRowtimeSource( + val tableSchema = new TableSchema( Array("id", "rowtime", "val", "name"), + Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.STRING)) + val returnType = new RowTypeInfo( Array(Types.INT, Types.LONG, Types.LONG, Types.STRING) .asInstanceOf[Array[TypeInformation[_]]], - "rowtime" - ) + Array("id", "rowtime", "val", "name")) val util = streamTestUtil() - util.tableEnv.registerTableSource("rowTimeT", tableSource) + util.tableEnv.registerTableSource( + "rowTimeT", + new TestTableSourceWithTime[Row](tableSchema, returnType, Seq(), rowtime = "rowtime")) val t = util.tableEnv.scan("rowTimeT").select("rowtime, id, name, val") - val expected = - unaryNode( - "DataStreamCalc", - "StreamTableSourceScan(table=[[rowTimeT]], fields=[id, rowtime, val, name])", - term("select", "rowtime", "id", "name", "val") - ) + val expected = "StreamTableSourceScan(table=[[rowTimeT]], fields=[rowtime, id, name, val])" util.verifyTable(t, expected) } @Test def testTableSourceWithTimestampRowTimeField(): Unit = { - val tableSource = new TestRowtimeSource( + val tableSchema = new TableSchema( Array("id", "rowtime", "val", "name"), + Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.STRING)) + val returnType = new RowTypeInfo( Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.STRING) .asInstanceOf[Array[TypeInformation[_]]], - "rowtime" - ) + Array("id", "rowtime", "val", "name")) val util = streamTestUtil() - util.tableEnv.registerTableSource("rowTimeT", tableSource) + util.tableEnv.registerTableSource( + "rowTimeT", + new TestTableSourceWithTime[Row](tableSchema, returnType, Seq(), rowtime = "rowtime")) val t = util.tableEnv.scan("rowTimeT").select("rowtime, id, name, val") - val expected = - unaryNode( - "DataStreamCalc", - "StreamTableSourceScan(table=[[rowTimeT]], fields=[id, rowtime, val, name])", - term("select", "rowtime", "id", "name", "val") - ) + val expected = "StreamTableSourceScan(table=[[rowTimeT]], fields=[rowtime, id, name, val])" util.verifyTable(t, expected) } @Test def testRowTimeTableSourceGroupWindow(): Unit = { - val tableSource = new TestRowtimeSource( + val tableSchema = new TableSchema( Array("id", "rowtime", "val", "name"), - Array(Types.INT, Types.LONG, Types.LONG, Types.STRING) + Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.STRING)) + val returnType = new RowTypeInfo( + Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.STRING) .asInstanceOf[Array[TypeInformation[_]]], - "rowtime" - ) + Array("id", "rowtime", "val", "name")) val util = streamTestUtil() - util.tableEnv.registerTableSource("rowTimeT", tableSource) + util.tableEnv.registerTableSource( + "rowTimeT", + new TestTableSourceWithTime[Row](tableSchema, returnType, Seq(), rowtime = "rowtime")) val t = util.tableEnv.scan("rowTimeT") .filter("val > 100") @@ -106,8 +102,8 @@ class TableSourceTest extends TableTestBase { "DataStreamGroupWindowAggregate", unaryNode( "DataStreamCalc", - "StreamTableSourceScan(table=[[rowTimeT]], fields=[id, rowtime, val, name])", - term("select", "name", "val", "rowtime"), + "StreamTableSourceScan(table=[[rowTimeT]], fields=[rowtime, val, name])", + term("select", "rowtime", "val", "name"), term("where", ">(val, 100)") ), term("groupBy", "name"), @@ -121,27 +117,47 @@ class TableSourceTest extends TableTestBase { @Test def testProcTimeTableSourceSimple(): Unit = { + + val tableSchema = new TableSchema( + Array("id", "proctime", "val", "name"), + Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.STRING)) + val returnType = new RowTypeInfo( + Array(Types.INT, Types.LONG, Types.STRING).asInstanceOf[Array[TypeInformation[_]]], + Array("id", "val", "name")) + val util = streamTestUtil() - util.tableEnv.registerTableSource("procTimeT", new TestProctimeSource("pTime")) + util.tableEnv.registerTableSource( + "procTimeT", + new TestTableSourceWithTime[Row](tableSchema, returnType, Seq(), proctime = "proctime")) - val t = util.tableEnv.scan("procTimeT").select("pTime, id, name, val") + val t = util.tableEnv.scan("procTimeT").select("proctime, id, name, val") val expected = unaryNode( "DataStreamCalc", - "StreamTableSourceScan(table=[[procTimeT]], fields=[id, val, name, pTime])", - term("select", "PROCTIME(pTime) AS pTime", "id", "name", "val") + "StreamTableSourceScan(table=[[procTimeT]], fields=[id, proctime, val, name])", + term("select", "PROCTIME(proctime) AS proctime", "id", "name", "val") ) util.verifyTable(t, expected) } @Test def testProcTimeTableSourceOverWindow(): Unit = { + + val tableSchema = new TableSchema( + Array("id", "proctime", "val", "name"), + Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.STRING)) + val returnType = new RowTypeInfo( + Array(Types.INT, Types.LONG, Types.STRING).asInstanceOf[Array[TypeInformation[_]]], + Array("id", "val", "name")) + val util = streamTestUtil() - util.tableEnv.registerTableSource("procTimeT", new TestProctimeSource("pTime")) + util.tableEnv.registerTableSource( + "procTimeT", + new TestTableSourceWithTime[Row](tableSchema, returnType, Seq(), proctime = "proctime")) val t = util.tableEnv.scan("procTimeT") - .window(Over partitionBy 'id orderBy 'pTime preceding 2.hours as 'w) + .window(Over partitionBy 'id orderBy 'proctime preceding 2.hours as 'w) .select('id, 'name, 'val.sum over 'w as 'valSum) .filter('valSum > 100) @@ -150,11 +166,11 @@ class TableSourceTest extends TableTestBase { "DataStreamCalc", unaryNode( "DataStreamOverAggregate", - "StreamTableSourceScan(table=[[procTimeT]], fields=[id, val, name, pTime])", + "StreamTableSourceScan(table=[[procTimeT]], fields=[id, proctime, val, name])", term("partitionBy", "id"), - term("orderBy", "pTime"), + term("orderBy", "proctime"), term("range", "BETWEEN 7200000 PRECEDING AND CURRENT ROW"), - term("select", "id", "val", "name", "pTime", "SUM(val) AS w0$o0") + term("select", "id", "proctime", "val", "name", "SUM(val) AS w0$o0") ), term("select", "id", "name", "w0$o0 AS valSum"), term("where", ">(w0$o0, 100)") @@ -163,94 +179,195 @@ class TableSourceTest extends TableTestBase { } @Test - def testProjectableProcTimeTableSource(): Unit = { - // ensures that projection is not pushed into table source with proctime indicators - val util = streamTestUtil() + def testProjectWithRowtimeProctime(): Unit = { + val tableSchema = new TableSchema( + Array("id", "rtime", "val", "ptime", "name"), + Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING)) + val returnType = new RowTypeInfo( + Array(Types.INT, Types.STRING, Types.LONG, Types.LONG) + .asInstanceOf[Array[TypeInformation[_]]], + Array("id", "name", "val", "rtime")) - val projectableTableSource = new TestProctimeSource("pTime") with ProjectableTableSource[Row] { - override def projectFields(fields: Array[Int]): TableSource[Row] = { - // ensure this method is not called! - Assert.fail() - null.asInstanceOf[TableSource[Row]] - } - } - util.tableEnv.registerTableSource("PTimeTable", projectableTableSource) + val util = streamTestUtil() + util.tableEnv.registerTableSource( + "T", + new TestProjectableTableSource(tableSchema, returnType, Seq(), "rtime", "ptime")) - val t = util.tableEnv.scan("PTimeTable") - .select('name, 'val) - .where('val > 10) + val t = util.tableEnv.scan("T").select('name, 'val, 'id) - val expected = - unaryNode( - "DataStreamCalc", - "StreamTableSourceScan(table=[[PTimeTable]], fields=[id, val, name, pTime])", - term("select", "name", "val"), - term("where", ">(val, 10)") - ) + val expected = "StreamTableSourceScan(table=[[T]], " + + "fields=[name, val, id], " + + "source=[TestSource(physical fields: name, val, id)])" util.verifyTable(t, expected) } @Test - def testProjectableRowTimeTableSource(): Unit = { - // ensures that projection is not pushed into table source with rowtime indicators + def testProjectWithoutRowtime(): Unit = { + val tableSchema = new TableSchema( + Array("id", "rtime", "val", "ptime", "name"), + Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING)) + val returnType = new RowTypeInfo( + Array(Types.INT, Types.STRING, Types.LONG, Types.LONG) + .asInstanceOf[Array[TypeInformation[_]]], + Array("id", "name", "val", "rtime")) + val util = streamTestUtil() + util.tableEnv.registerTableSource( + "T", + new TestProjectableTableSource(tableSchema, returnType, Seq(), "rtime", "ptime")) + + val t = util.tableEnv.scan("T").select('ptime, 'name, 'val, 'id) + + val expected = unaryNode( + "DataStreamCalc", + "StreamTableSourceScan(table=[[T]], " + + "fields=[ptime, name, val, id], " + + "source=[TestSource(physical fields: name, val, id)])", + term("select", "PROCTIME(ptime) AS ptime", "name", "val", "id") + ) + util.verifyTable(t, expected) + } - val projectableTableSource = new TestRowtimeSource( - Array("id", "rowtime", "val", "name"), - Array(Types.INT, Types.LONG, Types.LONG, Types.STRING) - .asInstanceOf[Array[TypeInformation[_]]], - "rowtime") with ProjectableTableSource[Row] { + def testProjectWithoutProctime(): Unit = { + val tableSchema = new TableSchema( + Array("id", "rtime", "val", "ptime", "name"), + Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING)) + val returnType = new RowTypeInfo( + Array(Types.INT, Types.LONG, Types.LONG, Types.STRING) + .asInstanceOf[Array[TypeInformation[_]]], + Array("id", "rtime", "val", "name")) - override def projectFields(fields: Array[Int]): TableSource[Row] = { - // ensure this method is not called! - Assert.fail() - null.asInstanceOf[TableSource[Row]] - } - } - util.tableEnv.registerTableSource("RTimeTable", projectableTableSource) + val util = streamTestUtil() + util.tableEnv.registerTableSource( + "T", + new TestProjectableTableSource(tableSchema, returnType, Seq(), "rtime", "ptime")) - val t = util.tableEnv.scan("RTimeTable") - .select('name, 'val) - .where('val > 10) + val t = util.tableEnv.scan("T").select('name, 'val, 'rtime, 'id) - val expected = - unaryNode( - "DataStreamCalc", - "StreamTableSourceScan(table=[[RTimeTable]], fields=[id, rowtime, val, name])", - term("select", "name", "val"), - term("where", ">(val, 10)") - ) + val expected = "StreamTableSourceScan(table=[[T]], " + + "fields=[name, val, rtime, id], " + + "source=[TestSource(physical fields: name, val, rtime, id)])" util.verifyTable(t, expected) } -} -class TestRowtimeSource( - fieldNames: Array[String], - fieldTypes: Array[TypeInformation[_]], - rowtimeField: String) - extends StreamTableSource[Row] with DefinedRowtimeAttribute { + def testProjectOnlyProctime(): Unit = { + val tableSchema = new TableSchema( + Array("id", "rtime", "val", "ptime", "name"), + Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING)) + val returnType = new RowTypeInfo( + Array(Types.INT, Types.LONG, Types.LONG, Types.STRING) + .asInstanceOf[Array[TypeInformation[_]]], + Array("id", "rtime", "val", "name")) - override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = ??? + val util = streamTestUtil() + util.tableEnv.registerTableSource( + "T", + new TestProjectableTableSource(tableSchema, returnType, Seq(), "rtime", "ptime")) - override def getRowtimeAttribute: String = rowtimeField + val t = util.tableEnv.scan("T").select('ptime) - override def getReturnType: TypeInformation[Row] = { - new RowTypeInfo(fieldTypes, fieldNames) + val expected = "StreamTableSourceScan(table=[[T]], " + + "fields=[ptime], " + + "source=[TestSource(physical fields: )])" + util.verifyTable(t, expected) } -} -class TestProctimeSource(timeField: String) - extends StreamTableSource[Row] with DefinedProctimeAttribute { + def testProjectOnlyRowtime(): Unit = { + val tableSchema = new TableSchema( + Array("id", "rtime", "val", "ptime", "name"), + Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING)) + val returnType = new RowTypeInfo( + Array(Types.INT, Types.LONG, Types.LONG, Types.STRING) + .asInstanceOf[Array[TypeInformation[_]]], + Array("id", "rtime", "val", "name")) - override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = ??? + val util = streamTestUtil() + util.tableEnv.registerTableSource( + "T", + new TestProjectableTableSource(tableSchema, returnType, Seq(), "rtime", "ptime")) - override def getProctimeAttribute: String = timeField + val t = util.tableEnv.scan("T").select('rtime) - override def getReturnType: TypeInformation[Row] = { - new RowTypeInfo( - Array(Types.INT, Types.LONG, Types.STRING).asInstanceOf[Array[TypeInformation[_]]], - Array("id", "val", "name")) + val expected = "StreamTableSourceScan(table=[[T]], " + + "fields=[rtime], " + + "source=[TestSource(physical fields: rtime)])" + util.verifyTable(t, expected) } -} + @Test + def testProjectWithMapping(): Unit = { + val tableSchema = new TableSchema( + Array("id", "rtime", "val", "ptime", "name"), + Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING)) + val returnType = new RowTypeInfo( + Array(Types.LONG, Types.INT, Types.STRING, Types.LONG) + .asInstanceOf[Array[TypeInformation[_]]], + Array("p-rtime", "p-id", "p-name", "p-val")) + val mapping = Map("rtime" -> "p-rtime", "id" -> "p-id", "val" -> "p-val", "name" -> "p-name") + + val util = streamTestUtil() + util.tableEnv.registerTableSource( + "T", + new TestProjectableTableSource(tableSchema, returnType, Seq(), "rtime", "ptime", mapping)) + + val t = util.tableEnv.scan("T").select('name, 'rtime, 'val) + val expected = "StreamTableSourceScan(table=[[T]], " + + "fields=[name, rtime, val], " + + "source=[TestSource(physical fields: remapped-p-name, remapped-p-rtime, remapped-p-val)])" + util.verifyTable(t, expected) + } + + @Test + def testNestedProject(): Unit = { + + val nested1 = new RowTypeInfo( + Array(Types.STRING, Types.INT).asInstanceOf[Array[TypeInformation[_]]], + Array("name", "value") + ) + + val nested2 = new RowTypeInfo( + Array(Types.INT, Types.BOOLEAN).asInstanceOf[Array[TypeInformation[_]]], + Array("num", "flag") + ) + + val deepNested = new RowTypeInfo( + Array(nested1, nested2).asInstanceOf[Array[TypeInformation[_]]], + Array("nested1", "nested2") + ) + + val tableSchema = new TableSchema( + Array("id", "deepNested", "nested", "name"), + Array(Types.INT, deepNested, nested1, Types.STRING)) + + val returnType = new RowTypeInfo( + Array(Types.INT, deepNested, nested1, Types.STRING).asInstanceOf[Array[TypeInformation[_]]], + Array("id", "deepNested", "nested", "name")) + + val util = streamTestUtil() + util.tableEnv.registerTableSource( + "T", + new TestNestedProjectableTableSource(tableSchema, returnType, Seq())) + + val t = util.tableEnv + .scan("T") + .select('id, + 'deepNested.get("nested1").get("name") as 'nestedName, + 'nested.get("value") as 'nestedValue, + 'deepNested.get("nested2").get("flag") as 'nestedFlag, + 'deepNested.get("nested2").get("num") as 'nestedNum) + + val expected = unaryNode( + "DataStreamCalc", + "StreamTableSourceScan(table=[[T]], " + + "fields=[id, deepNested, nested], " + + "source=[TestSource(read nested fields: " + + "id.*, deepNested.nested2.num, deepNested.nested2.flag, " + + "deepNested.nested1.name, nested.value)])", + term("select", "id", "deepNested.nested1.name AS nestedName", "nested.value AS nestedValue", + "deepNested.nested2.flag AS nestedFlag", "deepNested.nested2.num AS nestedNum") + ) + util.verifyTable(t, expected) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSourceValidationTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSourceValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSourceValidationTest.scala deleted file mode 100644 index 80f1725..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSourceValidationTest.scala +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.api.stream.table.validation - -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.table.api.{TableException, Types} -import org.apache.flink.table.api.scala._ -import org.apache.flink.table.api.stream.table.{TestProctimeSource, TestRowtimeSource} -import org.apache.flink.table.utils.TableTestBase -import org.junit.Test - -class TableSourceValidationTest extends TableTestBase { - - @Test(expected = classOf[TableException]) - def testRowtimeTableSourceWithEmptyName(): Unit = { - - val tableSource = new TestRowtimeSource( - Array("id", "rowtime", "val", "name"), - Array(Types.INT, Types.LONG, Types.LONG, Types.STRING) - .asInstanceOf[Array[TypeInformation[_]]], - "rowtime" - ) - - val util = streamTestUtil() - util.tableEnv.registerTableSource("rowTime", tableSource) - - val t = util.tableEnv.scan("rowTimeT") - .select('id) - - util.tableEnv.optimize(t.getRelNode, updatesAsRetraction = false) - } - - @Test(expected = classOf[TableException]) - def testProctimeTableSourceWithEmptyName(): Unit = { - val util = streamTestUtil() - util.tableEnv.registerTableSource("procTimeT", new TestProctimeSource(" ")) - - val t = util.tableEnv.scan("procTimeT") - .select('id) - - util.tableEnv.optimize(t.getRelNode, updatesAsRetraction = false) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/FlinkTableValidationTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/FlinkTableValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/FlinkTableValidationTest.scala deleted file mode 100644 index a845f5c..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/FlinkTableValidationTest.scala +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.api.validation - -import org.apache.flink.api.scala._ -import org.apache.flink.table.api.TableException -import org.apache.flink.table.api.scala._ -import org.apache.flink.table.utils.TableTestBase -import org.junit.Test - -class FlinkTableValidationTest extends TableTestBase { - - @Test - def testFieldNamesDuplicate() { - - thrown.expect(classOf[TableException]) - thrown.expectMessage("Field names must be unique.\n" + - "List of duplicate fields: [a].\n" + - "List of all fields: [a, a, b].") - - val util = batchTestUtil() - util.addTable[(Int, Int, String)]("MyTable", 'a, 'a, 'b) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/InlineTableValidationTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/InlineTableValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/InlineTableValidationTest.scala new file mode 100644 index 0000000..92cae5c --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/InlineTableValidationTest.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.api.validation + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.TableException +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.utils.TableTestBase +import org.junit.Test + +class InlineTableValidationTest extends TableTestBase { + + @Test + def testFieldNamesDuplicate() { + + thrown.expect(classOf[TableException]) + thrown.expectMessage("Field names must be unique.\n" + + "List of duplicate fields: [a].\n" + + "List of all fields: [a, a, b].") + + val util = batchTestUtil() + util.addTable[(Int, Int, String)]("MyTable", 'a, 'a, 'b) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala index 09a9c55..4828e86 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala @@ -18,124 +18,228 @@ package org.apache.flink.table.api.validation +import java.util +import java.util.Collections + import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment -import org.apache.flink.table.api.{TableEnvironment, TableException, Types} -import org.apache.flink.table.sources.CsvTableSource +import org.apache.flink.table.api.{TableEnvironment, TableSchema, Types, ValidationException} +import org.apache.flink.table.sources._ import org.apache.flink.table.utils.TestTableSourceWithTime import org.apache.flink.types.Row import org.junit.Test class TableSourceValidationTest { - @Test(expected = classOf[IllegalArgumentException]) - def testCsvTableSourceBuilderWithNullPath(): Unit = { - CsvTableSource.builder() - .field("myfield", Types.STRING) - // should fail, path is not defined - .build() - } + @Test(expected = classOf[ValidationException]) + def testUnresolvedSchemaField(): Unit = { - @Test(expected = classOf[IllegalArgumentException]) - def testCsvTableSourceBuilderWithDuplicateFieldName(): Unit = { - CsvTableSource.builder() - .path("/path/to/csv") - .field("myfield", Types.STRING) - // should fail, field name must no be duplicate - .field("myfield", Types.INT) - } + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) - @Test(expected = classOf[IllegalArgumentException]) - def testCsvTableSourceBuilderWithEmptyField(): Unit = { - CsvTableSource.builder() - .path("/path/to/csv") - // should fail, field can be empty - .build() + val schema = new TableSchema( + Array("id", "name", "amount", "value"), + Array(Types.LONG, Types.STRING, Types.INT, Types.DOUBLE)) + val rowType = new RowTypeInfo( + Array(Types.LONG, Types.STRING, Types.INT).asInstanceOf[Array[TypeInformation[_]]], + Array("id", "name", "amount")) + val ts = new TestTableSourceWithTime(schema, rowType, Seq[Row]()) + + // should fail because schema field "value" cannot be resolved in result type + tEnv.registerTableSource("testTable", ts) } - @Test(expected = classOf[TableException]) - def testNonExistingRowtimeField(): Unit = { + @Test(expected = classOf[ValidationException]) + def testNonMatchingFieldTypes(): Unit = { + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + + val schema = new TableSchema( + Array("id", "name", "amount"), + Array(Types.LONG, Types.INT, Types.INT)) val rowType = new RowTypeInfo( Array(Types.LONG, Types.STRING, Types.INT).asInstanceOf[Array[TypeInformation[_]]], - Array("id", "name", "amount") - ) - val ts = new TestTableSourceWithTime( - Seq[Row](), - rowType, - "rTime", - null - ) + Array("id", "name", "amount")) + val ts = new TestTableSourceWithTime(schema, rowType, Seq[Row]()) + + // should fail because types of "name" fields are different + tEnv.registerTableSource("testTable", ts) + } + + @Test(expected = classOf[ValidationException]) + def testMappingToUnknownField(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = TableEnvironment.getTableEnvironment(env) - // should fail because configured rowtime field is not in schema + val schema = new TableSchema( + Array("id", "name", "amount"), + Array(Types.LONG, Types.STRING, Types.DOUBLE)) + val rowType = new RowTypeInfo(Types.LONG, Types.STRING, Types.DOUBLE) + val mapping = Map("id" -> "f3", "name" -> "f1", "amount" -> "f2") + val ts = new TestTableSourceWithTime(schema, rowType, Seq[Row](), mapping = mapping) + + // should fail because mapping maps field "id" to unknown field tEnv.registerTableSource("testTable", ts) } - @Test(expected = classOf[TableException]) - def testInvalidTypeRowtimeField(): Unit = { - val rowType = new RowTypeInfo( - Array(Types.LONG, Types.STRING, Types.INT).asInstanceOf[Array[TypeInformation[_]]], - Array("id", "name", "amount") - ) - val ts = new TestTableSourceWithTime( - Seq[Row](), - rowType, - "name", - null - ) + @Test(expected = classOf[ValidationException]) + def testMappingWithInvalidFieldType(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = TableEnvironment.getTableEnvironment(env) - // should fail because configured rowtime field is not of type Long or Timestamp + val schema = new TableSchema( + Array("id", "name", "amount"), + Array(Types.LONG, Types.STRING, Types.DOUBLE)) + val rowType = new RowTypeInfo(Types.LONG, Types.STRING, Types.INT) + val mapping = Map("id" -> "f0", "name" -> "f1", "amount" -> "f2") + val ts = new TestTableSourceWithTime(schema, rowType, Seq[Row](), mapping = mapping) + + // should fail because mapping maps fields with different types tEnv.registerTableSource("testTable", ts) } - @Test(expected = classOf[TableException]) - def testEmptyRowtimeField(): Unit = { + @Test(expected = classOf[ValidationException]) + def testNonTimestampProctimeField(): Unit = { + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + + val schema = new TableSchema( + Array("id", "name", "amount", "ptime"), + Array(Types.LONG, Types.STRING, Types.INT, Types.LONG)) val rowType = new RowTypeInfo( Array(Types.LONG, Types.STRING, Types.INT).asInstanceOf[Array[TypeInformation[_]]], - Array("id", "name", "amount") - ) - val ts = new TestTableSourceWithTime( - Seq[Row](), - rowType, - "", - null - ) + Array("id", "name", "amount")) + val ts = new TestTableSourceWithTime(schema, rowType, Seq[Row](), proctime = "ptime") + + // should fail because processing time field has invalid type + tEnv.registerTableSource("testTable", ts) + } + + @Test(expected = classOf[ValidationException]) + def testNonTimestampRowtimeField(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = TableEnvironment.getTableEnvironment(env) - // should fail because configured rowtime field is empty + val schema = new TableSchema( + Array("id", "name", "amount", "rtime"), + Array(Types.LONG, Types.STRING, Types.INT, Types.LONG)) + val rowType = new RowTypeInfo( + Array(Types.LONG, Types.STRING, Types.LONG, Types.INT) + .asInstanceOf[Array[TypeInformation[_]]], + Array("id", "name", "rtime", "amount")) + val ts = new TestTableSourceWithTime(schema, rowType, Seq[Row](), rowtime = "rtime") + + // should fail because rowtime field has invalid type tEnv.registerTableSource("testTable", ts) } - @Test(expected = classOf[TableException]) - def testEmptyProctimeField(): Unit = { + @Test(expected = classOf[ValidationException]) + def testFieldRowtimeAndProctime(): Unit = { + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + + val schema = new TableSchema( + Array("id", "name", "amount", "time"), + Array(Types.LONG, Types.STRING, Types.INT, Types.SQL_TIMESTAMP)) val rowType = new RowTypeInfo( - Array(Types.LONG, Types.STRING, Types.INT).asInstanceOf[Array[TypeInformation[_]]], - Array("id", "name", "amount") - ) - val ts = new TestTableSourceWithTime( - Seq[Row](), - rowType, - null, - "" - ) + Array(Types.LONG, Types.STRING, Types.LONG, Types.INT) + .asInstanceOf[Array[TypeInformation[_]]], + Array("id", "name", "time", "amount")) + val ts = + new TestTableSourceWithTime(schema, rowType, Seq[Row](), rowtime = "time", proctime = "time") + + // should fail because rowtime field has invalid type + tEnv.registerTableSource("testTable", ts) + } + + @Test(expected = classOf[ValidationException]) + def testUnknownTimestampExtractorArgField(): Unit = { + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + + val schema = new TableSchema( + Array("id", "name", "amount", "rtime"), + Array(Types.LONG, Types.STRING, Types.INT, Types.SQL_TIMESTAMP)) + val rowType = new RowTypeInfo( + Array(Types.LONG, Types.STRING, Types.LONG, Types.INT) + .asInstanceOf[Array[TypeInformation[_]]], + Array("id", "name", "rtime", "amount")) + val ts = + new TestTableSourceWithTime(schema, rowType, Seq[Row]()) { + + override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = { + Collections.singletonList(new RowtimeAttributeDescriptor( + "rtime", + new ExistingField("doesNotExist"), + new AscendingWatermarks)) + } + } + + // should fail because timestamp extractor argument field does not exist + tEnv.registerTableSource("testTable", ts) + } + + @Test(expected = classOf[ValidationException]) + def testFailingTimestampExtractorValidation(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = TableEnvironment.getTableEnvironment(env) - // should fail because configured proctime field is empty + val fieldNames = Array("id", "name", "amount") + val rowType = new RowTypeInfo( + Array(Types.LONG, Types.STRING, Types.INT).asInstanceOf[Array[TypeInformation[_]]], + fieldNames) + val schema = new TableSchema( + fieldNames, + Array(Types.LONG, Types.SQL_TIMESTAMP, Types.INT)) + val ts = new TestTableSourceWithTime(schema, rowType, Seq[Row](), rowtime = "name") + + // should fail because configured rowtime field is not of type Long or Timestamp tEnv.registerTableSource("testTable", ts) } + + // CsvTableSource Tests + + @Test(expected = classOf[IllegalArgumentException]) + def testCsvTableSourceBuilderWithNullPath(): Unit = { + CsvTableSource.builder() + .field("myfield", Types.STRING) + // should fail, path is not defined + .build() + } + + @Test(expected = classOf[IllegalArgumentException]) + def testCsvTableSourceBuilderWithDuplicateFieldName(): Unit = { + CsvTableSource.builder() + .path("/path/to/csv") + .field("myfield", Types.STRING) + // should fail, field name must no be duplicate + .field("myfield", Types.INT) + } + + @Test(expected = classOf[IllegalArgumentException]) + def testCsvTableSourceBuilderWithEmptyField(): Unit = { + CsvTableSource.builder() + .path("/path/to/csv") + // should fail, field can be empty + .build() + } } http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala index 82bfd8d..0744de9 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala @@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.table.api.{TableSchema, Types} -import org.apache.flink.table.plan.schema.StreamTableSourceTable +import org.apache.flink.table.plan.schema.{StreamTableSourceTable} import org.apache.flink.table.sources.StreamTableSource import org.apache.flink.types.Row import org.junit.Assert.assertTrue @@ -58,6 +58,9 @@ class MockTableSourceConverter extends TableSourceConverter[StreamTableSource[Ro val schema = externalCatalogTable.schema Types.ROW(schema.getColumnNames, schema.getTypes) } + + override def getTableSchema: TableSchema = externalCatalogTable.schema + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala index 6d1d66f..54d1510 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala @@ -18,7 +18,7 @@ package org.apache.flink.table.catalog -import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.table.api._ import org.junit.{Before, Test} import org.junit.Assert._ @@ -68,7 +68,7 @@ class InMemoryExternalCatalogTest { val table = createTableInstance() catalog.createTable(tableName, table, ignoreIfExists = false) assertEquals(catalog.getTable(tableName), table) - val newTable = createTableInstance() + val newTable = createTableInstance(Array("number"), Array(Types.INT)) catalog.alterTable(tableName, newTable, ignoreIfNotExists = false) val currentTable = catalog.getTable(tableName) // validate the table is really replaced after alter table @@ -142,4 +142,11 @@ class InMemoryExternalCatalogTest { ) ExternalCatalogTable("csv", schema) } + + private def createTableInstance( + fieldNames: Array[String], + fieldTypes: Array[TypeInformation[_]]): ExternalCatalogTable = { + val schema = new TableSchema(fieldNames, fieldTypes) + ExternalCatalogTable("csv", schema) + } } http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala index 5e214b1..2292e17 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala @@ -18,13 +18,22 @@ package org.apache.flink.table.runtime.batch.table +import java.lang.{Boolean => JBool, Integer => JInt, Long => JLong} + +import org.apache.calcite.runtime.SqlFunctions.{internalToTimestamp => toTimestamp} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.java.{ExecutionEnvironment => JExecEnv} +import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo} import org.apache.flink.api.scala.ExecutionEnvironment -import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.{TableEnvironment, TableException, TableSchema, Types} import org.apache.flink.table.api.scala._ import org.apache.flink.table.runtime.utils.{CommonTestData, TableProgramsCollectionTestBase} import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode -import org.apache.flink.table.utils.TestFilterableTableSource +import org.apache.flink.table.sources.BatchTableSource +import org.apache.flink.table.utils._ import org.apache.flink.test.util.TestBaseUtils +import org.apache.flink.types.Row import org.junit.Test import org.junit.runner.RunWith import org.junit.runners.Parameterized @@ -36,6 +45,33 @@ class TableSourceITCase( configMode: TableConfigMode) extends TableProgramsCollectionTestBase(configMode) { + @Test(expected = classOf[TableException]) + def testInvalidDatastreamType(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val tableSource = new BatchTableSource[Row]() { + private val fieldNames: Array[String] = Array("name", "id", "value") + private val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.LONG, Types.INT) + .asInstanceOf[Array[TypeInformation[_]]] + + override def getDataSet(execEnv: JExecEnv): DataSet[Row] = { + val data = List(Row.of("Mary", new JLong(1L), new JInt(1))).asJava + // return DataSet[Row] with GenericTypeInfo + execEnv.fromCollection(data, new GenericTypeInfo[Row](classOf[Row])) + } + override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes, fieldNames) + override def getTableSchema: TableSchema = new TableSchema(fieldNames, fieldTypes) + } + tEnv.registerTableSource("T", tableSource) + + tEnv.scan("T") + .select('value, 'name) + .collect() + + // test should fail because type info of returned DataSet does not match type return type info. + } + @Test def testCsvTableSourceWithProjection(): Unit = { val csvTable = CommonTestData.getCsvTableSource @@ -76,4 +112,535 @@ class TableSourceITCase( "5,Record_5", "6,Record_6", "7,Record_7", "8,Record_8").mkString("\n") TestBaseUtils.compareResultAsText(results.asJava, expected) } + + @Test + def testRowtimeRowTableSource(): Unit = { + val tableName = "MyTable" + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val data = Seq( + Row.of("Mary", new JLong(1L), new JInt(10)), + Row.of("Bob", new JLong(2L), new JInt(20)), + Row.of("Mary", new JLong(2L), new JInt(30)), + Row.of("Liz", new JLong(2001L), new JInt(40))) + + val fieldNames = Array("name", "rtime", "amount") + val schema = new TableSchema(fieldNames, Array(Types.STRING, Types.SQL_TIMESTAMP, Types.INT)) + val rowType = new RowTypeInfo( + Array(Types.STRING, Types.LONG, Types.INT).asInstanceOf[Array[TypeInformation[_]]], + fieldNames) + + val tableSource = new TestTableSourceWithTime(schema, rowType, data, "rtime", null) + tEnv.registerTableSource(tableName, tableSource) + + val results = tEnv.scan(tableName) + .window(Tumble over 1.second on 'rtime as 'w) + .groupBy('name, 'w) + .select('name, 'w.start, 'amount.sum) + .collect() + + val expected = Seq( + "Mary,1970-01-01 00:00:00.0,40", + "Bob,1970-01-01 00:00:00.0,20", + "Liz,1970-01-01 00:00:02.0,40").mkString("\n") + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testProctimeRowTableSource(): Unit = { + val tableName = "MyTable" + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val data = Seq( + Row.of("Mary", new JLong(1L), new JInt(10)), + Row.of("Bob", new JLong(2L), new JInt(20)), + Row.of("Mary", new JLong(2L), new JInt(30)), + Row.of("Liz", new JLong(2001L), new JInt(40))) + + val fieldNames = Array("name", "rtime", "amount") + val schema = new TableSchema( + fieldNames :+ "ptime", + Array(Types.STRING, Types.LONG, Types.INT, Types.SQL_TIMESTAMP)) + val rowType = new RowTypeInfo( + Array(Types.STRING, Types.LONG, Types.INT).asInstanceOf[Array[TypeInformation[_]]], + fieldNames) + + val tableSource = new TestTableSourceWithTime(schema, rowType, data, null, "ptime") + tEnv.registerTableSource(tableName, tableSource) + + val results = tEnv.scan(tableName) + .where('ptime.cast(Types.LONG) > 0L) + .select('name, 'amount) + .collect() + + val expected = Seq( + "Mary,10", + "Bob,20", + "Mary,30", + "Liz,40").mkString("\n") + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testRowtimeProctimeRowTableSource(): Unit = { + val tableName = "MyTable" + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val data = Seq( + Row.of("Mary", new JLong(1L), new JInt(10)), + Row.of("Bob", new JLong(2L), new JInt(20)), + Row.of("Mary", new JLong(2L), new JInt(30)), + Row.of("Liz", new JLong(2001L), new JInt(40))) + + val fieldNames = Array("name", "rtime", "amount") + val schema = new TableSchema( + fieldNames :+ "ptime", + Array(Types.STRING, Types.SQL_TIMESTAMP, Types.INT, Types.SQL_TIMESTAMP)) + val rowType = new RowTypeInfo( + Array(Types.STRING, Types.LONG, Types.INT).asInstanceOf[Array[TypeInformation[_]]], + fieldNames) + + val tableSource = new TestTableSourceWithTime(schema, rowType, data, "rtime", "ptime") + tEnv.registerTableSource(tableName, tableSource) + + val results = tEnv.scan(tableName) + .window(Tumble over 1.second on 'rtime as 'w) + .groupBy('name, 'w) + .select('name, 'w.start, 'amount.sum) + .collect() + + val expected = Seq( + "Mary,1970-01-01 00:00:00.0,40", + "Bob,1970-01-01 00:00:00.0,20", + "Liz,1970-01-01 00:00:02.0,40").mkString("\n") + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testRowtimeAsTimestampRowTableSource(): Unit = { + val tableName = "MyTable" + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val data = Seq( + Row.of("Mary", toTimestamp(1L), new JInt(10)), + Row.of("Bob", toTimestamp(2L), new JInt(20)), + Row.of("Mary", toTimestamp(2L), new JInt(30)), + Row.of("Liz", toTimestamp(2001L), new JInt(40))) + + val fieldNames = Array("name", "rtime", "amount") + val schema = new TableSchema(fieldNames, Array(Types.STRING, Types.SQL_TIMESTAMP, Types.INT)) + val rowType = new RowTypeInfo( + Array(Types.STRING, Types.SQL_TIMESTAMP, Types.INT).asInstanceOf[Array[TypeInformation[_]]], + fieldNames) + + val tableSource = new TestTableSourceWithTime(schema, rowType, data, "rtime", null) + tEnv.registerTableSource(tableName, tableSource) + + val results = tEnv.scan(tableName) + .window(Tumble over 1.second on 'rtime as 'w) + .groupBy('name, 'w) + .select('name, 'w.start, 'amount.sum) + .collect() + + val expected = Seq( + "Mary,1970-01-01 00:00:00.0,40", + "Bob,1970-01-01 00:00:00.0,20", + "Liz,1970-01-01 00:00:02.0,40").mkString("\n") + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testRowtimeLongTableSource(): Unit = { + val tableName = "MyTable" + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val data = Seq(new JLong(1L), new JLong(2L), new JLong(2L), new JLong(2001L), new JLong(4001L)) + + val schema = new TableSchema(Array("rtime"), Array(Types.SQL_TIMESTAMP)) + val returnType = Types.LONG + + val tableSource = new TestTableSourceWithTime(schema, returnType, data, "rtime", null) + tEnv.registerTableSource(tableName, tableSource) + + val results = tEnv.scan(tableName) + .window(Tumble over 1.second on 'rtime as 'w) + .groupBy('w) + .select('w.start, 'rtime.count) + .collect() + + val expected = Seq( + "1970-01-01 00:00:00.0,3", + "1970-01-01 00:00:02.0,1", + "1970-01-01 00:00:04.0,1").mkString("\n") + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testProctimeStringTableSource(): Unit = { + val tableName = "MyTable" + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val data = Seq("Mary", "Peter", "Bob", "Liz") + + val schema = new TableSchema(Array("name", "ptime"), Array(Types.STRING, Types.SQL_TIMESTAMP)) + val returnType = Types.STRING + + val tableSource = new TestTableSourceWithTime(schema, returnType, data, null, "ptime") + tEnv.registerTableSource(tableName, tableSource) + + val results = tEnv.scan(tableName) + .where('ptime.cast(Types.LONG) > 1) + .select('name) + .collect() + + val expected = Seq("Mary", "Peter", "Bob", "Liz").mkString("\n") + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testRowtimeProctimeLongTableSource(): Unit = { + val tableName = "MyTable" + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val data = Seq(new JLong(1L), new JLong(2L), new JLong(2L), new JLong(2001L), new JLong(4001L)) + + val schema = new TableSchema( + Array("rtime", "ptime"), + Array(Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP)) + val returnType = Types.LONG + + val tableSource = new TestTableSourceWithTime(schema, returnType, data, "rtime", "ptime") + tEnv.registerTableSource(tableName, tableSource) + + val results = tEnv.scan(tableName) + .where('ptime.cast(Types.LONG) > 1) + .window(Tumble over 1.second on 'rtime as 'w) + .groupBy('w) + .select('w.start, 'rtime.count) + .collect() + + val expected = Seq( + "1970-01-01 00:00:00.0,3", + "1970-01-01 00:00:02.0,1", + "1970-01-01 00:00:04.0,1").mkString("\n") + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testFieldMappingTableSource(): Unit = { + val tableName = "MyTable" + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val data = Seq( + Row.of("Mary", new JLong(1L), new JInt(10)), + Row.of("Bob", new JLong(2L), new JInt(20)), + Row.of("Mary", new JLong(2L), new JInt(30)), + Row.of("Liz", new JLong(2001L), new JInt(40))) + + val schema = new TableSchema( + Array("ptime", "amount", "name", "rtime"), + Array(Types.SQL_TIMESTAMP, Types.INT, Types.STRING, Types.SQL_TIMESTAMP)) + val returnType = new RowTypeInfo(Types.STRING, Types.LONG, Types.INT) + val mapping = Map("amount" -> "f2", "name" -> "f0", "rtime" -> "f1") + + val source = new TestTableSourceWithTime(schema, returnType, data, "rtime", "ptime", mapping) + tEnv.registerTableSource(tableName, source) + + val results = tEnv.scan(tableName) + .window(Tumble over 1.second on 'rtime as 'w) + .groupBy('name, 'w) + .select('name, 'w.start, 'amount.sum) + .collect() + + val expected = Seq( + "Mary,1970-01-01 00:00:00.0,40", + "Bob,1970-01-01 00:00:00.0,20", + "Liz,1970-01-01 00:00:02.0,40").mkString("\n") + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testProjectWithoutRowtimeProctime(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val data = Seq( + Row.of(new JInt(1), "Mary", new JLong(10L), new JLong(1)), + Row.of(new JInt(2), "Bob", new JLong(20L), new JLong(2)), + Row.of(new JInt(3), "Mike", new JLong(30L), new JLong(2)), + Row.of(new JInt(4), "Liz", new JLong(40L), new JLong(2001))) + + val tableSchema = new TableSchema( + Array("id", "rtime", "val", "ptime", "name"), + Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING)) + val returnType = new RowTypeInfo( + Array(Types.INT, Types.STRING, Types.LONG, Types.LONG) + .asInstanceOf[Array[TypeInformation[_]]], + Array("id", "name", "val", "rtime")) + + tEnv.registerTableSource( + "T", + new TestProjectableTableSource(tableSchema, returnType, data, "rtime", "ptime")) + + val results = tEnv.scan("T") + .select('name, 'val, 'id) + .collect() + + val expected = Seq( + "Mary,10,1", + "Bob,20,2", + "Mike,30,3", + "Liz,40,4").mkString("\n") + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testProjectWithoutProctime(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val data = Seq( + Row.of(new JInt(1), "Mary", new JLong(10L), new JLong(1)), + Row.of(new JInt(2), "Bob", new JLong(20L), new JLong(2)), + Row.of(new JInt(3), "Mike", new JLong(30L), new JLong(2)), + Row.of(new JInt(4), "Liz", new JLong(40L), new JLong(2001))) + + val tableSchema = new TableSchema( + Array("id", "rtime", "val", "ptime", "name"), + Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING)) + val returnType = new RowTypeInfo( + Array(Types.INT, Types.STRING, Types.LONG, Types.LONG) + .asInstanceOf[Array[TypeInformation[_]]], + Array("id", "name", "val", "rtime")) + + tEnv.registerTableSource( + "T", + new TestProjectableTableSource(tableSchema, returnType, data, "rtime", "ptime")) + + val results = tEnv.scan("T") + .select('rtime, 'name, 'id) + .collect() + + val expected = Seq( + "1970-01-01 00:00:00.001,Mary,1", + "1970-01-01 00:00:00.002,Bob,2", + "1970-01-01 00:00:00.002,Mike,3", + "1970-01-01 00:00:02.001,Liz,4").mkString("\n") + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testProjectWithoutRowtime(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val data = Seq( + Row.of(new JInt(1), "Mary", new JLong(10L), new JLong(1)), + Row.of(new JInt(2), "Bob", new JLong(20L), new JLong(2)), + Row.of(new JInt(3), "Mike", new JLong(30L), new JLong(2)), + Row.of(new JInt(4), "Liz", new JLong(40L), new JLong(2001))) + + val tableSchema = new TableSchema( + Array("id", "rtime", "val", "ptime", "name"), + Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING)) + val returnType = new RowTypeInfo( + Array(Types.INT, Types.STRING, Types.LONG, Types.LONG) + .asInstanceOf[Array[TypeInformation[_]]], + Array("id", "name", "val", "rtime")) + + tEnv.registerTableSource( + "T", + new TestProjectableTableSource(tableSchema, returnType, data, "rtime", "ptime")) + + val results = tEnv.scan("T") + .filter('ptime.cast(Types.LONG) > 0) + .select('name, 'id) + .collect() + + val expected = Seq( + "Mary,1", + "Bob,2", + "Mike,3", + "Liz,4").mkString("\n") + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + def testProjectOnlyProctime(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val data = Seq( + Row.of(new JInt(1), new JLong(1), new JLong(10L), "Mary"), + Row.of(new JInt(2), new JLong(2L), new JLong(20L), "Bob"), + Row.of(new JInt(3), new JLong(2L), new JLong(30L), "Mike"), + Row.of(new JInt(4), new JLong(2001L), new JLong(30L), "Liz")) + + val tableSchema = new TableSchema( + Array("id", "rtime", "val", "ptime", "name"), + Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING)) + val returnType = new RowTypeInfo( + Array(Types.INT, Types.LONG, Types.LONG, Types.STRING) + .asInstanceOf[Array[TypeInformation[_]]], + Array("id", "rtime", "val", "name")) + + tEnv.registerTableSource( + "T", + new TestProjectableTableSource(tableSchema, returnType, data, "rtime", "ptime")) + + val results = tEnv.scan("T") + .select('ptime > 0) + .select(1.count) + .collect() + + val expected = Seq("4").mkString("\n") + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + def testProjectOnlyRowtime(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val data = Seq( + Row.of(new JInt(1), new JLong(1), new JLong(10L), "Mary"), + Row.of(new JInt(2), new JLong(2L), new JLong(20L), "Bob"), + Row.of(new JInt(3), new JLong(2L), new JLong(30L), "Mike"), + Row.of(new JInt(4), new JLong(2001L), new JLong(30L), "Liz")) + + val tableSchema = new TableSchema( + Array("id", "rtime", "val", "ptime", "name"), + Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING)) + val returnType = new RowTypeInfo( + Array(Types.INT, Types.LONG, Types.LONG, Types.STRING) + .asInstanceOf[Array[TypeInformation[_]]], + Array("id", "rtime", "val", "name")) + + tEnv.registerTableSource( + "T", + new TestProjectableTableSource(tableSchema, returnType, data, "rtime", "ptime")) + + val results = tEnv.scan("T") + .select('rtime) + .collect() + + val expected = Seq( + "1970-01-01 00:00:00.001", + "1970-01-01 00:00:00.002", + "1970-01-01 00:00:00.002", + "1970-01-01 00:00:02.001").mkString("\n") + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testProjectWithMapping(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val data = Seq( + Row.of(new JLong(1), new JInt(1), "Mary", new JLong(10)), + Row.of(new JLong(2), new JInt(2), "Bob", new JLong(20)), + Row.of(new JLong(2), new JInt(3), "Mike", new JLong(30)), + Row.of(new JLong(2001), new JInt(4), "Liz", new JLong(40))) + + val tableSchema = new TableSchema( + Array("id", "rtime", "val", "ptime", "name"), + Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING)) + val returnType = new RowTypeInfo( + Array(Types.LONG, Types.INT, Types.STRING, Types.LONG) + .asInstanceOf[Array[TypeInformation[_]]], + Array("p-rtime", "p-id", "p-name", "p-val")) + val mapping = Map("rtime" -> "p-rtime", "id" -> "p-id", "val" -> "p-val", "name" -> "p-name") + + tEnv.registerTableSource( + "T", + new TestProjectableTableSource(tableSchema, returnType, data, "rtime", "ptime", mapping)) + + val results = tEnv.scan("T") + .select('name, 'rtime, 'val) + .collect() + + val expected = Seq( + "Mary,1970-01-01 00:00:00.001,10", + "Bob,1970-01-01 00:00:00.002,20", + "Mike,1970-01-01 00:00:00.002,30", + "Liz,1970-01-01 00:00:02.001,40").mkString("\n") + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testNestedProject(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val data = Seq( + Row.of(new JLong(1), + Row.of( + Row.of("Sarah", new JInt(100)), + Row.of(new JInt(1000), new JBool(true)) + ), + Row.of("Peter", new JInt(10000)), + "Mary"), + Row.of(new JLong(2), + Row.of( + Row.of("Rob", new JInt(200)), + Row.of(new JInt(2000), new JBool(false)) + ), + Row.of("Lucy", new JInt(20000)), + "Bob"), + Row.of(new JLong(3), + Row.of( + Row.of("Mike", new JInt(300)), + Row.of(new JInt(3000), new JBool(true)) + ), + Row.of("Betty", new JInt(30000)), + "Liz")) + + val nested1 = new RowTypeInfo( + Array(Types.STRING, Types.INT).asInstanceOf[Array[TypeInformation[_]]], + Array("name", "value") + ) + val nested2 = new RowTypeInfo( + Array(Types.INT, Types.BOOLEAN).asInstanceOf[Array[TypeInformation[_]]], + Array("num", "flag") + ) + val deepNested = new RowTypeInfo( + Array(nested1, nested2).asInstanceOf[Array[TypeInformation[_]]], + Array("nested1", "nested2") + ) + val tableSchema = new TableSchema( + Array("id", "deepNested", "nested", "name"), + Array(Types.LONG, deepNested, nested1, Types.STRING)) + + val returnType = new RowTypeInfo( + Array(Types.LONG, deepNested, nested1, Types.STRING).asInstanceOf[Array[TypeInformation[_]]], + Array("id", "deepNested", "nested", "name")) + + tEnv.registerTableSource( + "T", + new TestNestedProjectableTableSource(tableSchema, returnType, data)) + + val results = tEnv + .scan("T") + .select('id, + 'deepNested.get("nested1").get("name") as 'nestedName, + 'nested.get("value") as 'nestedValue, + 'deepNested.get("nested2").get("flag") as 'nestedFlag, + 'deepNested.get("nested2").get("num") as 'nestedNum) + .collect() + + val expected = Seq( + "1,Sarah,10000,true,1000", + "2,Rob,20000,false,2000", + "3,Mike,30000,true,3000").mkString("\n") + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala index e672335..b7f97f9 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala @@ -32,7 +32,7 @@ import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase import org.apache.flink.table.runtime.utils.JavaPojos.Pojo1 import org.apache.flink.table.api.scala._ import org.apache.flink.table.plan.TimeIndicatorConversionTest.TableFunc -import org.apache.flink.table.api.{TableEnvironment, Types} +import org.apache.flink.table.api.{TableEnvironment, TableSchema, Types} import org.apache.flink.table.expressions.{ExpressionParser, TimeIntervalUnit} import org.apache.flink.table.runtime.stream.TimeAttributesITCase.{TestPojo, TimestampWithEqualWatermark, TimestampWithEqualWatermarkPojo} import org.apache.flink.table.runtime.utils.StreamITCase @@ -494,6 +494,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { @Test def testTableSourceWithTimeIndicators(): Unit = { + StreamITCase.clear val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = TableEnvironment.getTableEnvironment(env) @@ -505,15 +506,17 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { Row.of(new JInt(4), "D", new JLong(4000L)), Row.of(new JInt(5), "E", new JLong(5000L)), Row.of(new JInt(6), "F", new JLong(6000L))) + + val fieldNames = Array("a", "b", "rowtime") + val schema = new TableSchema( + fieldNames :+ "proctime", + Array(Types.INT, Types.STRING, Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP)) val rowType = new RowTypeInfo( Array(Types.INT, Types.STRING, Types.LONG).asInstanceOf[Array[TypeInformation[_]]], - Array("a", "b", "rowtime") - ) + fieldNames) - tEnv.registerTableSource( - "testTable", - new TestTableSourceWithTime(rows, rowType, "rowtime", "proctime")) - StreamITCase.clear + val tableSource = new TestTableSourceWithTime(schema, rowType, rows, "rowtime", "proctime") + tEnv.registerTableSource("testTable", tableSource) val result = tEnv .scan("testTable")
