yiduwangkai opened a new pull request #7080: flink sqlUpdate support complex 
insert
URL: https://github.com/apache/flink/pull/7080
 
 
   my code is 
   `tableEnv.sqlUpdate("insert into kafka.sdkafka.product_4 select filedName1, 
filedName2 from kafka.sdkafka.order_4");`
   
   but flink give me error info, said kafka "No table was registered under the 
name kafka"
   i modify the code ,that is ok now
   TableEnvironment.scala
   ` def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
       val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
       // parse the sql query
       val parsed = planner.parse(stmt)
       parsed match {
         case insert: SqlInsert =>
           // validate the SQL query
           val query = insert.getSource
           val validatedQuery = planner.validate(query)
   
           // get query result as Table
           val queryResult = new Table(this, 
LogicalRelNode(planner.rel(validatedQuery).rel))
   
           // get name of sink table
           val targetTableName = 
insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
   
           // insert query result into sink table
           insertInto(queryResult, targetTableName, config)
         case _ =>
           throw new TableException(
             "Unsupported SQL query! sqlUpdate() only accepts SQL statements of 
type INSERT.")
       }
     }`
   
   should modify to this
   
   `def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
       val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
       // parse the sql query
       val parsed = planner.parse(stmt)
       parsed match {
         case insert: SqlInsert =>
           // validate the SQL query
           val query = insert.getSource
           val validatedQuery = planner.validate(query)
   
           // get query result as Table
           val queryResult = new Table(this, 
LogicalRelNode(planner.rel(validatedQuery).rel))
   
           // get name of sink table
           //val targetTableName = 
insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
           val targetTableName = insert.getTargetTable.toString
   
           // insert query result into sink table
           insertInto(queryResult, targetTableName, config)
         case _ =>
           throw new TableException(
             "Unsupported SQL query! sqlUpdate() only accepts SQL statements of 
type INSERT.")
       }
     }`
   
   
   i hope this can be acceptted, thx

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to