yiduwangkai opened a new pull request #7080: flink sqlUpdate support complex insert URL: https://github.com/apache/flink/pull/7080 my code is `tableEnv.sqlUpdate("insert into kafka.sdkafka.product_4 select filedName1, filedName2 from kafka.sdkafka.order_4");` but flink give me error info, said kafka "No table was registered under the name kafka" i modify the code ,that is ok now TableEnvironment.scala ` def sqlUpdate(stmt: String, config: QueryConfig): Unit = { val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory) // parse the sql query val parsed = planner.parse(stmt) parsed match { case insert: SqlInsert => // validate the SQL query val query = insert.getSource val validatedQuery = planner.validate(query) // get query result as Table val queryResult = new Table(this, LogicalRelNode(planner.rel(validatedQuery).rel)) // get name of sink table val targetTableName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0) // insert query result into sink table insertInto(queryResult, targetTableName, config) case _ => throw new TableException( "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.") } }` should modify to this `def sqlUpdate(stmt: String, config: QueryConfig): Unit = { val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory) // parse the sql query val parsed = planner.parse(stmt) parsed match { case insert: SqlInsert => // validate the SQL query val query = insert.getSource val validatedQuery = planner.validate(query) // get query result as Table val queryResult = new Table(this, LogicalRelNode(planner.rel(validatedQuery).rel)) // get name of sink table //val targetTableName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0) val targetTableName = insert.getTargetTable.toString // insert query result into sink table insertInto(queryResult, targetTableName, config) case _ => throw new TableException( "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.") } }` i hope this can be acceptted, thx
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
