Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6201#discussion_r198215161
--- Diff:
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
---
@@ -712,7 +712,48 @@ class SqlITCase extends StreamingWithStateTestBase {
"1,1,Hi,1970-01-01 00:00:00.001",
"2,2,Hello,1970-01-01 00:00:00.002",
"3,2,Hello world,1970-01-01 00:00:00.002")
- assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
+ assertEquals(expected.sorted,
MemoryTableSourceSinkUtil.tableData.map(_.toString).sorted)
+ }
+
+ @Test
+ def testWriteReadTableSourceSink(): Unit = {
+ var env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ var tEnv = TableEnvironment.getTableEnvironment(env)
+ MemoryTableSourceSinkUtil.clear
+
+ val t = StreamTestData.getSmall3TupleDataStream(env)
+ .assignAscendingTimestamps(x => x._2)
+ .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+ tEnv.registerTable("sourceTable", t)
+
+ val fieldNames = Array("a", "e", "f", "t")
+ val fieldTypes = Array(Types.INT, Types.LONG, Types.STRING,
Types.SQL_TIMESTAMP)
+ .asInstanceOf[Array[TypeInformation[_]]]
+
+ val tableSchema = new TableSchema(
+ Array("a", "e", "f", "t", "rowtime", "proctime"),
+ Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP,
+ Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP))
+ val returnType = new RowTypeInfo(
+ Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP)
+ .asInstanceOf[Array[TypeInformation[_]]],
+ Array("a", "e", "f", "t"))
+ tEnv.registerTableSource("targetTable", new
MemoryTableSourceSinkUtil.UnsafeMemoryTableSource(
+ tableSchema, returnType, "rowtime", 3))
+ tEnv.registerTableSink("targetTable",
+ new
MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink().configure(fieldNames,
fieldTypes))
+
+ tEnv.sqlUpdate("INSERT INTO targetTable SELECT a, b, c, rowtime FROM
sourceTable")
+ tEnv.sqlQuery("SELECT a, e, f, t, rowtime from targetTable")
--- End diff --
I think we need more test cases about how we handle the time attributes for
`both` table types. Maybe not only ITCases but also unit tests. The `configure`
method is an internal method that should not be called here.
---