Fabian Hueske commented on FLINK-8866:

Regarding (3), originally, {{TableSink}} was designed to be flexible with 
regard to the output schema. A query that should be emitted was planned by the 
optimizer. Based on the resulting type, the TableSink was internally configured 
for the result schema. The configuration method produces a configured copy of 
the sink that is used to emit the result. So, the TableSink was not known to 
Calcite and only handled by the TableEnvironment.

When we added support for {{INSERT INTO}} this didn't work anymore because 
Calcite validates that the schema of the target table is compatible with the 
result schema of the SELECT query. Hence, we added the field names and types to 
the registration, configure the TableSink, and register the newly configured 
TableSink in Calcite's catalog. By doing it this way, we did not have to change 
the interface of the TableSink which did not only mean backwards compatibility 
but also that all TableSinks can be used in either way.

> Create unified interfaces to configure and instatiate TableSinks
> ----------------------------------------------------------------
>                 Key: FLINK-8866
>                 URL: https://issues.apache.org/jira/browse/FLINK-8866
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Shuyi Chen
>            Priority: Major
> Similar to the efforts done in FLINK-8240. We need unified ways to configure 
> and instantiate TableSinks. Among other applications, this is necessary in 
> order to declare table sinks in an environment file of the SQL client. Such 
> that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes in mind. 
> 1) Add TableSinkFactory/TableSinkFactoryService similar to 
> TableSourceFactory/TableSourceFactoryService
> 2) Add a common property called "type" with values (source, sink and both) 
> for both TableSource and TableSink.
> 3) in yaml file, replace "sources" with "tables", and use tableType to 
> identify whether it's source or sink.

This message was sent by Atlassian JIRA

Reply via email to