danny0405 commented on a change in pull request #8966:
[FLINK-13074][table-planner-blink] Add PartitionableTableSink bridge logic to
flink&blink …
URL: https://github.com/apache/flink/pull/8966#discussion_r304266606
##########
File path:
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
##########
@@ -101,17 +104,19 @@ class StreamPlanner(
     val parsed = planner.parse(stmt)
     parsed match {
-      case insert: SqlInsert =>
+      case insert: RichSqlInsert =>
         val targetColumnList = insert.getTargetColumnList
         if (targetColumnList != null && insert.getTargetColumnList.size() != 0) {
           throw new ValidationException("Partial inserts are not supported")
         }
         // get name of sink table
         val targetTablePath = insert.getTargetTable.asInstanceOf[SqlIdentifier].names
+        val staticPartitions = insert.getStaticPartitionKVs
         List(new CatalogSinkModifyOperation(targetTablePath,
           SqlToOperationConverter.convert(planner,
-            insert.getSource).asInstanceOf[PlannerQueryOperation])
+            insert.getSource).asInstanceOf[PlannerQueryOperation],
+          staticPartitions)
Review comment:
We actually cannot, because the flink-planner DatabaseCalciteSchema
always recognizes the table as a source, so the insert target table cannot be
validated.
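
To make the point concrete, here is a minimal, self-contained sketch of the problem as I read it. Every name in it (`TableKind`, `CatalogTable`, `SourceOnlySchema`, `validateInsertTarget`) is a hypothetical stand-in, not a real Flink or Calcite class, and the lookup logic is an assumption about the behaviour rather than a copy of `DatabaseCalciteSchema`.

```scala
// Hypothetical stand-ins only; none of these are real Flink or Calcite classes.
sealed trait TableKind
case object Source extends TableKind
case object Sink extends TableKind

final case class CatalogTable(name: String, kinds: Set[TableKind])

// Assumed behaviour: the schema exposes a table only in its source role,
// so a sink-only table is invisible to anything that resolves through it.
final class SourceOnlySchema(tables: Map[String, CatalogTable]) {
  def getTable(name: String): Option[CatalogTable] =
    tables.get(name).filter(_.kinds.contains(Source))
}

object ValidationSketch {
  // A validator would resolve the INSERT target through the schema;
  // the lookup fails for a table that is registered as a sink only.
  def validateInsertTarget(
      schema: SourceOnlySchema,
      target: String): Either[String, CatalogTable] =
    schema.getTable(target).toRight(s"Object '$target' not found")

  def main(args: Array[String]): Unit = {
    val schema = new SourceOnlySchema(Map(
      "orders" -> CatalogTable("orders", Set(Source)),
      "sink_t" -> CatalogTable("sink_t", Set(Sink))))

    println(validateInsertTarget(schema, "orders")) // resolves as a source
    println(validateInsertTarget(schema, "sink_t")) // cannot be resolved
  }
}
```

Under these assumptions, validating the whole INSERT statement would reject a perfectly valid sink table, which is why the target path is read directly from the parsed node instead.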