danny0405 commented on a change in pull request #8966: [FLINK-13074][table-planner-blink] Add PartitionableTableSink bridge logic to flink&blink …
URL: https://github.com/apache/flink/pull/8966#discussion_r304266606
 
 

 ##########
 File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
 ##########
 @@ -101,17 +104,19 @@ class StreamPlanner(
     val parsed = planner.parse(stmt)
 
     parsed match {
-      case insert: SqlInsert =>
+      case insert: RichSqlInsert =>
         val targetColumnList = insert.getTargetColumnList
        if (targetColumnList != null && insert.getTargetColumnList.size() != 0) {
           throw new ValidationException("Partial inserts are not supported")
         }
         // get name of sink table
        val targetTablePath = insert.getTargetTable.asInstanceOf[SqlIdentifier].names
+        val staticPartitions = insert.getStaticPartitionKVs
 
         List(new CatalogSinkModifyOperation(targetTablePath,
           SqlToOperationConverter.convert(planner,
-            insert.getSource).asInstanceOf[PlannerQueryOperation])
+            insert.getSource).asInstanceOf[PlannerQueryOperation],
+          staticPartitions)
 
 Review comment:
   We actually could not, because the flink-planner DatabaseCalciteSchema always recognizes the table as a source, so the insert target table cannot be validated.
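
For context, the bridge the diff adds can be sketched with simplified stand-in types. Everything below (`RichSqlInsertLike`, `CatalogSinkModifyOperationLike`, and their field names) is a hypothetical model for illustration, not the actual Flink planner API:

```scala
// Hedged sketch: how static partition key/values from a parsed INSERT could
// be threaded through to the sink operation. All types here are simplified
// stand-ins, NOT the real Flink planner classes.
case class RichSqlInsertLike(
    targetTablePath: List[String],           // e.g. List("db", "t")
    staticPartitionKVs: Map[String, String]) // e.g. Map("dt" -> "2019-07-01")

case class CatalogSinkModifyOperationLike(
    tablePath: List[String],
    staticPartitions: Map[String, String])

// Mirrors the shape of the pattern match in StreamPlanner: extract the
// static partitions and pass them alongside the sink table path.
def convertInsert(insert: RichSqlInsertLike): CatalogSinkModifyOperationLike =
  CatalogSinkModifyOperationLike(insert.targetTablePath, insert.staticPartitionKVs)
```

In this model, an `INSERT INTO t PARTITION (dt='2019-07-01') ...` statement would surface `Map("dt" -> "2019-07-01")` as the static partitions carried by the sink operation.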
