[ https://issues.apache.org/jira/browse/FLINK-12337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Artsem Semianenka updated FLINK-12337:
--------------------------------------
    Description: 
In the current implementation of TableEnvironment, the
[insertInto(..)|https://github.com/apache/flink/blob/7347c9e2094c5243ef7db34bde1fd87ea5e7641d/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/TableEnvImpl.scala#L656]
method checks whether the source table schema is exactly the same as the sink
schema; otherwise, it throws a ValidationException.

Let's imagine the following setup:
I have a source that produces Rows with fields A and B (for example, Kafka).
On the other hand, I have an external catalog that produces a sink for a
database that supports UPSERT (for example, PostgreSQL, or Cloudera Kudu in my
case) with the following fields:
 * A _required_
 * B _required_
 * C _+optional+_

I want to UPSERT only fields A and B into the sink. However, because the sink
was created from the external catalog, it knows only the full schema of the
table (fields A, B, and C), so a query like the following fails with a
ValidationException:
{code:sql}
INSERT INTO sink.table
SELECT A, B FROM src.topic
{code}
I propose that *insertInto(..)* validate whether the source schema is a subset
of the sink schema and, if so, call the sink's configure(..) method. This way,
the sink can adapt to the source schema where possible.
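A minimal sketch of the proposed check, written as a self-contained Java model rather than Flink code. All names here (SubsetInsertSketch, CatalogUpsertSink, Column, source(..), exampleSink()) are hypothetical, the column types INT/STRING/DOUBLE are assumed because the issue does not state them, and the configure(..) below only stands in for the sink's real configure(..) method. The planner side checks the subset relation (name and type); the sink side decides whether it can actually write the narrower row, rejecting it if a required column is left out.

{code:java}
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the FLINK-12337 proposal; none of these types are Flink APIs.
public class SubsetInsertSketch {

    // Stand-in for the planner's ValidationException.
    static class ValidationException extends RuntimeException {
        ValidationException(String message) { super(message); }
    }

    // A sink column: its type (a plain string here) and whether the table requires it.
    static final class Column {
        final String type;
        final boolean required;
        Column(String type, boolean required) {
            this.type = type;
            this.required = required;
        }
    }

    // Hypothetical UPSERT sink created from an external catalog: it knows the full table schema.
    static final class CatalogUpsertSink {
        final Map<String, Column> tableSchema; // e.g. A, B, C
        final List<String> writtenFields;      // fields this sink instance will write

        CatalogUpsertSink(Map<String, Column> tableSchema, List<String> writtenFields) {
            this.tableSchema = tableSchema;
            this.writtenFields = new ArrayList<>(writtenFields);
        }

        // Stand-in for configure(..): returns a copy that writes only the given fields,
        // or rejects the request if a required column would be left out.
        CatalogUpsertSink configure(List<String> fieldNames) {
            for (Map.Entry<String, Column> column : tableSchema.entrySet()) {
                if (column.getValue().required && !fieldNames.contains(column.getKey())) {
                    throw new ValidationException("Required field missing in the query: " + column.getKey());
                }
            }
            return new CatalogUpsertSink(tableSchema, fieldNames);
        }
    }

    // Proposed insertInto(..) check: every source field must exist in the sink with the
    // same type (subset, not equality); then the sink is asked to adapt via configure(..).
    static CatalogUpsertSink insertInto(Map<String, String> sourceSchema, CatalogUpsertSink sink) {
        for (Map.Entry<String, String> field : sourceSchema.entrySet()) {
            Column column = sink.tableSchema.get(field.getKey());
            if (column == null || !column.type.equals(field.getValue())) {
                throw new ValidationException(
                    "Field " + field.getKey() + " is not in the sink schema or has a different type");
            }
        }
        return sink.configure(new ArrayList<>(sourceSchema.keySet()));
    }

    // The table from the issue: A and B required, C optional (types are assumed).
    static CatalogUpsertSink exampleSink() {
        Map<String, Column> table = new LinkedHashMap<>();
        table.put("A", new Column("INT", true));
        table.put("B", new Column("STRING", true));
        table.put("C", new Column("DOUBLE", false));
        return new CatalogUpsertSink(table, new ArrayList<>(table.keySet()));
    }

    // Builds an ordered source schema from name/type pairs.
    static Map<String, String> source(String... namesAndTypes) {
        Map<String, String> schema = new LinkedHashMap<>();
        for (int i = 0; i + 1 < namesAndTypes.length; i += 2) {
            schema.put(namesAndTypes[i], namesAndTypes[i + 1]);
        }
        return schema;
    }

    public static void main(String[] args) {
        // INSERT INTO sink.table SELECT A, B FROM src.topic
        CatalogUpsertSink configured = insertInto(source("A", "INT", "B", "STRING"), exampleSink());
        System.out.println(configured.writtenFields); // [A, B]

        // Omitting the required field B is still rejected, by the sink itself.
        try {
            insertInto(source("A", "INT", "C", "DOUBLE"), exampleSink());
        } catch (ValidationException e) {
            System.out.println(e.getMessage()); // Required field missing in the query: B
        }
    }
}
{code}

Keeping the required-field check inside the sink's configure(..) rather than in insertInto(..) follows the proposal: the planner only relaxes equality to a subset check, and each sink decides for itself whether it can adapt.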

  was:
In the current implementation of TableEnvironment, the *insertInto(..)* method
checks whether the source table schema is exactly the same as the sink schema;
otherwise, it throws a ValidationException.

Let's imagine the following setup:
I have a source that produces Rows with fields A and B (for example, Kafka).
On the other hand, I have an external catalog that produces a sink for a
database that supports UPSERT (for example, PostgreSQL, or Cloudera Kudu in my
case) with the following fields:
 * A _required_
 * B _required_
 * C _+optional+_

I want to UPSERT only fields A and B into the sink. However, because the sink
was created from the external catalog, it knows only the full schema of the
table (fields A, B, and C), so a query like the following fails with a
ValidationException:
{code:sql}
INSERT INTO sink.table
SELECT A, B FROM src.topic
{code}
I propose that *insertInto(..)* validate whether the source schema is a subset
of the sink schema and, if so, call the sink's configure(..) method. This way,
the sink can adapt to the source schema where possible.


> [TableSQL/Planner] InsertInto method should configure TableSink 
> ----------------------------------------------------------------
>
>                 Key: FLINK-12337
>                 URL: https://issues.apache.org/jira/browse/FLINK-12337
>             Project: Flink
>          Issue Type: New Feature
>            Reporter: Artsem Semianenka
>            Assignee: Artsem Semianenka
>            Priority: Major



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)