Hi everybody,

I'd like to propose and discuss some API changes to support DML operations, such as the `INSERT INTO` clause, in the Table API & SQL. This was originally discussed with Fabian in the PR conversation (see https://github.com/apache/flink/pull/3829); since it involves several API changes, I'm starting this mailing-list thread to discuss it.

# Motivation
Currently the Table API only provides registration methods for source tables, so when we write a streaming job in SQL we have to add extra code for the sink, just as the Table API does:

```scala
val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3"
val t = StreamTestData.getSmall3TupleDataStream(env)
tEnv.registerDataStream("MyTable", t)

// one way: invoke the Table API's writeToSink method directly
val result = tEnv.sql(sqlQuery)
result.writeToSink(new YourStreamSink)

// another way: convert to a DataStream first and then invoke addSink
val result = tEnv.sql(sqlQuery).toDataStream[Row]
result.addSink(new StreamITCase.StringSink)
```

From this API we can see that the sink table is always a derived table, because its schema is inferred from the result type of the upstream query. In contrast, a traditional RDBMS that supports DML syntax allows a query with a target output to be written like this:

```
INSERT INTO target_table_name [(column_name [ ,...n ])] query
```

The equivalent form of the example above is as follows:

```scala
tEnv.registerTableSink("targetTable", new YourSink)
val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
val result = tEnv.sql(sql)
```

This is supported by Calcite's grammar:

```
insert:
  ( INSERT | UPSERT ) INTO tablePrimary
  [ '(' column [, column ]* ')' ]
  query
```

I'd like to extend the Flink Table API to support this feature.

# Proposed changes

1. Support registering a sink table (analogous to source table registration; validation will be done against the registered table):

```scala
/**
 * Registers an external [[TableSink]] in this [[TableEnvironment]]'s catalog.
 * Registered sink tables can be referenced in SQL DML clauses.
 *
 * @param name      The name under which the [[TableSink]] is registered.
 * @param tableSink The [[TableSink]] to register.
 */
def registerTableSink(name: String, tableSink: TableSink[_]): Unit
```

2.
Add two new methods to `table.scala`:

```scala
def insertInto[T](tableSink: String): Unit
def insertInto[T](tableSink: String, conf: QueryConfig): Unit
```

I propose to retain the current `writeToSink` method so that this is not a breaking API change. In a sense, it is similar to the SQL `CREATE TABLE AS` statement in an RDBMS (which creates a table from an existing table by copying the existing table's columns).

3. Deprecate the current `sql` method and add two new methods to `TableEnvironment`:

```scala
@deprecated
def sql(sql: String): Table

def sqlQuery(sql: String): Table
def sqlUpdate(sql: String, config: QueryConfig): Unit
```

I think `sqlUpdate` here is different from JDBC's `executeUpdate` [1], which returns an int: `sqlUpdate` will not trigger an execution immediately, so keeping the return type as `Unit` sounds reasonable and does not break the consistency of the Scala and Java APIs.

Note that:

- A registered source table cannot be updated unless it is also registered as a sink table. So we need to add validation in both the Table API and SQL to prevent querying a sink table or inserting into a source table.
- Partial column insertion into a target table is not supported for now, because there is no nullable property definition yet.

Ref:
[1] https://docs.oracle.com/javase/7/docs/api/java/sql/PreparedStatement.html

Doc link: https://goo.gl/n3phK5

What do you think?

Best,
Lincoln
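P.S. To make the proposed source/sink validation rules concrete, here is a minimal, Flink-independent sketch of a catalog that tracks both kinds of registration. All names here (`SimpleCatalog`, `canQuery`, `canInsertInto`, etc.) are hypothetical illustration, not part of the proposed API:

```scala
object CatalogSketch {
  // A table may be registered as a source, a sink, or both.
  final case class TableEntry(isSource: Boolean, isSink: Boolean)

  class SimpleCatalog {
    private var tables = Map.empty[String, TableEntry]

    def registerSource(name: String): Unit = {
      val entry = tables.getOrElse(name, TableEntry(isSource = false, isSink = false))
      tables += name -> entry.copy(isSource = true)
    }

    def registerSink(name: String): Unit = {
      val entry = tables.getOrElse(name, TableEntry(isSource = false, isSink = false))
      tables += name -> entry.copy(isSink = true)
    }

    // Only source tables may appear in a query's FROM clause.
    def canQuery(name: String): Boolean = tables.get(name).exists(_.isSource)

    // Only sink tables may be the target of INSERT INTO.
    def canInsertInto(name: String): Boolean = tables.get(name).exists(_.isSink)
  }

  def main(args: Array[String]): Unit = {
    val catalog = new SimpleCatalog
    catalog.registerSource("sourceTable")
    catalog.registerSink("targetTable")
    assert(catalog.canQuery("sourceTable"))
    assert(catalog.canInsertInto("targetTable"))
    assert(!catalog.canQuery("targetTable"))      // sink-only: querying is rejected
    assert(!catalog.canInsertInto("sourceTable")) // source-only: insertion is rejected
    println("validation sketch OK")
  }
}
```

A table registered under the same name with both `registerSource` and `registerSink` would pass both checks, matching the note above that a source table can only be updated if it is also registered as a sink.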
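P.S. Below is a minimal sketch of how the proposed `sqlQuery` / `sqlUpdate` split could behave. This is hypothetical illustration code, not the actual Calcite-based implementation; a `String` stands in for the `Table` return type:

```scala
object SqlDispatchSketch {
  // Very rough DML detection; the real implementation would rely on
  // Calcite's parser rather than string inspection.
  private def isDml(sql: String): Boolean = {
    val upper = sql.trim.toUpperCase
    upper.startsWith("INSERT") || upper.startsWith("UPSERT")
  }

  // sqlQuery returns a result handle (here just the SQL text stands in
  // for a Table) and rejects DML statements.
  def sqlQuery(sql: String): String = {
    require(!isDml(sql), s"sqlQuery() does not accept DML; use sqlUpdate(): $sql")
    sql
  }

  // sqlUpdate returns Unit: unlike JDBC's executeUpdate (which returns an
  // int row count), it only registers the statement for later translation
  // and does not trigger execution immediately.
  def sqlUpdate(sql: String): Unit = {
    require(isDml(sql), s"sqlUpdate() expects a DML statement such as INSERT INTO: $sql")
  }

  def main(args: Array[String]): Unit = {
    val q = sqlQuery("SELECT a, b, c FROM sourceTable")
    assert(q == "SELECT a, b, c FROM sourceTable")
    sqlUpdate("INSERT INTO targetTable SELECT a, b, c FROM sourceTable")
    // Statements routed to the wrong method are rejected:
    assert(scala.util.Try(sqlQuery("INSERT INTO t SELECT 1")).isFailure)
    assert(scala.util.Try(sqlUpdate("SELECT 1")).isFailure)
    println("dispatch sketch OK")
  }
}
```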