[ 
https://issues.apache.org/jira/browse/FLINK-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16889544#comment-16889544
 ] 

chenqi commented on FLINK-13289:
--------------------------------

The Flink planner extracts the unique key fields from the optimized plan of the upsert query in StreamPlanner::writeToUpsertSink():

{code:java}
...
// extract unique key fields
val tableKeys: Option[Array[String]] =
  UpdatingPlanChecker.getUniqueKeyFields(optimizedPlan)
...
{code}
But the Blink planner doesn't do this, so the sink's key fields are never set.

One workaround is to set the key fields manually, e.g. in JDBCUpsertTableSinkITCase::testUpsert():
{code:java}
String[] fields = {"cnt", "lencnt", "cTag"};
tEnv.registerTableSink("upsertSink", JDBCUpsertTableSink.builder()
    .setOptions(JDBCOptions.builder()
        .setDBUrl(DB_URL)
        .setTableName(OUTPUT_TABLE1)
        .build())
    .setTableSchema(TableSchema.builder().fields(
        fields, new DataType[] {BIGINT(), BIGINT(), INT()}).build())
    // workaround: set the key fields explicitly, since the Blink planner does not
    .setKeyFields(new String[]{"cnt", "cTag"})
    .build());
{code}
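
To make the failure mode concrete, here is a small self-contained sketch of what I think is going on. It does not use any Flink classes: ToyUpsertSink, planWithKeys and planWithoutKeys are made-up names for illustration, and the exception message is invented. The only thing taken from the issue is the behaviour: the sink refuses to build its output format while its key fields are still null.

```java
import java.util.Arrays;

public class KeyFieldsSketch {

    /** Toy stand-in for an upsert sink. This is NOT Flink's JDBCUpsertTableSink. */
    static class ToyUpsertSink {
        private String[] keyFields; // stays null unless a planner (or the user) sets it

        void setKeyFields(String[] keys) {
            this.keyFields = keys;
        }

        /** Plays the role of building the output format; needs the key fields. */
        String describeFormat() {
            if (keyFields == null) {
                throw new UnsupportedOperationException("key fields were never set");
            }
            return "upsert on " + Arrays.toString(keyFields);
        }
    }

    /** A planner that forwards the derived unique keys to the sink before using it. */
    static String planWithKeys(ToyUpsertSink sink, String[] derivedKeys) {
        sink.setKeyFields(derivedKeys);
        return sink.describeFormat();
    }

    /** A planner that skips that step, so the sink still has null key fields. */
    static String planWithoutKeys(ToyUpsertSink sink) {
        return sink.describeFormat();
    }

    public static void main(String[] args) {
        System.out.println(planWithKeys(new ToyUpsertSink(), new String[] {"cnt", "cTag"}));
        try {
            planWithoutKeys(new ToyUpsertSink());
        } catch (UnsupportedOperationException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

Setting the key fields by hand, as in the workaround above, corresponds to calling setKeyFields on the toy sink before the second planner touches it.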
 

 

> Blink Planner JDBCUpsertTableSink : UnsupportedOperationException 
> "JDBCUpsertTableSink can not support "
> --------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-13289
>                 URL: https://issues.apache.org/jira/browse/FLINK-13289
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / Planner
>    Affects Versions: 1.9.0
>            Reporter: LakeShen
>            Priority: Major
>
> Hi, in the flink-jdbc connector module, I changed the Flink planner to the Blink 
> planner to run all test cases, because we want to use the Blink planner in our 
> program. When I run the JDBCUpsertTableSinkITCase class, the method 
> testUpsert throws the exception:
> {color:red}java.lang.UnsupportedOperationException: JDBCUpsertTableSink can 
> not support {color}
> I looked at the source code: in the Flink planner, the StreamPlanner sets the 
> JDBCUpsertTableSink's keyFields,
> but in the Blink planner I didn't find anywhere that sets the JDBCUpsertTableSink's 
> keyFields, so the JDBCUpsertTableSink keyFields is null, and when 
> JDBCUpsertTableSink newFormat() executes,
> it throws the exception.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)