[ 
https://issues.apache.org/jira/browse/FLINK-22498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17334438#comment-17334438
 ] 

Jark Wu commented on FLINK-22498:
---------------------------------

This is a problem on Kudu sink, the Kudu sink implementation should upgrade to 
new {{DynamicTableSink}}. 

> cast the primary key for source table that has a decimal primary key as 
> string, and then insert into a kudu table that has a string primary key throw 
> the exception : UpsertStreamTableSink requires that Table has a full primary 
> keys if it is updated
> --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-22498
>                 URL: https://issues.apache.org/jira/browse/FLINK-22498
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.12.1
>         Environment: flink 1.12.1
> jdk 1.8
> hive 2.1.1
> kudu 1.10.0
> kafka 2.0.0
>            Reporter: Carl
>            Priority: Critical
>         Attachments: bug.rar
>
>   Original Estimate: 240h
>  Remaining Estimate: 240h
>
> *1. source table:*
> CREATE TABLE ddl_source (
>  appl_seq DECIMAL(16,2),
>  name STRING,
>  PRIMARY KEY(appl_seq) NOT ENFORCED
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'ogg-json-03',
>  'properties.bootstrap.servers' = 'xxxx:9092',
>  'value.format' = 'canal-json'
> )
> *2. sink table:*create the table use impala
> create table rt_dwd.test_bug( 
>     pk       string  ,
>     name      string  ,
>     primary key (pk)
> ) partition by hash (pk) partitions 5 stored as kudu 
> TBLPROPERTIES  ('kudu.master_addresses' = 'xxxx:7051');
> *3. execute sql:*use blink planner
> insert into kuducatalog.default_database.`rt_dwd.test_bug`
> select CAST(appl_seq AS STRING), name  from ddl_source
>  
> *throw an exception :*
> Exception in thread "main" org.apache.flink.table.api.TableException: 
> UpsertStreamTableSink requires that Table has a full primary keys if it is 
> updated.Exception in thread "main" org.apache.flink.table.api.TableException: 
> UpsertStreamTableSink requires that Table has a full primary keys if it is 
> updated. at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:93)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
>  at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
>  at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
>  at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:891) at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at 
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
> scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65)
>  at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:167)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:767)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)
>  
> *case A:if we use source table as follows, it will not throw the exception :*
> CREATE TABLE ddl_source (
>  appl_seq STRING,
>  name STRING,
>  PRIMARY KEY(appl_seq) NOT ENFORCED
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'ogg-json-03',
>  'properties.bootstrap.servers' = 'xxxx:9092',
>  'value.format' = 'canal-json'
> )
>  
> *case B:or we ddl kudu table,and use sql as follows,  it will not throw the 
> exception :*
> _DDL:_
> create table rt_dwd.test_bug( 
>     pk       decimal(16,2),
>     name      string  ,
>     primary key (pk)
> ) partition by hash (pk) partitions 5 stored as kudu 
> TBLPROPERTIES  ('kudu.master_addresses' = 'xxxx:7051');
> _DML:_
> insert into kuducatalog.default_database.`rt_dwd.test_bug`
> select  appl_seq, name  from ddl_source
>  
> *When debugging the source code, it may be related to SQL parsing engine*



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to