[
https://issues.apache.org/jira/browse/FLINK-10851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17376230#comment-17376230
]
frank wang commented on FLINK-10851:
------------------------------------
[~dwysakowicz] yes, we can close this ticket
> sqlUpdate support complex insert grammar
> ----------------------------------------
>
> Key: FLINK-10851
> URL: https://issues.apache.org/jira/browse/FLINK-10851
> Project: Flink
> Issue Type: Bug
> Reporter: frank wang
> Priority: Major
> Labels: pull-request-available
>
> my code is
> {{tableEnv.sqlUpdate("insert into kafka.sdkafka.product_4 select filedName1,
> filedName2 from kafka.sdkafka.order_4");}}
> but flink give me error info, said kafka "No table was registered under the
> name kafka"
> i modify the code ,that is ok now
> TableEnvironment.scala
> {code:java}
> def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
> val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner,
> getTypeFactory)
> // parse the sql query
> val parsed = planner.parse(stmt)
> parsed match {
> case insert: SqlInsert =>
> // validate the SQL query
> val query = insert.getSource
> val validatedQuery = planner.validate(query)
> // get query result as Table
> val queryResult = new Table(this,
> LogicalRelNode(planner.rel(validatedQuery).rel))
> // get name of sink table
> val targetTableName =
> insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
> // insert query result into sink table
> insertInto(queryResult, targetTableName, config)
> case _ =>
> throw new TableException(
> "Unsupported SQL query! sqlUpdate() only accepts SQL statements of
> type INSERT.")
> }
> }
> {code}
> should modify to this
> {code:java}
> def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
> val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner,
> getTypeFactory)
> // parse the sql query
> val parsed = planner.parse(stmt)
> parsed match {
> case insert: SqlInsert =>
> // validate the SQL query
> val query = insert.getSource
> val validatedQuery = planner.validate(query)
> // get query result as Table
> val queryResult = new Table(this,
> LogicalRelNode(planner.rel(validatedQuery).rel))
> // get name of sink table
> //val targetTableName =
> insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
> val targetTableName = insert.getTargetTable.toString
> // insert query result into sink table
> insertInto(queryResult, targetTableName, config)
> case _ =>
> throw new TableException(
> "Unsupported SQL query! sqlUpdate() only accepts SQL statements of
> type INSERT.")
> }
> }
> {code}
>
> i hope this can be acceptted, thx
--
This message was sent by Atlassian Jira
(v8.3.4#803005)