Thank you, Jark. I had added a primary key in my Flink SQL before,
and it threw the error *Primary key and unique key are not supported yet.*
Now I understand. Thank you sincerely for your reply.

Best wishes,
LakeShen


Jark Wu <imj...@gmail.com> wrote on Thu, Sep 12, 2019 at 3:15 PM:

> Hi Lake,
>
> This is not a problem with HBaseUpsertTableSink.
> It happens because the query loses the primary key (e.g. concat(key1, key2)
> currently loses the primary key information [key1, key2]),
> but the insert validation checks that an upsert query has a
> primary key. That’s why the exception is thrown.
>
> IMO, in order to fix this problem, we need to enrich the primary key
> inference to support all kinds of built-in functions/operators.
> But this is a large amount of work, which means it may not happen in 1.9.1.
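>
> To illustrate, here is a minimal sketch of the kind of query that hits
> this check, followed by a rewrite that may avoid it. The source table
> `src`, its columns `key1` and `key2`, and the count are hypothetical; only
> `sink_hbase_table` comes from the original report. The rewrite assumes the
> planner can infer the key when the grouping column itself is the row key.
>
> ```sql
> -- Hypothetical source table `src` with columns key1, key2 (assumptions).
> -- Shape that loses the key: the grouping key is [key1, key2], but the sink
> -- only sees CONCAT(key1, key2), from which no primary key is inferred.
> INSERT INTO sink_hbase_table
> SELECT CONCAT(key1, key2) AS rowkey, ROW(cnt) AS cf
> FROM (
>   SELECT key1, key2, COUNT(*) AS cnt
>   FROM src
>   GROUP BY key1, key2
> );
>
> -- Possible workaround (an assumption, not verified against 1.9): build the
> -- row key first and group by it, so the grouping key is the row key itself.
> INSERT INTO sink_hbase_table
> SELECT rowkey, ROW(cnt) AS cf
> FROM (
>   SELECT rowkey, COUNT(*) AS cnt
>   FROM (SELECT CONCAT(key1, key2) AS rowkey FROM src)
>   GROUP BY rowkey
> );
> ```
>
> Note that grouping by the concatenated value merges distinct (key1, key2)
> pairs that concatenate to the same string, so a separator between the two
> parts may be needed.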
>
> Regards,
> Jark
>
> On Thu, 12 Sep 2019 at 14:39, LakeShen <shenleifight...@gmail.com> wrote:
>
> > Hi community, I create the HBase sink table in my Flink DDL SQL
> > like this:
> >
> > *create table sink_hbase_table(
> >     rowkey VARCHAR,
> >     cf row(kdt_it_count bigint)
> > ) with (xxxxxx);*
> >
> > When I run my Flink job, it throws an exception like this:
> > *UpsertStreamTableSink requires that Table has a full primary keys if it is updated.*
> > at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:115)
> > at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
> > at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
> > at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
> > at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
> > at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
> > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> > at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> >
> > I looked at the Flink source code and found that in HBaseUpsertTableSink
> > the method setKeyFields has an empty body. In the StreamExecSink class,
> > I saw this comment:
> > *//TODO UpsertStreamTableSink setKeyFields interface should be
> > Array[Array[String]]*
> > but the UpsertStreamTableSink setKeyFields interface currently takes
> > Array[String], which seems to conflict with that comment.
> > Can we use HBaseUpsertTableSink in our Flink job? Thanks for your reply.
> >
>
