This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new 4ceaac1 [FLINK-24676][table-planner] Fix schema mismatch exception if
explain insert with partial column
4ceaac1 is described below
commit 4ceaac13a0b5a8eceac78f3f22c247dea62a55f3
Author: godfreyhe <[email protected]>
AuthorDate: Thu Oct 28 10:33:27 2021 +0800
[FLINK-24676][table-planner] Fix schema mismatch exception if explain
insert with partial column
This closes #17584
(cherry picked from commit 37f8b380e2a962a46d287371bf37b7a97f4573cf)
---
.../table/planner/calcite/PreValidateReWriter.scala | 17 ++++++++++++++---
.../testExecuteSqlWithExplainInsertPartialColumn.out | 15 +++++++++++++++
.../apache/flink/table/api/TableEnvironmentTest.scala | 3 +++
3 files changed, 32 insertions(+), 3 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
index 4f75fad..39c0aa5 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.calcite
import org.apache.flink.sql.parser.SqlProperty
import org.apache.flink.sql.parser.dml.RichSqlInsert
+import org.apache.flink.sql.parser.dql.SqlRichExplain
import org.apache.flink.table.api.ValidationException
import
org.apache.flink.table.planner.calcite.PreValidateReWriter.{appendPartitionAndNullsProjects,
notSupported}
import org.apache.flink.table.planner.plan.schema.{CatalogSourceTable,
FlinkPreparingTableBase, LegacyCatalogSourceTable}
@@ -49,15 +50,25 @@ class PreValidateReWriter(
val typeFactory: RelDataTypeFactory) extends SqlBasicVisitor[Unit] {
override def visit(call: SqlCall): Unit = {
call match {
- case r: RichSqlInsert
- if r.getStaticPartitions.nonEmpty || r.getTargetColumnList != null
=> r.getSource match {
+ case e: SqlRichExplain =>
+ e.getStatement match {
+ case r: RichSqlInsert => rewriteInsert(r)
+ case _ => // do nothing
+ }
+ case r: RichSqlInsert => rewriteInsert(r)
+ case _ => // do nothing
+ }
+ }
+
+ private def rewriteInsert(r: RichSqlInsert): Unit = {
+ if (r.getStaticPartitions.nonEmpty || r.getTargetColumnList != null) {
+ r.getSource match {
case call: SqlCall =>
val newSource = appendPartitionAndNullsProjects(
r, validator, typeFactory, call, r.getStaticPartitions)
r.setOperand(2, newSource)
case source => throw new ValidationException(notSupported(source))
}
- case _ =>
}
}
}
diff --git
a/flink-table/flink-table-planner/src/test/resources/explain/testExecuteSqlWithExplainInsertPartialColumn.out
b/flink-table/flink-table-planner/src/test/resources/explain/testExecuteSqlWithExplainInsertPartialColumn.out
new file mode 100644
index 0000000..8d68899
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/explain/testExecuteSqlWithExplainInsertPartialColumn.out
@@ -0,0 +1,15 @@
+== Abstract Syntax Tree ==
+LogicalLegacySink(name=[`default_catalog`.`default_database`.`MySink`],
fields=[d, e])
++- LogicalProject(a=[$0], EXPR$1=[null:INTEGER])
+ +- LogicalFilter(condition=[>($0, 10)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable,
source: [CollectionTableSource(a, b, c)]]])
+
+== Optimized Physical Plan ==
+LegacySink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
++- Calc(select=[a, null:INTEGER AS EXPR$1], where=[>(a, 10)])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c])
+
+== Optimized Execution Plan ==
+LegacySink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
++- Calc(select=[a, null:INTEGER AS EXPR$1], where=[(a > 10)])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c])
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index 997eb0c..22ec5c0 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -1258,6 +1258,9 @@ class TableEnvironmentTest {
checkExplain("explain plan for insert into MySink select a, b from MyTable
where a > 10",
"/explain/testExecuteSqlWithExplainInsert.out")
+
+ checkExplain("explain plan for insert into MySink(d) select a from MyTable
where a > 10",
+ "/explain/testExecuteSqlWithExplainInsertPartialColumn.out")
}
@Test