Hi everybody,
  I'd like to propose and discuss some API changes to support DML
operations, such as the 'INSERT INTO' clause, in the Table API & SQL.
This was originally discussed with Fabian in a PR conversation (see
https://github.com/apache/flink/pull/3829). Since it involves several
API changes, I'm starting this mailing list thread to discuss it.
# Motivation

Currently the Table API only offers registration methods for source
tables. When we write a streaming job in SQL, we must add extra code
for the sink, just as the Table API does:

val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3"
val t = StreamTestData.getSmall3TupleDataStream(env)
tEnv.registerDataStream("MyTable", t)

// one way: invoke the Table API's writeToSink method directly
val result = tEnv.sql(sqlQuery)
result.writeToSink(new YourStreamSink)

// another way: convert to a DataStream first and then invoke addSink
val result = tEnv.sql(sqlQuery).toDataStream[Row]
result.addSink(new StreamITCase.StringSink)

From the API we can see that the sink table is always a derived table,
because its schema is inferred from the result type of the upstream query.

Compared to a traditional RDBMS, which supports DML syntax, a query with a
target output can be written like this:

insert into table target_table_name
[(column_name [ ,...n ])]
query

The equivalent form of the example above is as follows:

tEnv.registerTableSink("targetTable", new YourSink)
val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
val result = tEnv.sql(sql)

It is supported by Calcite's grammar:

insert:
  ( INSERT | UPSERT ) INTO tablePrimary
  [ '(' column [, column ]* ')' ]
  query

I'd like to extend the Flink Table API to support this feature.
# Proposed changes

1. Support registering a sink table (analogous to source table
registration; validation will be done against the registered table)

/**
 * Registers an external [[TableSink]] in this [[TableEnvironment]]'s catalog.
 * Registered sink tables can be referenced in SQL DML clauses.
 *
 * @param name The name under which the [[TableSink]] is registered.
 * @param tableSink The [[TableSink]] to register.
 */
def registerTableSink(name: String, tableSink: TableSink[_]): Unit
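For example, registration could look like this (a sketch of the proposed
API; CsvTableSink is just one concrete TableSink, and the path is a
placeholder):

```scala
// Sketch: register a concrete sink under a name. Schema validation
// against the registered table would happen when an INSERT INTO
// statement targets it.
val csvSink: TableSink[_] = new CsvTableSink("/path/to/output", ",")
tEnv.registerTableSink("targetTable", csvSink)
```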


2. Add two new methods to table.scala:

   - def insertInto[T](tableSink: String): Unit
   - def insertInto[T](tableSink: String, conf: QueryConfig): Unit

I propose to retain the current writeToSink method so that this is not a
breaking API change. In a sense, it is similar to the 'CREATE TABLE AS'
statement in an RDBMS (which creates a table from an existing table by
copying the existing table's columns).
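As a usage sketch of the proposed insertInto (the table and sink names are
placeholders, the sink must have been registered via registerTableSink
beforehand, and tEnv.queryConfig is assumed to provide a default
QueryConfig):

```scala
// Hypothetical usage of the proposed Table.insertInto method.
val result: Table = tEnv.sql("SELECT a, b, c FROM sourceTable")

// emit into the registered sink table; the query's result schema
// would be validated against the registered sink's schema
result.insertInto("targetTable")

// or with an explicit QueryConfig (proposed overload)
result.insertInto("targetTable", tEnv.queryConfig)
```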

3. Deprecate the current sql method and add two new methods to
TableEnvironment:

   - @deprecated def sql(sql: String): Table
   - def sqlQuery(sql: String): Table
   - def sqlUpdate(sql: String, config: QueryConfig): Unit

I think the method sqlUpdate here is different from JDBC's executeUpdate
[1], which returns an int, because sqlUpdate will not trigger execution
immediately. Keeping the return type as Unit therefore sounds reasonable
and does not break the consistency between the Scala and Java APIs.
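To make the split concrete, a sketch of how the two methods would be used
(names follow the proposal; YourSink and the table names are placeholders,
and tEnv.queryConfig is assumed to provide a default QueryConfig):

```scala
// sqlQuery: for SELECT statements, returns a Table as before
val result: Table = tEnv.sqlQuery("SELECT a, b FROM sourceTable")

// sqlUpdate: for DML statements, returns Unit; it only declares the
// INSERT, nothing runs until env.execute() is called
tEnv.registerTableSink("targetTable", new YourSink)
tEnv.sqlUpdate("INSERT INTO targetTable SELECT a, b FROM sourceTable",
  tEnv.queryConfig)
```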

Note that:

A registered source table cannot be updated unless it is registered as a
sink table as well. So we need to add validation to both the Table API and
SQL to prevent querying a sink table or inserting into a source table.

Partial column insertion into a target table is not supported, because
columns have no nullable property definition for now.

Ref:
[1]
https://docs.oracle.com/javase/7/docs/api/java/sql/PreparedStatement.html


doc link: https://goo.gl/n3phK5

What do you think?

Best, Lincoln
