This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 4ceaac1  [FLINK-24676][table-planner] Fix schema mismatch exception if explain insert with partial column
4ceaac1 is described below

commit 4ceaac13a0b5a8eceac78f3f22c247dea62a55f3
Author: godfreyhe <[email protected]>
AuthorDate: Thu Oct 28 10:33:27 2021 +0800

    [FLINK-24676][table-planner] Fix schema mismatch exception if explain insert with partial column
    
    This closes #17584
    
    (cherry picked from commit 37f8b380e2a962a46d287371bf37b7a97f4573cf)
---
 .../table/planner/calcite/PreValidateReWriter.scala     | 17 ++++++++++++++---
 .../testExecuteSqlWithExplainInsertPartialColumn.out    | 15 +++++++++++++++
 .../apache/flink/table/api/TableEnvironmentTest.scala   |  3 +++
 3 files changed, 32 insertions(+), 3 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
index 4f75fad..39c0aa5 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.calcite
 
 import org.apache.flink.sql.parser.SqlProperty
 import org.apache.flink.sql.parser.dml.RichSqlInsert
+import org.apache.flink.sql.parser.dql.SqlRichExplain
 import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.planner.calcite.PreValidateReWriter.{appendPartitionAndNullsProjects, notSupported}
 import org.apache.flink.table.planner.plan.schema.{CatalogSourceTable, FlinkPreparingTableBase, LegacyCatalogSourceTable}
@@ -49,15 +50,25 @@ class PreValidateReWriter(
     val typeFactory: RelDataTypeFactory) extends SqlBasicVisitor[Unit] {
   override def visit(call: SqlCall): Unit = {
     call match {
-      case r: RichSqlInsert
-          if r.getStaticPartitions.nonEmpty || r.getTargetColumnList != null => r.getSource match {
+      case e: SqlRichExplain =>
+        e.getStatement match {
+          case r: RichSqlInsert => rewriteInsert(r)
+          case _ => // do nothing
+        }
+      case r: RichSqlInsert => rewriteInsert(r)
+      case _ => // do nothing
+    }
+  }
+
+  private def rewriteInsert(r: RichSqlInsert): Unit = {
+    if (r.getStaticPartitions.nonEmpty || r.getTargetColumnList != null) {
+      r.getSource match {
         case call: SqlCall =>
           val newSource = appendPartitionAndNullsProjects(
             r, validator, typeFactory, call, r.getStaticPartitions)
           r.setOperand(2, newSource)
         case source => throw new ValidationException(notSupported(source))
       }
-      case _ =>
     }
   }
 }
diff --git a/flink-table/flink-table-planner/src/test/resources/explain/testExecuteSqlWithExplainInsertPartialColumn.out b/flink-table/flink-table-planner/src/test/resources/explain/testExecuteSqlWithExplainInsertPartialColumn.out
new file mode 100644
index 0000000..8d68899
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/explain/testExecuteSqlWithExplainInsertPartialColumn.out
@@ -0,0 +1,15 @@
+== Abstract Syntax Tree ==
+LogicalLegacySink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
++- LogicalProject(a=[$0], EXPR$1=[null:INTEGER])
+   +- LogicalFilter(condition=[>($0, 10)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]])
+
+== Optimized Physical Plan ==
+LegacySink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
++- Calc(select=[a, null:INTEGER AS EXPR$1], where=[>(a, 10)])
+   +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c])
+
+== Optimized Execution Plan ==
+LegacySink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
++- Calc(select=[a, null:INTEGER AS EXPR$1], where=[(a > 10)])
+   +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c])
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index 997eb0c..22ec5c0 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -1258,6 +1258,9 @@ class TableEnvironmentTest {
 
     checkExplain("explain plan for insert into MySink select a, b from MyTable where a > 10",
       "/explain/testExecuteSqlWithExplainInsert.out")
+
+    checkExplain("explain plan for insert into MySink(d) select a from MyTable where a > 10",
+      "/explain/testExecuteSqlWithExplainInsertPartialColumn.out")
   }
 
   @Test

Reply via email to