This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 05d1961  [FLINK-24676][table-planner] Fix schema mismatch exception if explain insert with partial column
05d1961 is described below

commit 05d19616b200da052f345761325726d32c6c1c11
Author: godfreyhe <[email protected]>
AuthorDate: Thu Oct 28 10:33:27 2021 +0800

    [FLINK-24676][table-planner] Fix schema mismatch exception if explain insert with partial column
    
    This closes #17584
    
    (cherry picked from commit 37f8b380e2a962a46d287371bf37b7a97f4573cf)
---
 .../planner/calcite/PreValidateReWriter.scala      | 17 ++++++--
 ...estExecuteSqlWithExplainInsertPartialColumn.out | 15 +++++++
 .../flink/table/api/TableEnvironmentTest.scala     | 46 +++++++++++-----------
 3 files changed, 51 insertions(+), 27 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
index 4f75fad..39c0aa5 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.calcite
 
 import org.apache.flink.sql.parser.SqlProperty
 import org.apache.flink.sql.parser.dml.RichSqlInsert
+import org.apache.flink.sql.parser.dql.SqlRichExplain
 import org.apache.flink.table.api.ValidationException
 import 
org.apache.flink.table.planner.calcite.PreValidateReWriter.{appendPartitionAndNullsProjects,
 notSupported}
 import org.apache.flink.table.planner.plan.schema.{CatalogSourceTable, 
FlinkPreparingTableBase, LegacyCatalogSourceTable}
@@ -49,15 +50,25 @@ class PreValidateReWriter(
     val typeFactory: RelDataTypeFactory) extends SqlBasicVisitor[Unit] {
   override def visit(call: SqlCall): Unit = {
     call match {
-      case r: RichSqlInsert
-          if r.getStaticPartitions.nonEmpty || r.getTargetColumnList != null 
=> r.getSource match {
+      case e: SqlRichExplain =>
+        e.getStatement match {
+          case r: RichSqlInsert => rewriteInsert(r)
+          case _ => // do nothing
+        }
+      case r: RichSqlInsert => rewriteInsert(r)
+      case _ => // do nothing
+    }
+  }
+
+  private def rewriteInsert(r: RichSqlInsert): Unit = {
+    if (r.getStaticPartitions.nonEmpty || r.getTargetColumnList != null) {
+      r.getSource match {
         case call: SqlCall =>
           val newSource = appendPartitionAndNullsProjects(
             r, validator, typeFactory, call, r.getStaticPartitions)
           r.setOperand(2, newSource)
         case source => throw new ValidationException(notSupported(source))
       }
-      case _ =>
     }
   }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainInsertPartialColumn.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainInsertPartialColumn.out
new file mode 100644
index 0000000..30cacf9
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainInsertPartialColumn.out
@@ -0,0 +1,15 @@
+== Abstract Syntax Tree ==
+LogicalLegacySink(name=[`default_catalog`.`default_database`.`MySink`], 
fields=[d, e])
++- LogicalProject(d=[$0], e=[null:INTEGER])
+   +- LogicalFilter(condition=[>($0, 10)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [CollectionTableSource(a, b, c)]]])
+
+== Optimized Physical Plan ==
+LegacySink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
++- Calc(select=[a AS d, null:INTEGER AS e], where=[>(a, 10)])
+   +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c])
+
+== Optimized Execution Plan ==
+LegacySink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
++- Calc(select=[a AS d, null:INTEGER AS e], where=[(a > 10)])
+   +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c])
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index 230930f..db798ee 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -31,17 +31,15 @@ import org.apache.flink.table.catalog.{Column, 
GenericInMemoryCatalog, ObjectPat
 import org.apache.flink.table.module.ModuleEntry
 import org.apache.flink.table.planner.runtime.stream.sql.FunctionITCase.TestUDF
 import 
org.apache.flink.table.planner.runtime.stream.table.FunctionITCase.SimpleScalarFunction
-import org.apache.flink.table.planner.utils.TableTestUtil.replaceStageId
+import org.apache.flink.table.planner.utils.TableTestUtil.{replaceStageId, 
replaceStreamNodeId}
 import org.apache.flink.table.planner.utils.{TableTestUtil, 
TestTableSourceSinks}
 import org.apache.flink.table.types.DataType
 import org.apache.flink.types.Row
-
 import org.junit.Assert._
 import org.junit.rules.ExpectedException
 import org.junit.{Rule, Test}
 
 import _root_.java.util
-
 import _root_.scala.collection.JavaConverters._
 
 class TableEnvironmentTest {
@@ -1106,16 +1104,8 @@ class TableEnvironmentTest {
     val tableResult1 = tableEnv.executeSql(createTableStmt)
     assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
 
-    val tableResult2 = tableEnv.executeSql("explain plan for select * from 
MyTable where a > 10")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind)
-    val it = tableResult2.collect()
-    assertTrue(it.hasNext)
-    val row = it.next()
-    assertEquals(1, row.getArity)
-    val actual = row.getField(0).toString
-    val expected = 
TableTestUtil.readFromResource("/explain/testExecuteSqlWithExplainSelect.out")
-    assertEquals(replaceStageId(expected), replaceStageId(actual))
-    assertFalse(it.hasNext)
+    checkExplain("explain plan for select * from MyTable where a > 10",
+      "/explain/testExecuteSqlWithExplainSelect.out")
   }
 
   @Test
@@ -1147,17 +1137,11 @@ class TableEnvironmentTest {
     val tableResult2 = tableEnv.executeSql(createTableStmt2)
     assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
 
-    val tableResult3 = tableEnv.executeSql(
-      "explain plan for insert into MySink select a, b from MyTable where a > 
10")
-    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult3.getResultKind)
-    val it = tableResult3.collect()
-    assertTrue(it.hasNext)
-    val row = it.next()
-    assertEquals(1, row.getArity)
-    val actual = row.getField(0).toString
-    val expected = 
TableTestUtil.readFromResource("/explain/testExecuteSqlWithExplainInsert.out")
-    assertEquals(replaceStageId(expected), replaceStageId(actual))
-    assertFalse(it.hasNext)
+    checkExplain("explain plan for insert into MySink select a, b from MyTable 
where a > 10",
+      "/explain/testExecuteSqlWithExplainInsert.out")
+
+    checkExplain("explain plan for insert into MySink(d) select a from MyTable 
where a > 10",
+      "/explain/testExecuteSqlWithExplainInsertPartialColumn.out")
   }
 
   @Test
@@ -1453,4 +1437,18 @@ class TableEnvironmentTest {
         actual.apply(i).asInstanceOf[Object])
     }
   }
+
+  private def checkExplain(sql: String, resultPath: String): Unit = {
+    val tableResult2 = tableEnv.executeSql(sql)
+    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind)
+    val it = tableResult2.collect()
+    assertTrue(it.hasNext)
+    val row = it.next()
+    assertEquals(1, row.getArity)
+    val actual = replaceStreamNodeId(row.getField(0).toString.trim)
+    val expected = replaceStreamNodeId(TableTestUtil
+      .readFromResource(resultPath).trim)
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+    assertFalse(it.hasNext)
+  }
 }

Reply via email to