Thank you, Jark. I had added a primary key in my Flink SQL before, and it threw *Primary key and unique key are not supported yet.* Now I understand. Thank you sincerely for your reply.
Best wishes,
LakeShen

Jark Wu <imj...@gmail.com> wrote on Thu, Sep 12, 2019 at 3:15 PM:

> Hi Lake,
>
> This is not a problem of HBaseUpsertTableSink.
> This is because the query loses the primary key (e.g. concat(key1, key2)
> currently loses the primary key information [key1, key2]),
> but the insert validation checks that the upsert query has a
> primary key. That’s why the exception is thrown.
>
> IMO, in order to fix this problem, we need to enrich the primary key
> inference to support all kinds of built-in functions/operators.
> But this is a large piece of work, which means it may not happen in 1.9.1.
>
> Regards,
> Jark
>
> On Thu, 12 Sep 2019 at 14:39, LakeShen <shenleifight...@gmail.com> wrote:
>
> > Hi community, when I create the HBase sink table in my Flink DDL SQL,
> > like this:
> >
> > *create table sink_hbase_table( rowkey VARCHAR, cf
> > row( kdt_it_count bigint )) with (xxxxxx);*
> >
> > and run my Flink task, it throws an exception like this:
> >
> > *UpsertStreamTableSink requires that Table has a full primary keys if it is
> > updated.*
> >   at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:115)
> >   at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
> >   at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
> >   at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
> >   at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
> >   at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
> >   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> >   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> >   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> >
> > I looked at the Flink source code and found that in HBaseUpsertTableSink
> > the method setKeyFields has an empty body. In the StreamExecSink class
> > I saw this comment:
> >
> > *//TODO UpsertStreamTableSink setKeyFields interface should be
> > Array[Array[String]]*
> >
> > but currently the UpsertStreamTableSink setKeyFields interface takes
> > Array[String], which seems to conflict with the comment above.
> >
> > Can we use HBaseUpsertTableSink in our Flink task? Thanks for your reply.
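
To illustrate Jark's point about key inference, here is a rough sketch of the two query shapes involved. The names source_table, key1 and key2 are hypothetical (they do not appear in the original job), and the second form is only an assumption about how the planner may be able to derive a key; it has not been verified against 1.9.0.

```sql
-- Hypothetical source: source_table(key1 VARCHAR, key2 VARCHAR).

-- Shape that can lose the key: the aggregate is keyed by [key1, key2],
-- but the outer projection only exposes CONCAT(key1, key2), from which
-- the planner currently infers no key.
INSERT INTO sink_hbase_table
SELECT CONCAT(key1, key2), ROW(kdt_it_count)
FROM (
  SELECT key1, key2, COUNT(*) AS kdt_it_count
  FROM source_table
  GROUP BY key1, key2
);

-- Possible alternative (an assumption, not confirmed in this thread):
-- build the row key first and group by it, so the group key is passed
-- through unchanged to the sink as rowkey.
INSERT INTO sink_hbase_table
SELECT rowkey, ROW(kdt_it_count)
FROM (
  SELECT rowkey, COUNT(*) AS kdt_it_count
  FROM (
    SELECT CONCAT(key1, key2) AS rowkey
    FROM source_table
  )
  GROUP BY rowkey
);
```

The difference is only where the concatenation happens: in the second form the grouping column itself is the sink's rowkey, so no function is applied on top of the key columns after the aggregation.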