[ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524011#comment-16524011 ]
ASF GitHub Bot commented on FLINK-8866:
---------------------------------------
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6201#discussion_r198215161
--- Diff:
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
---
@@ -712,7 +712,48 @@ class SqlITCase extends StreamingWithStateTestBase {
"1,1,Hi,1970-01-01 00:00:00.001",
"2,2,Hello,1970-01-01 00:00:00.002",
"3,2,Hello world,1970-01-01 00:00:00.002")
- assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
+    assertEquals(expected.sorted, MemoryTableSourceSinkUtil.tableData.map(_.toString).sorted)
+ }
+
+ @Test
+ def testWriteReadTableSourceSink(): Unit = {
+ var env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ var tEnv = TableEnvironment.getTableEnvironment(env)
+ MemoryTableSourceSinkUtil.clear
+
+ val t = StreamTestData.getSmall3TupleDataStream(env)
+ .assignAscendingTimestamps(x => x._2)
+ .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+ tEnv.registerTable("sourceTable", t)
+
+ val fieldNames = Array("a", "e", "f", "t")
+    val fieldTypes = Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP)
+      .asInstanceOf[Array[TypeInformation[_]]]
+
+ val tableSchema = new TableSchema(
+ Array("a", "e", "f", "t", "rowtime", "proctime"),
+ Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP,
+ Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP))
+ val returnType = new RowTypeInfo(
+ Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP)
+ .asInstanceOf[Array[TypeInformation[_]]],
+ Array("a", "e", "f", "t"))
+    tEnv.registerTableSource("targetTable", new MemoryTableSourceSinkUtil.UnsafeMemoryTableSource(
+      tableSchema, returnType, "rowtime", 3))
+    tEnv.registerTableSink("targetTable",
+      new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink().configure(fieldNames, fieldTypes))
+
+    tEnv.sqlUpdate("INSERT INTO targetTable SELECT a, b, c, rowtime FROM sourceTable")
+ tEnv.sqlQuery("SELECT a, e, f, t, rowtime from targetTable")
--- End diff --
I think we need more test cases about how we handle the time attributes for
`both` table types. Maybe not only ITCases but also unit tests. The `configure`
method is an internal method that should not be called here.
> Create unified interfaces to configure and instantiate TableSinks
> -----------------------------------------------------------------
>
> Key: FLINK-8866
> URL: https://issues.apache.org/jira/browse/FLINK-8866
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Shuyi Chen
> Priority: Major
> Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240, we need unified ways to configure
> and instantiate TableSinks. Among other applications, this is necessary in
> order to declare table sinks in an environment file of the SQL client, so
> that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes we have in mind:
> 1) Add TableSinkFactory/TableSinkFactoryService similar to
> TableSourceFactory/TableSourceFactoryService
> 2) Add a common property called "type" with values (source, sink and both)
> for both TableSource and TableSink.
> 3) In the YAML file, replace "sources" with "tables", and use tableType to
> identify whether a table is a source or a sink.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)