[
https://issues.apache.org/jira/browse/FLINK-15404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17004043#comment-17004043
]
hehuiyuan commented on FLINK-15404:
-----------------------------------
{code:java}
// execute SQL
tableEnv.sqlUpdate("insert into default_catalog.default_database.sink_table
select * from default_catalog.default_database.source_table where age in
(select distinct(age) from myhive.`default`.staff)");
{code}
Hi [~lzljs3620320] , if i add a distinct key when select data from hive table
that is used as dimension table , it is not ok.
{code:java}
// ERROR
org.apache.flink.table.api.TableException: AppendStreamTableSink requires that
Table has only insert changes. at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:124)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
at
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
at
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891) at
scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at
scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at
scala.collection.AbstractIterable.foreach(Iterable.scala:54) at
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at
scala.collection.AbstractTraversable.map(Traversable.scala:104) at
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60)
at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:408)
at com.flink.hive.BlinkStreamHiveTest.main(BlinkStreamHiveTest.java:130)
{code}
> How to insert hive table in streaming mode and blink planner
> ------------------------------------------------------------
>
> Key: FLINK-15404
> URL: https://issues.apache.org/jira/browse/FLINK-15404
> Project: Flink
> Issue Type: Wish
> Components: Table SQL / Planner
> Reporter: hehuiyuan
> Priority: Major
>
> I have a hive catalog :
>
> {code:java}
> catalog name : myhive
> database : default
> {code}
>
> and the flink has a default catalog :
>
> {code:java}
> catalog name : default_catalog
> database : default_database
> {code}
>
> For example :
> I have a source table 'source_table' that's from kafka which is register to
> default_catalog,
> I want to insert hive table 'hive_table' that is from myhive catalog.
> SQL:
> insert into hive_table select * from source_table;
>
> And if if the data of hive table has changed, flink is not found.
> SQL:
> tableEnv.sqlUpdate("insert into myhive.`default`.stafftest select * from
> default_catalog.default_database.source_table");
--
This message was sent by Atlassian Jira
(v8.3.4#803005)