[ 
https://issues.apache.org/jira/browse/FLINK-12337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16826885#comment-16826885
 ] 

Artsem Semianenka commented on FLINK-12337:
-------------------------------------------

I will attach the pull request as an illustrative example of my proposal. 

> [TableSQL/Planner] InsertInto method should configure TableSink 
> ----------------------------------------------------------------
>
>                 Key: FLINK-12337
>                 URL: https://issues.apache.org/jira/browse/FLINK-12337
>             Project: Flink
>          Issue Type: New Feature
>            Reporter: Artsem Semianenka
>            Assignee: Artsem Semianenka
>            Priority: Major
>
> In the current implementation of TableEnvironment, the *insertInto(..)* method 
> checks whether the source Table schema is exactly the same as the sink schema; 
> otherwise, it throws a ValidationException.
> Let's imagine the following setup:
>  I have a source which produces a Row with fields A and B (for example, Kafka).
>  On the other hand, I have an External Catalog which provides a sink for a 
> database that supports UPSERT (for example PostgreSQL or Cloudera Kudu in my 
> case) with a schema of the fields:
>  * A _required_
>  * B _required_
>  * C _+optional+_
> I want to UPSERT only fields A and B into the sink. But since the sink was 
> created from the external catalog, it knows only the full schema of the table 
> (fields A, B, C), and when I run a query like this I get a 
> ValidationException.
> {code:java}
> INSERT INTO sink.table
> SELECT A, B FROM src.topic{code}
>  I propose that the *insertInto(..)* method validate whether the source schema 
> is a subset of the sink schema and, if so, call the configure(..) method on the 
> sink. In this case, the sink can adapt to the source schema if possible.
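
The proposed check could look roughly like the sketch below. It is only an illustration, not Flink code: SubsetSchemaCheck, SketchSink and the string type names are hypothetical stand-ins, schemas are plain maps from field name to type name, and IllegalArgumentException takes the place of ValidationException so the example stays self-contained. It also ignores the required/optional distinction, which a real sink would have to enforce inside its own configure(..).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins; none of these types are Flink's real classes.
final class SubsetSchemaCheck {

    /** Simplified sink that knows its full table schema (field name -> type name). */
    static final class SketchSink {
        final Map<String, String> schema;

        SketchSink(Map<String, String> schema) {
            this.schema = schema;
        }

        /** Returns a copy of the sink narrowed to the given fields, mimicking configure(..). */
        SketchSink configure(List<String> fieldNames) {
            Map<String, String> narrowed = new LinkedHashMap<>();
            for (String name : fieldNames) {
                narrowed.put(name, schema.get(name));
            }
            return new SketchSink(narrowed);
        }
    }

    /**
     * Checks that every source field exists in the sink with the same type,
     * then asks the sink to adapt itself to the source fields.
     */
    static SketchSink insertInto(Map<String, String> sourceSchema, SketchSink sink) {
        List<String> problems = new ArrayList<>();
        for (Map.Entry<String, String> field : sourceSchema.entrySet()) {
            String sinkType = sink.schema.get(field.getKey());
            if (sinkType == null) {
                problems.add("unknown sink field " + field.getKey());
            } else if (!sinkType.equals(field.getValue())) {
                problems.add("type mismatch for field " + field.getKey());
            }
        }
        if (!problems.isEmpty()) {
            // Assumption: the real method would throw a ValidationException here.
            throw new IllegalArgumentException(
                "Source schema is not a subset of sink schema: " + problems);
        }
        return sink.configure(new ArrayList<>(sourceSchema.keySet()));
    }
}
```

With a sink schema of A, B, C and a source schema of A, B, insertInto returns a sink narrowed to A and B. A source field that the sink does not know, or whose type differs, still fails validation.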



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
