This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 72b3eb7  [FLINK-22168][table] Partition insert can not work with union all
72b3eb7 is described below

commit 72b3eb7b5b31a0bf241f3197cf86fd90ba723823
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Apr 22 18:02:10 2021 +0800

    [FLINK-22168][table] Partition insert can not work with union all
    
    This closes #15666
---
 .../planner/calcite/PreValidateReWriter.scala      |  74 +++-
 .../planner/plan/common/PartialInsertTest.xml      | 469 ++++++++++++++++++++-
 .../planner/plan/common/PartialInsertTest.scala    |  43 ++
 3 files changed, 549 insertions(+), 37 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
index 8fc91d4..4f75fad 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
@@ -21,8 +21,9 @@ package org.apache.flink.table.planner.calcite
 import org.apache.flink.sql.parser.SqlProperty
 import org.apache.flink.sql.parser.dml.RichSqlInsert
 import org.apache.flink.table.api.ValidationException
-import 
org.apache.flink.table.planner.calcite.PreValidateReWriter.appendPartitionAndNullsProjects
+import 
org.apache.flink.table.planner.calcite.PreValidateReWriter.{appendPartitionAndNullsProjects,
 notSupported}
 import org.apache.flink.table.planner.plan.schema.{CatalogSourceTable, 
FlinkPreparingTableBase, LegacyCatalogSourceTable}
+import org.apache.flink.util.Preconditions.checkArgument
 
 import org.apache.calcite.plan.RelOptTable
 import org.apache.calcite.prepare.CalciteCatalogReader
@@ -33,7 +34,7 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.sql.parser.SqlParserPos
 import org.apache.calcite.sql.util.SqlBasicVisitor
 import org.apache.calcite.sql.validate.{SqlValidatorException, 
SqlValidatorTable, SqlValidatorUtil}
-import org.apache.calcite.sql.{SqlCall, SqlIdentifier, SqlKind, SqlLiteral, 
SqlNode, SqlNodeList, SqlSelect, SqlUtil}
+import org.apache.calcite.sql.{SqlCall, SqlIdentifier, SqlKind, SqlLiteral, 
SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlUtil}
 import org.apache.calcite.util.Static.RESOURCE
 
 import java.util
