This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 05d1961 [FLINK-24676][table-planner] Fix schema mismatch exception if explain insert with partial column
05d1961 is described below
commit 05d19616b200da052f345761325726d32c6c1c11
Author: godfreyhe <[email protected]>
AuthorDate: Thu Oct 28 10:33:27 2021 +0800
[FLINK-24676][table-planner] Fix schema mismatch exception if explain insert with partial column
This closes #17584
(cherry picked from commit 37f8b380e2a962a46d287371bf37b7a97f4573cf)
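
For context, a minimal sketch of the scenario this commit fixes, using schemas that mirror the test resources below (the datagen/blackhole connectors are hypothetical stand-ins for the test's sources). Before the fix, executing the INSERT with a partial column list worked, but wrapping the same INSERT in EXPLAIN threw a schema mismatch exception because the missing column was never padded:

    import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

    object ExplainPartialColumnRepro {
      def main(args: Array[String]): Unit = {
        val tEnv = TableEnvironment.create(
          EnvironmentSettings.newInstance().inStreamingMode().build())
        // Schemas mirror the test below; the connectors are stand-ins.
        tEnv.executeSql(
          "CREATE TABLE MyTable (a INT, b BIGINT, c STRING) WITH ('connector' = 'datagen')")
        tEnv.executeSql(
          "CREATE TABLE MySink (d INT, e INT) WITH ('connector' = 'blackhole')")
        // Column e is absent from the target list, so the planner must pad it
        // with NULL; before this fix that rewrite was skipped whenever the
        // INSERT was wrapped in an EXPLAIN node.
        val result = tEnv.executeSql(
          "explain plan for insert into MySink(d) select a from MyTable where a > 10")
        println(result.collect().next().getField(0))
      }
    }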
---
.../planner/calcite/PreValidateReWriter.scala | 17 ++++++--
...estExecuteSqlWithExplainInsertPartialColumn.out | 15 +++++++
.../flink/table/api/TableEnvironmentTest.scala | 46 +++++++++++-----------
3 files changed, 51 insertions(+), 27 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
index 4f75fad..39c0aa5 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.calcite
import org.apache.flink.sql.parser.SqlProperty
import org.apache.flink.sql.parser.dml.RichSqlInsert
+import org.apache.flink.sql.parser.dql.SqlRichExplain
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.planner.calcite.PreValidateReWriter.{appendPartitionAndNullsProjects, notSupported}
import org.apache.flink.table.planner.plan.schema.{CatalogSourceTable, FlinkPreparingTableBase, LegacyCatalogSourceTable}
@@ -49,15 +50,25 @@ class PreValidateReWriter(
val typeFactory: RelDataTypeFactory) extends SqlBasicVisitor[Unit] {
override def visit(call: SqlCall): Unit = {
call match {
- case r: RichSqlInsert
- if r.getStaticPartitions.nonEmpty || r.getTargetColumnList != null => r.getSource match {
+ case e: SqlRichExplain =>
+ e.getStatement match {
+ case r: RichSqlInsert => rewriteInsert(r)
+ case _ => // do nothing
+ }
+ case r: RichSqlInsert => rewriteInsert(r)
+ case _ => // do nothing
+ }
+ }
+
+ private def rewriteInsert(r: RichSqlInsert): Unit = {
+ if (r.getStaticPartitions.nonEmpty || r.getTargetColumnList != null) {
+ r.getSource match {
case call: SqlCall =>
val newSource = appendPartitionAndNullsProjects(
r, validator, typeFactory, call, r.getStaticPartitions)
r.setOperand(2, newSource)
case source => throw new ValidationException(notSupported(source))
}
- case _ =>
}
}
}
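
The rewrite that appendPartitionAndNullsProjects applies to a partial column list can be pictured with a small, self-contained sketch. This is a hypothetical helper working on strings; the real planner operates on SqlNode trees:

    // Hypothetical illustration of the NULL-padding idea: columns absent
    // from the INSERT's target list are filled with typed NULL literals so
    // the source row width matches the sink schema before validation.
    def padSelectList(
        sinkColumns: Seq[(String, String)], // (name, SQL type) of the sink
        targetColumns: Seq[String],         // columns named in INSERT INTO t(...)
        sourceExprs: Seq[String]): Seq[String] = {
      val byTarget = targetColumns.zip(sourceExprs).toMap
      sinkColumns.map { case (name, tpe) =>
        byTarget.get(name)
          .map(e => s"$e AS $name")
          .getOrElse(s"CAST(NULL AS $tpe) AS $name")
      }
    }

    // padSelectList(Seq("d" -> "INT", "e" -> "INT"), Seq("d"), Seq("a"))
    //   == Seq("a AS d", "CAST(NULL AS INT) AS e")
    // which corresponds to LogicalProject(d=[$0], e=[null:INTEGER]) in the
    // expected plan file added below.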
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainInsertPartialColumn.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainInsertPartialColumn.out
new file mode 100644
index 0000000..30cacf9
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExecuteSqlWithExplainInsertPartialColumn.out
@@ -0,0 +1,15 @@
+== Abstract Syntax Tree ==
+LogicalLegacySink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
++- LogicalProject(d=[$0], e=[null:INTEGER])
+   +- LogicalFilter(condition=[>($0, 10)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]])
+
+== Optimized Physical Plan ==
+LegacySink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
++- Calc(select=[a AS d, null:INTEGER AS e], where=[>(a, 10)])
+   +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c])
+
+== Optimized Execution Plan ==
+LegacySink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
++- Calc(select=[a AS d, null:INTEGER AS e], where=[(a > 10)])
+   +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c])
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index 230930f..db798ee 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -31,17 +31,15 @@ import org.apache.flink.table.catalog.{Column, GenericInMemoryCatalog, ObjectPat
import org.apache.flink.table.module.ModuleEntry
import org.apache.flink.table.planner.runtime.stream.sql.FunctionITCase.TestUDF
import org.apache.flink.table.planner.runtime.stream.table.FunctionITCase.SimpleScalarFunction
-import org.apache.flink.table.planner.utils.TableTestUtil.replaceStageId
+import org.apache.flink.table.planner.utils.TableTestUtil.{replaceStageId, replaceStreamNodeId}
import org.apache.flink.table.planner.utils.{TableTestUtil, TestTableSourceSinks}
import org.apache.flink.table.types.DataType
import org.apache.flink.types.Row
-
import org.junit.Assert._
import org.junit.rules.ExpectedException
import org.junit.{Rule, Test}
import _root_.java.util
-
import _root_.scala.collection.JavaConverters._
class TableEnvironmentTest {
@@ -1106,16 +1104,8 @@ class TableEnvironmentTest {
val tableResult1 = tableEnv.executeSql(createTableStmt)
assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
- val tableResult2 = tableEnv.executeSql("explain plan for select * from MyTable where a > 10")
- assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind)
- val it = tableResult2.collect()
- assertTrue(it.hasNext)
- val row = it.next()
- assertEquals(1, row.getArity)
- val actual = row.getField(0).toString
- val expected = TableTestUtil.readFromResource("/explain/testExecuteSqlWithExplainSelect.out")
- assertEquals(replaceStageId(expected), replaceStageId(actual))
- assertFalse(it.hasNext)
+ checkExplain("explain plan for select * from MyTable where a > 10",
+ "/explain/testExecuteSqlWithExplainSelect.out")
}
@Test
@@ -1147,17 +1137,11 @@ class TableEnvironmentTest {
val tableResult2 = tableEnv.executeSql(createTableStmt2)
assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
- val tableResult3 = tableEnv.executeSql(
- "explain plan for insert into MySink select a, b from MyTable where a >
10")
- assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult3.getResultKind)
- val it = tableResult3.collect()
- assertTrue(it.hasNext)
- val row = it.next()
- assertEquals(1, row.getArity)
- val actual = row.getField(0).toString
- val expected = TableTestUtil.readFromResource("/explain/testExecuteSqlWithExplainInsert.out")
- assertEquals(replaceStageId(expected), replaceStageId(actual))
- assertFalse(it.hasNext)
+ checkExplain("explain plan for insert into MySink select a, b from MyTable
where a > 10",
+ "/explain/testExecuteSqlWithExplainInsert.out")
+
+ checkExplain("explain plan for insert into MySink(d) select a from MyTable
where a > 10",
+ "/explain/testExecuteSqlWithExplainInsertPartialColumn.out")
}
@Test
@@ -1453,4 +1437,18 @@ class TableEnvironmentTest {
actual.apply(i).asInstanceOf[Object])
}
}
+
+ private def checkExplain(sql: String, resultPath: String): Unit = {
+ val tableResult2 = tableEnv.executeSql(sql)
+ assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind)
+ val it = tableResult2.collect()
+ assertTrue(it.hasNext)
+ val row = it.next()
+ assertEquals(1, row.getArity)
+ val actual = replaceStreamNodeId(row.getField(0).toString.trim)
+ val expected = replaceStreamNodeId(TableTestUtil
+ .readFromResource(resultPath).trim)
+ assertEquals(replaceStageId(expected), replaceStageId(actual))
+ assertFalse(it.hasNext)
+ }
}
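
A note on the two replace* helpers used in checkExplain: explain output contains volatile numeric identifiers (stream-node ids, stage ids), so both sides are normalized before comparison. A hypothetical sketch of the idea only; the actual TableTestUtil patterns may differ:

    // Hypothetical sketch of the normalization behind replaceStageId and
    // replaceStreamNodeId: blank out volatile numeric ids so two otherwise
    // identical plans compare equal. The real helpers may use other patterns.
    def stripVolatileIds(plan: String): String =
      plan
        .replaceAll("Stage \\d+", "Stage [id]")            // job-graph stage numbers
        .replaceAll("\"id\"\\s*:\\s*\\d+", "\"id\": [id]") // stream-node ids in JSON plans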