@@ -50,16 +51,11 @@ class PreValidateReWriter(
     call match {
       case r: RichSqlInsert
           if r.getStaticPartitions.nonEmpty || r.getTargetColumnList != null 
=> r.getSource match {
-        case select: SqlSelect =>
-          appendPartitionAndNullsProjects(r, validator, typeFactory, select, 
r.getStaticPartitions)
-        case values: SqlCall if values.getKind == SqlKind.VALUES =>
-          val newSource = appendPartitionAndNullsProjects(r, validator, 
typeFactory, values,
-            r.getStaticPartitions)
+        case call: SqlCall =>
+          val newSource = appendPartitionAndNullsProjects(
+            r, validator, typeFactory, call, r.getStaticPartitions)
           r.setOperand(2, newSource)
-        case source =>
-          throw new ValidationException(
-            s"INSERT INTO <table> PARTITION [(COLUMN LIST)] statement only 
support "
-              + s"SELECT and VALUES clause for now, '$source' is not supported 
yet.")
+        case source => throw new ValidationException(notSupported(source))
       }
       case _ =>
     }
@@ -67,7 +63,14 @@ class PreValidateReWriter(
 }
 
 object PreValidateReWriter {
+
   //~ Tools ------------------------------------------------------------------
+
+  private def notSupported(source: SqlNode): String = {
+    s"INSERT INTO <table> PARTITION [(COLUMN LIST)] statement only support " +
+        s"SELECT, VALUES, SET_QUERY AND ORDER BY clause for now, '$source' is 
not supported yet."
+  }
+
   /**
     * Append the static partitions and unspecified columns to the data source 
projection list.
     * The columns are appended to the corresponding positions.
@@ -108,7 +111,6 @@ object PreValidateReWriter {
       typeFactory: RelDataTypeFactory,
       source: SqlCall,
       partitions: SqlNodeList): SqlCall = {
-    assert(source.getKind == SqlKind.SELECT || source.getKind == 
SqlKind.VALUES)
     val calciteCatalogReader = 
validator.getCatalogReader.unwrap(classOf[CalciteCatalogReader])
     val names = sqlInsert.getTargetTable.asInstanceOf[SqlIdentifier].names
     val table = calciteCatalogReader.getTable(names)
@@ -185,11 +187,49 @@ object PreValidateReWriter {
       }
     }
 
-    source match {
-      case select: SqlSelect =>
-        rewriteSelect(validator, select, targetRowType, assignedFields, 
targetPosition)
-      case values: SqlCall if values.getKind == SqlKind.VALUES =>
-        rewriteValues(values, targetRowType, assignedFields, targetPosition)
+    rewriteSqlCall(validator, source, targetRowType, assignedFields, 
targetPosition)
+  }
+
+  private def rewriteSqlCall(
+      validator: FlinkCalciteSqlValidator,
+      call: SqlCall,
+      targetRowType: RelDataType,
+      assignedFields: util.LinkedHashMap[Integer, SqlNode],
+      targetPosition: util.List[Int]): SqlCall = {
+
+    def rewrite(node: SqlNode): SqlCall = {
+      checkArgument(node.isInstanceOf[SqlCall], node)
+      rewriteSqlCall(
+        validator,
+        node.asInstanceOf[SqlCall],
+        targetRowType,
+        assignedFields,
+        targetPosition)
+    }
+
+    call.getKind match {
+      case SqlKind.SELECT =>
+        rewriteSelect(
+          validator, call.asInstanceOf[SqlSelect], targetRowType, 
assignedFields, targetPosition)
+      case SqlKind.VALUES =>
+        rewriteValues(call, targetRowType, assignedFields, targetPosition)
+      case kind if SqlKind.SET_QUERY.contains(kind) =>
+        call.getOperandList.zipWithIndex.foreach {
+          case (operand, index) => call.setOperand(index, rewrite(operand))
+        }
+        call
+      case SqlKind.ORDER_BY =>
+        val operands = call.getOperandList
+        new SqlOrderBy(
+          call.getParserPosition,
+          rewrite(operands.get(0)),
+          operands.get(1).asInstanceOf[SqlNodeList],
+          operands.get(2),
+          operands.get(3))
+      // Not support:
+      // case SqlKind.WITH =>
+      // case SqlKind.EXPLICIT_TABLE =>
+      case _ => throw new ValidationException(notSupported(call))
     }
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml
index b900cb6..aa9aa50 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml
@@ -38,6 +38,51 @@ Sink(table=[default_catalog.default_database.sink], 
fields=[a, b, c, d, e, f, g]
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testPartialInsertWithComplexReorder[isBatch: true]">
+    <Resource name="sql">
+      <![CDATA[INSERT INTO sink (b,e,a,g,f,c,d) SELECT b,e,a,456,123,c,d FROM 
MyTable GROUP BY a,b,c,d,e]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink], fields=[a, b, c, d, 
e, f, g])
++- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[123:BIGINT], 
g=[456])
+   +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.sink], fields=[a, b, c, d, e, f, 
g])
++- Calc(select=[a, b, c, d, e, 123:BIGINT AS f, 456 AS g])
+   +- HashAggregate(isMerge=[true], groupBy=[a, b, c, d, e], select=[a, b, c, 
d, e])
+      +- Exchange(distribution=[hash[a, b, c, d, e]])
+         +- LocalHashAggregate(groupBy=[a, b, c, d, e], select=[a, b, c, d, e])
+            +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]], 
fields=[a, b, c, d, e])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testPartialInsertWithComplexReorderAndComputedColumn[isBatch: false]">
+    <Resource name="sql">
+      <![CDATA[INSERT INTO partitioned_sink (e,a,g,f,c,d) SELECT 
e,a,456,123,c,d FROM MyTable GROUP BY a,b,c,d,e]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.partitioned_sink], 
fields=[a, c, d, e, f, g])
++- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], f=[123:BIGINT], g=[456])
+   +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c, 
d, e, f, g])
++- Calc(select=[a, c, d, e, 123:BIGINT AS f, 456 AS g])
+   +- GroupAggregate(groupBy=[a, b, c, d, e], select=[a, b, c, d, e])
+      +- Exchange(distribution=[hash[a, b, c, d, e]])
+         +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase 
name="testPartialInsertWithComplexReorderAndComputedColumn[isBatch: true]">
     <Resource name="sql">
       <![CDATA[INSERT INTO partitioned_sink (e,a,g,f,c,d) SELECT 
e,a,456,123,c,d FROM MyTable GROUP BY a,b,c,d,e]]>
@@ -62,49 +107,433 @@ 
Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c, d,
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testPartialInsertWithComplexReorder[isBatch: true]">
+  <TestCase name="testPartialInsertWithExceptAll[isBatch: false]">
     <Resource name="sql">
-      <![CDATA[INSERT INTO sink (b,e,a,g,f,c,d) SELECT b,e,a,456,123,c,d FROM 
MyTable GROUP BY a,b,c,d,e]]>
+      <![CDATA[INSERT INTO partitioned_sink (e,a,g,f,c,d) SELECT 
e,a,456,123,c,d FROM MyTable GROUP BY a,b,c,d,e EXCEPT ALL SELECT 
e,a,789,456,c,d FROM MyTable GROUP BY a,b,c,d,e ]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalSink(table=[default_catalog.default_database.sink], fields=[a, b, c, d, 
e, f, g])
-+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[123:BIGINT], 
g=[456])
-   +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
-      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c, d, e)]]])
+LogicalSink(table=[default_catalog.default_database.partitioned_sink], 
fields=[a, c, d, e, f, g])
++- LogicalProject(a=[$0], c=[$1], d=[$2], e=[$3], f=[CAST($4):BIGINT], 
g=[CAST($5):INTEGER])
+   +- LogicalMinus(all=[true])
+      :- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[123], 
EXPR$5=[456])
+      :  +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+      :     +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+      +- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[456], 
EXPR$5=[789])
+         +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Sink(table=[default_catalog.default_database.sink], fields=[a, b, c, d, e, f, 
g])
-+- Calc(select=[a, b, c, d, e, 123:BIGINT AS f, 456 AS g])
-   +- HashAggregate(isMerge=[true], groupBy=[a, b, c, d, e], select=[a, b, c, 
d, e])
-      +- Exchange(distribution=[hash[a, b, c, d, e]])
-         +- LocalHashAggregate(groupBy=[a, b, c, d, e], select=[a, b, c, d, e])
-            +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]], 
fields=[a, b, c, d, e])
+Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c, 
d, e, f, g])
++- Calc(select=[a0 AS a, c0 AS c, d0 AS d, e0 AS e, f0 AS f, g0 AS g])
+   +- 
Correlate(invocation=[org$apache$flink$table$planner$functions$tablefunctions$ReplicateRows$9ac09a0a3aed959a450e9fbe5b75ac6b($0,
 $1, $2, $3, $4, $5, $6)], 
correlate=[table(ReplicateRows(sum_vcol_marker,a,c,d,e,f,g))], 
select=[sum_vcol_marker,a,c,d,e,f,g,a0,c0,d0,e0,f0,g0], 
rowType=[RecordType(BIGINT sum_vcol_marker, INTEGER a, VARCHAR(2147483647) c, 
VARCHAR(2147483647) d, DOUBLE e, BIGINT f, INTEGER g, INTEGER a0, 
VARCHAR(2147483647) c0, VARCHAR(2147483647) d0, DOUBLE e0, BIGINT f0 [...]
+      +- Calc(select=[sum_vcol_marker, a, c, d, e, f, g], 
where=[>(sum_vcol_marker, 0)])
+         +- GroupAggregate(groupBy=[a, c, d, e, f, g], select=[a, c, d, e, f, 
g, SUM_RETRACT(vcol_marker) AS sum_vcol_marker])
+            +- Exchange(distribution=[hash[a, c, d, e, f, g]])
+               +- Union(all=[true], union=[a, c, d, e, f, g, vcol_marker])
+                  :- Calc(select=[a, c, d, e, CAST(123:BIGINT) AS f, CAST(456) 
AS g, 1:BIGINT AS vcol_marker])
+                  :  +- GroupAggregate(groupBy=[a, b, c, d, e], select=[a, b, 
c, d, e])
+                  :     +- Exchange(distribution=[hash[a, b, c, d, e]])
+                  :        +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]], 
fields=[a, b, c, d, e])
+                  +- Calc(select=[a, c, d, e, CAST(123:BIGINT) AS f, CAST(456) 
AS g, -1:BIGINT AS vcol_marker])
+                     +- GroupAggregate(groupBy=[a, b, c, d, e], select=[a, b, 
c, d, e])
+                        +- Exchange(distribution=[hash[a, b, c, d, e]])
+                           +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]], 
fields=[a, b, c, d, e])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase 
name="testPartialInsertWithComplexReorderAndComputedColumn[isBatch: false]">
+  <TestCase name="testPartialInsertWithExceptAll[isBatch: true]">
     <Resource name="sql">
-      <![CDATA[INSERT INTO partitioned_sink (e,a,g,f,c,d) SELECT 
e,a,456,123,c,d FROM MyTable GROUP BY a,b,c,d,e]]>
+      <![CDATA[INSERT INTO partitioned_sink (e,a,g,f,c,d) SELECT 
e,a,456,123,c,d FROM MyTable GROUP BY a,b,c,d,e EXCEPT ALL SELECT 
e,a,789,456,c,d FROM MyTable GROUP BY a,b,c,d,e ]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
 LogicalSink(table=[default_catalog.default_database.partitioned_sink], 
fields=[a, c, d, e, f, g])
-+- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], f=[123:BIGINT], g=[456])
-   +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
-      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c, d, e)]]])
++- LogicalProject(a=[$0], c=[$1], d=[$2], e=[$3], f=[CAST($4):BIGINT], 
g=[CAST($5):INTEGER])
+   +- LogicalMinus(all=[true])
+      :- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[123], 
EXPR$5=[456])
+      :  +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+      :     +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+      +- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[456], 
EXPR$5=[789])
+         +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
 Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c, 
d, e, f, g])
-+- Calc(select=[a, c, d, e, 123:BIGINT AS f, 456 AS g])
-   +- GroupAggregate(groupBy=[a, b, c, d, e], select=[a, b, c, d, e])
-      +- Exchange(distribution=[hash[a, b, c, d, e]])
++- Sort(orderBy=[c ASC, d ASC])
+   +- Calc(select=[a0 AS a, c0 AS c, d0 AS d, e0 AS e, f0 AS f, g0 AS g])
+      +- 
Correlate(invocation=[org$apache$flink$table$planner$functions$tablefunctions$ReplicateRows$9ac09a0a3aed959a450e9fbe5b75ac6b($0,
 $1, $2, $3, $4, $5, $6)], 
correlate=[table(ReplicateRows(sum_vcol_marker,a,c,d,e,f,g))], 
select=[sum_vcol_marker,a,c,d,e,f,g,a0,c0,d0,e0,f0,g0], 
rowType=[RecordType(BIGINT sum_vcol_marker, INTEGER a, VARCHAR(2147483647) c, 
VARCHAR(2147483647) d, DOUBLE e, BIGINT f, INTEGER g, INTEGER a0, 
VARCHAR(2147483647) c0, VARCHAR(2147483647) d0, DOUBLE e0, BIGINT [...]
+         +- Calc(select=[sum_vcol_marker, a, c, d, e, f, g], 
where=[>(sum_vcol_marker, 0)])
+            +- HashAggregate(isMerge=[true], groupBy=[a, c, d, e, f, g], 
select=[a, c, d, e, f, g, Final_SUM(sum$0) AS sum_vcol_marker])
+               +- Exchange(distribution=[hash[a, c, d, e, f, g]])
+                  +- LocalHashAggregate(groupBy=[a, c, d, e, f, g], select=[a, 
c, d, e, f, g, Partial_SUM(vcol_marker) AS sum$0])
+                     +- Union(all=[true], union=[a, c, d, e, f, g, 
vcol_marker])
+                        :- Calc(select=[a, c, d, e, CAST(123:BIGINT) AS f, 
CAST(456) AS g, 1:BIGINT AS vcol_marker])
+                        :  +- HashAggregate(isMerge=[true], groupBy=[a, b, c, 
d, e], select=[a, b, c, d, e])
+                        :     +- Exchange(distribution=[hash[a, b, c, d, e]])
+                        :        +- LocalHashAggregate(groupBy=[a, b, c, d, 
e], select=[a, b, c, d, e])
+                        :           +- 
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e])
+                        +- Calc(select=[a, c, d, e, CAST(123:BIGINT) AS f, 
CAST(456) AS g, -1:BIGINT AS vcol_marker])
+                           +- HashAggregate(isMerge=[true], groupBy=[a, b, c, 
d, e], select=[a, b, c, d, e])
+                              +- Exchange(distribution=[hash[a, b, c, d, e]])
+                                 +- LocalHashAggregate(groupBy=[a, b, c, d, 
e], select=[a, b, c, d, e])
+                                    +- 
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testPartialInsertWithIntersectAll[isBatch: false]">
+    <Resource name="sql">
+      <![CDATA[INSERT INTO partitioned_sink (e,a,g,f,c,d) SELECT 
e,a,456,123,c,d FROM MyTable GROUP BY a,b,c,d,e INTERSECT ALL SELECT 
e,a,789,456,c,d FROM MyTable GROUP BY a,b,c,d,e ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.partitioned_sink], 
fields=[a, c, d, e, f, g])
++- LogicalProject(a=[$0], c=[$1], d=[$2], e=[$3], f=[CAST($4):BIGINT], 
g=[CAST($5):INTEGER])
+   +- LogicalIntersect(all=[true])
+      :- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[123], 
EXPR$5=[456])
+      :  +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+      :     +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+      +- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[456], 
EXPR$5=[789])
+         +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c, 
d, e, f, g])
++- Calc(select=[a0 AS a, c0 AS c, d0 AS d, e0 AS e, f0 AS f, g0 AS g])
+   +- 
Correlate(invocation=[org$apache$flink$table$planner$functions$tablefunctions$ReplicateRows$9ac09a0a3aed959a450e9fbe5b75ac6b($0,
 $1, $2, $3, $4, $5, $6)], correlate=[table(ReplicateRows($f0,a,c,d,e,f,g))], 
select=[$f0,a,c,d,e,f,g,a0,c0,d0,e0,f0,g0], rowType=[RecordType(BIGINT $f0, 
INTEGER a, VARCHAR(2147483647) c, VARCHAR(2147483647) d, DOUBLE e, BIGINT f, 
INTEGER g, INTEGER a0, VARCHAR(2147483647) c0, VARCHAR(2147483647) d0, DOUBLE 
e0, BIGINT f0, INTEGER g0)], joinType=[INNER])
+      +- Calc(select=[IF(>(vcol_left_cnt, vcol_right_cnt), vcol_right_cnt, 
vcol_left_cnt) AS $f0, a, c, d, e, f, g], where=[AND(>=(vcol_left_cnt, 1), 
>=(vcol_right_cnt, 1))])
+         +- GroupAggregate(groupBy=[a, c, d, e, f, g], select=[a, c, d, e, f, 
g, COUNT_RETRACT(vcol_left_marker) AS vcol_left_cnt, 
COUNT_RETRACT(vcol_right_marker) AS vcol_right_cnt])
+            +- Exchange(distribution=[hash[a, c, d, e, f, g]])
+               +- Union(all=[true], union=[a, c, d, e, f, g, vcol_left_marker, 
vcol_right_marker])
+                  :- Calc(select=[a, c, d, e, CAST(123:BIGINT) AS f, CAST(456) 
AS g, true AS vcol_left_marker, null:BOOLEAN AS vcol_right_marker])
+                  :  +- GroupAggregate(groupBy=[a, b, c, d, e], select=[a, b, 
c, d, e])
+                  :     +- Exchange(distribution=[hash[a, b, c, d, e]])
+                  :        +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]], 
fields=[a, b, c, d, e])
+                  +- Calc(select=[a, c, d, e, CAST(456:BIGINT) AS f, CAST(789) 
AS g, null:BOOLEAN AS vcol_left_marker, true AS vcol_right_marker])
+                     +- GroupAggregate(groupBy=[a, b, c, d, e], select=[a, b, 
c, d, e])
+                        +- Exchange(distribution=[hash[a, b, c, d, e]])
+                           +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]], 
fields=[a, b, c, d, e])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testPartialInsertWithIntersectAll[isBatch: true]">
+    <Resource name="sql">
+      <![CDATA[INSERT INTO partitioned_sink (e,a,g,f,c,d) SELECT 
e,a,456,123,c,d FROM MyTable GROUP BY a,b,c,d,e INTERSECT ALL SELECT 
e,a,789,456,c,d FROM MyTable GROUP BY a,b,c,d,e ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.partitioned_sink], 
fields=[a, c, d, e, f, g])
++- LogicalProject(a=[$0], c=[$1], d=[$2], e=[$3], f=[CAST($4):BIGINT], 
g=[CAST($5):INTEGER])
+   +- LogicalIntersect(all=[true])
+      :- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[123], 
EXPR$5=[456])
+      :  +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+      :     +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+      +- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[456], 
EXPR$5=[789])
+         +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c, 
d, e, f, g])
++- Sort(orderBy=[c ASC, d ASC])
+   +- Calc(select=[a0 AS a, c0 AS c, d0 AS d, e0 AS e, f0 AS f, g0 AS g])
+      +- 
Correlate(invocation=[org$apache$flink$table$planner$functions$tablefunctions$ReplicateRows$9ac09a0a3aed959a450e9fbe5b75ac6b($0,
 $1, $2, $3, $4, $5, $6)], correlate=[table(ReplicateRows($f0,a,c,d,e,f,g))], 
select=[$f0,a,c,d,e,f,g,a0,c0,d0,e0,f0,g0], rowType=[RecordType(BIGINT $f0, 
INTEGER a, VARCHAR(2147483647) c, VARCHAR(2147483647) d, DOUBLE e, BIGINT f, 
INTEGER g, INTEGER a0, VARCHAR(2147483647) c0, VARCHAR(2147483647) d0, DOUBLE 
e0, BIGINT f0, INTEGER g0)], joinType=[INNER])
+         +- Calc(select=[IF(>(vcol_left_cnt, vcol_right_cnt), vcol_right_cnt, 
vcol_left_cnt) AS $f0, a, c, d, e, f, g], where=[AND(>=(vcol_left_cnt, 1), 
>=(vcol_right_cnt, 1))])
+            +- HashAggregate(isMerge=[true], groupBy=[a, c, d, e, f, g], 
select=[a, c, d, e, f, g, Final_COUNT(count$0) AS vcol_left_cnt, 
Final_COUNT(count$1) AS vcol_right_cnt])
+               +- Exchange(distribution=[hash[a, c, d, e, f, g]])
+                  +- LocalHashAggregate(groupBy=[a, c, d, e, f, g], select=[a, 
c, d, e, f, g, Partial_COUNT(vcol_left_marker) AS count$0, 
Partial_COUNT(vcol_right_marker) AS count$1])
+                     +- Union(all=[true], union=[a, c, d, e, f, g, 
vcol_left_marker, vcol_right_marker])
+                        :- Calc(select=[a, c, d, e, CAST(123:BIGINT) AS f, 
CAST(456) AS g, true AS vcol_left_marker, null:BOOLEAN AS vcol_right_marker])
+                        :  +- HashAggregate(isMerge=[true], groupBy=[a, b, c, 
d, e], select=[a, b, c, d, e])
+                        :     +- Exchange(distribution=[hash[a, b, c, d, e]])
+                        :        +- LocalHashAggregate(groupBy=[a, b, c, d, 
e], select=[a, b, c, d, e])
+                        :           +- 
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e])
+                        +- Calc(select=[a, c, d, e, CAST(456:BIGINT) AS f, 
CAST(789) AS g, null:BOOLEAN AS vcol_left_marker, true AS vcol_right_marker])
+                           +- HashAggregate(isMerge=[true], groupBy=[a, b, c, 
d, e], select=[a, b, c, d, e])
+                              +- Exchange(distribution=[hash[a, b, c, d, e]])
+                                 +- LocalHashAggregate(groupBy=[a, b, c, d, 
e], select=[a, b, c, d, e])
+                                    +- 
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testPartialInsertWithOrderBy[isBatch: false]">
+    <Resource name="sql">
+      <![CDATA[INSERT INTO partitioned_sink (e,a,g,f,c,d) SELECT 
e,a,456,123,c,d FROM MyTable ORDER BY a,e,c,d]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.partitioned_sink], 
fields=[a, c, d, e, f, g])
++- LogicalProject(a=[$0], c=[$1], d=[$2], e=[$3], f=[CAST($4):BIGINT], 
g=[CAST($5):INTEGER])
+   +- LogicalSort(sort0=[$0], sort1=[$3], sort2=[$1], sort3=[$2], 
dir0=[ASC-nulls-first], dir1=[ASC-nulls-first], dir2=[ASC-nulls-first], 
dir3=[ASC-nulls-first])
+      +- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[123], 
EXPR$5=[456])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c, 
d, e, f, g])
++- Calc(select=[a, c, d, e, CAST(123:BIGINT) AS f, CAST(456) AS g])
+   +- Sort(orderBy=[a ASC, e ASC, c ASC, d ASC])
+      +- Exchange(distribution=[single])
+         +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testPartialInsertWithOrderBy[isBatch: true]">
+    <Resource name="sql">
+      <![CDATA[INSERT INTO partitioned_sink (e,a,g,f,c,d) SELECT 
e,a,456,123,c,d FROM MyTable ORDER BY a,e,c,d]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.partitioned_sink], 
fields=[a, c, d, e, f, g])
++- LogicalProject(a=[$0], c=[$1], d=[$2], e=[$3], f=[CAST($4):BIGINT], 
g=[CAST($5):INTEGER])
+   +- LogicalSort(sort0=[$0], sort1=[$3], sort2=[$1], sort3=[$2], 
dir0=[ASC-nulls-first], dir1=[ASC-nulls-first], dir2=[ASC-nulls-first], 
dir3=[ASC-nulls-first])
+      +- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[123], 
EXPR$5=[456])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c, 
d, e, f, g])
++- Calc(select=[a, c, d, e, CAST(123:BIGINT) AS f, CAST(456) AS g])
+   +- Sort(orderBy=[a ASC, e ASC, c ASC, d ASC])
+      +- Exchange(distribution=[single])
          +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testPartialInsertWithUnion[isBatch: false]">
+    <Resource name="sql">
+      <![CDATA[INSERT INTO partitioned_sink (e,a,g,f,c,d) SELECT 
e,a,456,123,c,d FROM MyTable GROUP BY a,b,c,d,e UNION SELECT e,a,789,456,c,d 
FROM MyTable GROUP BY a,b,c,d,e ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.partitioned_sink], 
fields=[a, c, d, e, f, g])
++- LogicalProject(a=[$0], c=[$1], d=[$2], e=[$3], f=[CAST($4):BIGINT], 
g=[CAST($5):INTEGER])
+   +- LogicalUnion(all=[false])
+      :- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[123], 
EXPR$5=[456])
+      :  +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+      :     +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+      +- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[456], 
EXPR$5=[789])
+         +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c, 
d, e, f, g])
++- Calc(select=[a, c, d, e, CAST(EXPR$4) AS f, CAST(EXPR$5) AS g])
+   +- GroupAggregate(groupBy=[a, c, d, e, EXPR$4, EXPR$5], select=[a, c, d, e, 
EXPR$4, EXPR$5])
+      +- Exchange(distribution=[hash[a, c, d, e, EXPR$4, EXPR$5]])
+         +- Union(all=[true], union=[a, c, d, e, EXPR$4, EXPR$5])
+            :- Calc(select=[a, c, d, e, 123 AS EXPR$4, 456 AS EXPR$5])
+            :  +- GroupAggregate(groupBy=[a, b, c, d, e], select=[a, b, c, d, 
e])
+            :     +- Exchange(distribution=[hash[a, b, c, d, e]])
+            :        +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]], 
fields=[a, b, c, d, e])
+            +- Calc(select=[a, c, d, e, 456 AS EXPR$4, 789 AS EXPR$5])
+               +- GroupAggregate(groupBy=[a, b, c, d, e], select=[a, b, c, d, 
e])
+                  +- Exchange(distribution=[hash[a, b, c, d, e]])
+                     +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]], 
fields=[a, b, c, d, e])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testPartialInsertWithUnion[isBatch: true]">
+    <Resource name="sql">
+      <![CDATA[INSERT INTO partitioned_sink (e,a,g,f,c,d) SELECT 
e,a,456,123,c,d FROM MyTable GROUP BY a,b,c,d,e UNION SELECT e,a,789,456,c,d 
FROM MyTable GROUP BY a,b,c,d,e ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.partitioned_sink], 
fields=[a, c, d, e, f, g])
++- LogicalProject(a=[$0], c=[$1], d=[$2], e=[$3], f=[CAST($4):BIGINT], 
g=[CAST($5):INTEGER])
+   +- LogicalUnion(all=[false])
+      :- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[123], 
EXPR$5=[456])
+      :  +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+      :     +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+      +- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[456], 
EXPR$5=[789])
+         +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c, 
d, e, f, g])
++- Sort(orderBy=[c ASC, d ASC])
+   +- Calc(select=[a, c, d, e, CAST(EXPR$4) AS f, CAST(EXPR$5) AS g])
+      +- HashAggregate(isMerge=[true], groupBy=[a, c, d, e, EXPR$4, EXPR$5], 
select=[a, c, d, e, EXPR$4, EXPR$5])
+         +- Exchange(distribution=[hash[a, c, d, e, EXPR$4, EXPR$5]])
+            +- LocalHashAggregate(groupBy=[a, c, d, e, EXPR$4, EXPR$5], 
select=[a, c, d, e, EXPR$4, EXPR$5])
+               +- Union(all=[true], union=[a, c, d, e, EXPR$4, EXPR$5])
+                  :- Calc(select=[a, c, d, e, 123 AS EXPR$4, 456 AS EXPR$5])
+                  :  +- HashAggregate(isMerge=[true], groupBy=[a, b, c, d, e], 
select=[a, b, c, d, e])
+                  :     +- Exchange(distribution=[hash[a, b, c, d, e]])
+                  :        +- LocalHashAggregate(groupBy=[a, b, c, d, e], 
select=[a, b, c, d, e])
+                  :           +- 
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e])
+                  +- Calc(select=[a, c, d, e, 456 AS EXPR$4, 789 AS EXPR$5])
+                     +- HashAggregate(isMerge=[true], groupBy=[a, b, c, d, e], 
select=[a, b, c, d, e])
+                        +- Exchange(distribution=[hash[a, b, c, d, e]])
+                           +- LocalHashAggregate(groupBy=[a, b, c, d, e], 
select=[a, b, c, d, e])
+                              +- 
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testPartialInsertWithUnionAll[isBatch: false]">
+    <Resource name="sql">
+      <![CDATA[INSERT INTO partitioned_sink (e,a,g,f,c,d) SELECT 
e,a,456,123,c,d FROM MyTable GROUP BY a,b,c,d,e UNION ALL SELECT 
e,a,789,456,c,d FROM MyTable GROUP BY a,b,c,d,e ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.partitioned_sink], 
fields=[a, c, d, e, f, g])
++- LogicalProject(a=[$0], c=[$1], d=[$2], e=[$3], f=[CAST($4):BIGINT], 
g=[CAST($5):INTEGER])
+   +- LogicalUnion(all=[true])
+      :- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[123], 
EXPR$5=[456])
+      :  +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+      :     +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+      +- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[456], 
EXPR$5=[789])
+         +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c, 
d, e, f, g])
++- Union(all=[true], union=[a, c, d, e, f, g])
+   :- Calc(select=[a, c, d, e, CAST(123:BIGINT) AS f, CAST(456) AS g])
+   :  +- GroupAggregate(groupBy=[a, b, c, d, e], select=[a, b, c, d, e])
+   :     +- Exchange(distribution=[hash[a, b, c, d, e]])
+   :        +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]], 
fields=[a, b, c, d, e])
+   +- Calc(select=[a, c, d, e, CAST(456:BIGINT) AS f, CAST(789) AS g])
+      +- GroupAggregate(groupBy=[a, b, c, d, e], select=[a, b, c, d, e])
+         +- Exchange(distribution=[hash[a, b, c, d, e]])
+            +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]], 
fields=[a, b, c, d, e])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testPartialInsertWithUnionAll[isBatch: true]">
+    <Resource name="sql">
+      <![CDATA[INSERT INTO partitioned_sink (e,a,g,f,c,d) SELECT 
e,a,456,123,c,d FROM MyTable GROUP BY a,b,c,d,e UNION ALL SELECT 
e,a,789,456,c,d FROM MyTable GROUP BY a,b,c,d,e ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.partitioned_sink], 
fields=[a, c, d, e, f, g])
++- LogicalProject(a=[$0], c=[$1], d=[$2], e=[$3], f=[CAST($4):BIGINT], 
g=[CAST($5):INTEGER])
+   +- LogicalUnion(all=[true])
+      :- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[123], 
EXPR$5=[456])
+      :  +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+      :     +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+      +- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[456], 
EXPR$5=[789])
+         +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c, 
d, e, f, g])
++- Sort(orderBy=[c ASC, d ASC])
+   +- Union(all=[true], union=[a, c, d, e, f, g])
+      :- Calc(select=[a, c, d, e, CAST(123:BIGINT) AS f, CAST(456) AS g])
+      :  +- HashAggregate(isMerge=[true], groupBy=[a, b, c, d, e], select=[a, 
b, c, d, e])
+      :     +- Exchange(distribution=[hash[a, b, c, d, e]])
+      :        +- LocalHashAggregate(groupBy=[a, b, c, d, e], select=[a, b, c, 
d, e])
+      :           +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]], 
fields=[a, b, c, d, e])
+      +- Calc(select=[a, c, d, e, CAST(456:BIGINT) AS f, CAST(789) AS g])
+         +- HashAggregate(isMerge=[true], groupBy=[a, b, c, d, e], select=[a, 
b, c, d, e])
+            +- Exchange(distribution=[hash[a, b, c, d, e]])
+               +- LocalHashAggregate(groupBy=[a, b, c, d, e], select=[a, b, c, 
d, e])
+                  +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]], 
fields=[a, b, c, d, e])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testPartialInsertWithUnionAllNested[isBatch: true]">
+    <Resource name="sql">
+      <![CDATA[INSERT INTO partitioned_sink (e,a,g,f,c,d) SELECT 
e,a,456,123,c,d FROM MyTable GROUP BY a,b,c,d,e UNION ALL SELECT 
e,a,789,456,c,d FROM MyTable GROUP BY a,b,c,d,e UNION ALL SELECT 
e,a,123,456,c,d FROM MyTable GROUP BY a,b,c,d,e ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.partitioned_sink], 
fields=[a, c, d, e, f, g])
++- LogicalProject(a=[$0], c=[$1], d=[$2], e=[$3], f=[CAST($4):BIGINT], 
g=[CAST($5):INTEGER])
+   +- LogicalUnion(all=[true])
+      :- LogicalUnion(all=[true])
+      :  :- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[123], 
EXPR$5=[456])
+      :  :  +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+      :  :     +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+      :  +- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[456], 
EXPR$5=[789])
+      :     +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+      :        +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+      +- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[456], 
EXPR$5=[123])
+         +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c, 
d, e, f, g])
++- Sort(orderBy=[c ASC, d ASC])
+   +- Union(all=[true], union=[a, c, d, e, f, g])
+      :- Union(all=[true], union=[a, c, d, e, f, g])
+      :  :- Calc(select=[a, c, d, e, CAST(123:BIGINT) AS f, CAST(456) AS g])
+      :  :  +- HashAggregate(isMerge=[true], groupBy=[a, b, c, d, e], 
select=[a, b, c, d, e])
+      :  :     +- Exchange(distribution=[hash[a, b, c, d, e]])
+      :  :        +- LocalHashAggregate(groupBy=[a, b, c, d, e], select=[a, b, 
c, d, e])
+      :  :           +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]], 
fields=[a, b, c, d, e])
+      :  +- Calc(select=[a, c, d, e, CAST(456:BIGINT) AS f, CAST(789) AS g])
+      :     +- HashAggregate(isMerge=[true], groupBy=[a, b, c, d, e], 
select=[a, b, c, d, e])
+      :        +- Exchange(distribution=[hash[a, b, c, d, e]])
+      :           +- LocalHashAggregate(groupBy=[a, b, c, d, e], select=[a, b, 
c, d, e])
+      :              +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]], 
fields=[a, b, c, d, e])
+      +- Calc(select=[a, c, d, e, CAST(456:BIGINT) AS f, CAST(123) AS g])
+         +- HashAggregate(isMerge=[true], groupBy=[a, b, c, d, e], select=[a, 
b, c, d, e])
+            +- Exchange(distribution=[hash[a, b, c, d, e]])
+               +- LocalHashAggregate(groupBy=[a, b, c, d, e], select=[a, b, c, 
d, e])
+                  +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]], 
fields=[a, b, c, d, e])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testPartialInsertWithUnionAllNested[isBatch: false]">
+    <Resource name="sql">
+      <![CDATA[INSERT INTO partitioned_sink (e,a,g,f,c,d) SELECT 
e,a,456,123,c,d FROM MyTable GROUP BY a,b,c,d,e UNION ALL SELECT 
e,a,789,456,c,d FROM MyTable GROUP BY a,b,c,d,e UNION ALL SELECT 
e,a,123,456,c,d FROM MyTable GROUP BY a,b,c,d,e ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.partitioned_sink], 
fields=[a, c, d, e, f, g])
++- LogicalProject(a=[$0], c=[$1], d=[$2], e=[$3], f=[CAST($4):BIGINT], 
g=[CAST($5):INTEGER])
+   +- LogicalUnion(all=[true])
+      :- LogicalUnion(all=[true])
+      :  :- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[123], 
EXPR$5=[456])
+      :  :  +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+      :  :     +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+      :  +- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[456], 
EXPR$5=[789])
+      :     +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+      :        +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+      +- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[456], 
EXPR$5=[123])
+         +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c, 
d, e, f, g])
++- Union(all=[true], union=[a, c, d, e, f, g])
+   :- Union(all=[true], union=[a, c, d, e, f, g])
+   :  :- Calc(select=[a, c, d, e, CAST(123:BIGINT) AS f, CAST(456) AS g])
+   :  :  +- GroupAggregate(groupBy=[a, b, c, d, e], select=[a, b, c, d, e])
+   :  :     +- Exchange(distribution=[hash[a, b, c, d, e]])
+   :  :        +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]], 
fields=[a, b, c, d, e])
+   :  +- Calc(select=[a, c, d, e, CAST(456:BIGINT) AS f, CAST(789) AS g])
+   :     +- GroupAggregate(groupBy=[a, b, c, d, e], select=[a, b, c, d, e])
+   :        +- Exchange(distribution=[hash[a, b, c, d, e]])
+   :           +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]], 
fields=[a, b, c, d, e])
+   +- Calc(select=[a, c, d, e, CAST(456:BIGINT) AS f, CAST(123) AS g])
+      +- GroupAggregate(groupBy=[a, b, c, d, e], select=[a, b, c, d, e])
+         +- Exchange(distribution=[hash[a, b, c, d, e]])
+            +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]], 
fields=[a, b, c, d, e])
+]]>
+    </Resource>
+  </TestCase>
 </Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/PartialInsertTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/PartialInsertTest.scala
index fb6eac1..b16ce64 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/PartialInsertTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/PartialInsertTest.scala
@@ -73,6 +73,49 @@ class PartialInsertTest(isBatch: Boolean) extends TableTestBase {
     util.verifyRelPlanInsert("INSERT INTO partitioned_sink (e,a,g,f,c,d) " +
         "SELECT e,a,456,123,c,d FROM MyTable GROUP BY a,b,c,d,e")
   }
+
+  /** Runs the shared two-branch set-operator scenario with a plain (distinct) `UNION`. */
+  @Test
+  def testPartialInsertWithUnion(): Unit =
+    testPartialInsertWithSetOperator("UNION")
+
+  /** Runs the shared two-branch set-operator scenario with `UNION ALL`. */
+  @Test
+  def testPartialInsertWithUnionAll(): Unit =
+    testPartialInsertWithSetOperator("UNION ALL")
+
+  /** Runs the shared two-branch set-operator scenario with `INTERSECT ALL`. */
+  @Test
+  def testPartialInsertWithIntersectAll(): Unit =
+    testPartialInsertWithSetOperator("INTERSECT ALL")
+
+  /** Runs the shared two-branch set-operator scenario with `EXCEPT ALL`. */
+  @Test
+  def testPartialInsertWithExceptAll(): Unit =
+    testPartialInsertWithSetOperator("EXCEPT ALL")
+
+  // Checks the plan of an INSERT with an explicit column list fed by two SELECTs joined by `operator`.
+  private def testPartialInsertWithSetOperator(operator: String): Unit = {
+    val lhs = "SELECT e,a,456,123,c,d FROM MyTable GROUP BY a,b,c,d,e "
+    val rhs = "SELECT e,a,789,456,c,d FROM MyTable GROUP BY a,b,c,d,e "
+    util.verifyRelPlanInsert(s"INSERT INTO partitioned_sink (e,a,g,f,c,d) $lhs$operator $rhs")
+  }
+
+  // Three SELECT branches chained with UNION ALL; the expected AST recorded for this
+  // statement shows one LogicalUnion nested inside another.
+  @Test
+  def testPartialInsertWithUnionAllNested(): Unit = {
+    val branches = Seq("e,a,456,123,c,d", "e,a,789,456,c,d", "e,a,123,456,c,d")
+      .map(columns => s"SELECT $columns FROM MyTable GROUP BY a,b,c,d,e ")
+    val query = branches.mkString("UNION ALL ")
+    util.verifyRelPlanInsert("INSERT INTO partitioned_sink (e,a,g,f,c,d) " + query)
+  }
+
+  /** Verifies the plan of an insert whose source query ends with an ORDER BY clause. */
+  @Test
+  def testPartialInsertWithOrderBy(): Unit = util.verifyRelPlanInsert(
+    "INSERT INTO partitioned_sink (e,a,g,f,c,d) SELECT e,a,456,123,c,d FROM MyTable " +
+      "ORDER BY a,e,c,d")
 }
 
 object PartialInsertTest {

Reply via email to