This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 72b3eb7 [FLINK-22168][table] Partition insert can not work with union all
72b3eb7 is described below
commit 72b3eb7b5b31a0bf241f3197cf86fd90ba723823
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Apr 22 18:02:10 2021 +0800
[FLINK-22168][table] Partition insert can not work with union all
This closes #15666
---
.../planner/calcite/PreValidateReWriter.scala | 74 +++-
.../planner/plan/common/PartialInsertTest.xml | 469 ++++++++++++++++++++-
.../planner/plan/common/PartialInsertTest.scala | 43 ++
3 files changed, 549 insertions(+), 37 deletions(-)
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
index 8fc91d4..4f75fad 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/PreValidateReWriter.scala
@@ -21,8 +21,9 @@ package org.apache.flink.table.planner.calcite
import org.apache.flink.sql.parser.SqlProperty
import org.apache.flink.sql.parser.dml.RichSqlInsert
import org.apache.flink.table.api.ValidationException
-import
org.apache.flink.table.planner.calcite.PreValidateReWriter.appendPartitionAndNullsProjects
+import
org.apache.flink.table.planner.calcite.PreValidateReWriter.{appendPartitionAndNullsProjects,
notSupported}
import org.apache.flink.table.planner.plan.schema.{CatalogSourceTable,
FlinkPreparingTableBase, LegacyCatalogSourceTable}
+import org.apache.flink.util.Preconditions.checkArgument
import org.apache.calcite.plan.RelOptTable
import org.apache.calcite.prepare.CalciteCatalogReader
@@ -33,7 +34,7 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.sql.parser.SqlParserPos
import org.apache.calcite.sql.util.SqlBasicVisitor
import org.apache.calcite.sql.validate.{SqlValidatorException,
SqlValidatorTable, SqlValidatorUtil}
-import org.apache.calcite.sql.{SqlCall, SqlIdentifier, SqlKind, SqlLiteral,
SqlNode, SqlNodeList, SqlSelect, SqlUtil}
+import org.apache.calcite.sql.{SqlCall, SqlIdentifier, SqlKind, SqlLiteral,
SqlNode, SqlNodeList, SqlOrderBy, SqlSelect, SqlUtil}
import org.apache.calcite.util.Static.RESOURCE
import java.util
@@ -50,16 +51,11 @@ class PreValidateReWriter(
call match {
case r: RichSqlInsert
if r.getStaticPartitions.nonEmpty || r.getTargetColumnList != null
=> r.getSource match {
- case select: SqlSelect =>
- appendPartitionAndNullsProjects(r, validator, typeFactory, select,
r.getStaticPartitions)
- case values: SqlCall if values.getKind == SqlKind.VALUES =>
- val newSource = appendPartitionAndNullsProjects(r, validator,
typeFactory, values,
- r.getStaticPartitions)
+ case call: SqlCall =>
+ val newSource = appendPartitionAndNullsProjects(
+ r, validator, typeFactory, call, r.getStaticPartitions)
r.setOperand(2, newSource)
- case source =>
- throw new ValidationException(
- s"INSERT INTO <table> PARTITION [(COLUMN LIST)] statement only
support "
- + s"SELECT and VALUES clause for now, '$source' is not supported
yet.")
+ case source => throw new ValidationException(notSupported(source))
}
case _ =>
}
@@ -67,7 +63,14 @@ class PreValidateReWriter(
}
object PreValidateReWriter {
+
//~ Tools ------------------------------------------------------------------
+
+  /**
+   * Builds the validation error message used when the source of an INSERT with static
+   * partitions or an explicit target column list is a node the rewriter cannot handle.
+   *
+   * @param source the unsupported source node; its string form is embedded in the message
+   * @return the error message text
+   */
+  private def notSupported(source: SqlNode): String = {
+    s"INSERT INTO <table> PARTITION [(COLUMN LIST)] statement only support " +
+      s"SELECT, VALUES, SET_QUERY AND ORDER BY clause for now, '$source' is not supported yet."
+  }
+
/**
* Append the static partitions and unspecified columns to the data source
projection list.
* The columns are appended to the corresponding positions.
@@ -108,7 +111,6 @@ object PreValidateReWriter {
typeFactory: RelDataTypeFactory,
source: SqlCall,
partitions: SqlNodeList): SqlCall = {
- assert(source.getKind == SqlKind.SELECT || source.getKind ==
SqlKind.VALUES)
val calciteCatalogReader =
validator.getCatalogReader.unwrap(classOf[CalciteCatalogReader])
val names = sqlInsert.getTargetTable.asInstanceOf[SqlIdentifier].names
val table = calciteCatalogReader.getTable(names)
@@ -185,11 +187,49 @@ object PreValidateReWriter {
}
}
- source match {
- case select: SqlSelect =>
- rewriteSelect(validator, select, targetRowType, assignedFields,
targetPosition)
- case values: SqlCall if values.getKind == SqlKind.VALUES =>
- rewriteValues(values, targetRowType, assignedFields, targetPosition)
+ rewriteSqlCall(validator, source, targetRowType, assignedFields,
targetPosition)
+ }
+
+  /**
+   * Rewrites the source query of an INSERT according to the kind of the given call.
+   *
+   * SELECT and VALUES calls are delegated to `rewriteSelect` and `rewriteValues`.
+   * Set operations (any kind contained in `SqlKind.SET_QUERY`) have every operand
+   * rewritten recursively and written back into the same call. ORDER BY has its first
+   * operand rewritten and is rebuilt as a new [[SqlOrderBy]] reusing the other three operands.
+   *
+   * @throws ValidationException if the call kind is none of the above
+   */
+  private def rewriteSqlCall(
+      validator: FlinkCalciteSqlValidator,
+      call: SqlCall,
+      targetRowType: RelDataType,
+      assignedFields: util.LinkedHashMap[Integer, SqlNode],
+      targetPosition: util.List[Int]): SqlCall = {
+
+    // Recursive step for nested queries; a nested node that is not a SqlCall fails the
+    // checkArgument precondition.
+    def rewrite(node: SqlNode): SqlCall = {
+      checkArgument(node.isInstanceOf[SqlCall], node)
+      val nested = node.asInstanceOf[SqlCall]
+      rewriteSqlCall(validator, nested, targetRowType, assignedFields, targetPosition)
+    }
+
+    call.getKind match {
+      case SqlKind.SELECT =>
+        val select = call.asInstanceOf[SqlSelect]
+        rewriteSelect(validator, select, targetRowType, assignedFields, targetPosition)
+      case SqlKind.VALUES =>
+        rewriteValues(call, targetRowType, assignedFields, targetPosition)
+      case setOp if SqlKind.SET_QUERY.contains(setOp) =>
+        // Replace each branch of the set operation in place with its rewritten form.
+        for ((branch, position) <- call.getOperandList.zipWithIndex) {
+          call.setOperand(position, rewrite(branch))
+        }
+        call
+      case SqlKind.ORDER_BY =>
+        // Only the first operand is rewritten; the remaining operands are passed through.
+        val operands = call.getOperandList
+        val rewrittenQuery = rewrite(operands.get(0))
+        val orderList = operands.get(1).asInstanceOf[SqlNodeList]
+        new SqlOrderBy(
+          call.getParserPosition, rewrittenQuery, orderList, operands.get(2), operands.get(3))
+      // Not supported yet:
+      // case SqlKind.WITH =>
+      // case SqlKind.EXPLICIT_TABLE =>
+      case _ => throw new ValidationException(notSupported(call))
     }
   }
diff --git
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml
index b900cb6..aa9aa50 100644
---
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml
+++
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml
@@ -38,6 +38,51 @@ Sink(table=[default_catalog.default_database.sink],
fields=[a, b, c, d, e, f, g]
]]>
</Resource>
</TestCase>
+ <TestCase name="testPartialInsertWithComplexReorder[isBatch: true]">
+ <Resource name="sql">
+ <![CDATA[INSERT INTO sink (b,e,a,g,f,c,d) SELECT b,e,a,456,123,c,d FROM
MyTable GROUP BY a,b,c,d,e]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink], fields=[a, b, c, d,
e, f, g])
++- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[123:BIGINT],
g=[456])
+ +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.sink], fields=[a, b, c, d, e, f,
g])
++- Calc(select=[a, b, c, d, e, 123:BIGINT AS f, 456 AS g])
+ +- HashAggregate(isMerge=[true], groupBy=[a, b, c, d, e], select=[a, b, c,
d, e])
+ +- Exchange(distribution=[hash[a, b, c, d, e]])
+ +- LocalHashAggregate(groupBy=[a, b, c, d, e], select=[a, b, c, d, e])
+ +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]],
fields=[a, b, c, d, e])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testPartialInsertWithComplexReorderAndComputedColumn[isBatch: false]">
+ <Resource name="sql">
+ <![CDATA[INSERT INTO partitioned_sink (e,a,g,f,c,d) SELECT
e,a,456,123,c,d FROM MyTable GROUP BY a,b,c,d,e]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.partitioned_sink],
fields=[a, c, d, e, f, g])
++- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], f=[123:BIGINT], g=[456])
+ +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c,
d, e, f, g])
++- Calc(select=[a, c, d, e, 123:BIGINT AS f, 456 AS g])
+ +- GroupAggregate(groupBy=[a, b, c, d, e], select=[a, b, c, d, e])
+ +- Exchange(distribution=[hash[a, b, c, d, e]])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e])
+]]>
+ </Resource>
+ </TestCase>
<TestCase
name="testPartialInsertWithComplexReorderAndComputedColumn[isBatch: true]">
<Resource name="sql">
<![CDATA[INSERT INTO partitioned_sink (e,a,g,f,c,d) SELECT
e,a,456,123,c,d FROM MyTable GROUP BY a,b,c,d,e]]>
@@ -62,49 +107,433 @@
Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c, d,
]]>
</Resource>
</TestCase>
- <TestCase name="testPartialInsertWithComplexReorder[isBatch: true]">
+ <TestCase name="testPartialInsertWithExceptAll[isBatch: false]">
<Resource name="sql">
- <![CDATA[INSERT INTO sink (b,e,a,g,f,c,d) SELECT b,e,a,456,123,c,d FROM
MyTable GROUP BY a,b,c,d,e]]>
+ <![CDATA[INSERT INTO partitioned_sink (e,a,g,f,c,d) SELECT
e,a,456,123,c,d FROM MyTable GROUP BY a,b,c,d,e EXCEPT ALL SELECT
e,a,789,456,c,d FROM MyTable GROUP BY a,b,c,d,e ]]>
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalSink(table=[default_catalog.default_database.sink], fields=[a, b, c, d,
e, f, g])
-+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[123:BIGINT],
g=[456])
- +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c, d, e)]]])
+LogicalSink(table=[default_catalog.default_database.partitioned_sink],
fields=[a, c, d, e, f, g])
++- LogicalProject(a=[$0], c=[$1], d=[$2], e=[$3], f=[CAST($4):BIGINT],
g=[CAST($5):INTEGER])
+ +- LogicalMinus(all=[true])
+ :- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[123],
EXPR$5=[456])
+ : +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+ +- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[456],
EXPR$5=[789])
+ +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Sink(table=[default_catalog.default_database.sink], fields=[a, b, c, d, e, f,
g])
-+- Calc(select=[a, b, c, d, e, 123:BIGINT AS f, 456 AS g])
- +- HashAggregate(isMerge=[true], groupBy=[a, b, c, d, e], select=[a, b, c,
d, e])
- +- Exchange(distribution=[hash[a, b, c, d, e]])
- +- LocalHashAggregate(groupBy=[a, b, c, d, e], select=[a, b, c, d, e])
- +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]],
fields=[a, b, c, d, e])
+Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c,
d, e, f, g])
++- Calc(select=[a0 AS a, c0 AS c, d0 AS d, e0 AS e, f0 AS f, g0 AS g])
+ +-
Correlate(invocation=[org$apache$flink$table$planner$functions$tablefunctions$ReplicateRows$9ac09a0a3aed959a450e9fbe5b75ac6b($0,
$1, $2, $3, $4, $5, $6)],
correlate=[table(ReplicateRows(sum_vcol_marker,a,c,d,e,f,g))],
select=[sum_vcol_marker,a,c,d,e,f,g,a0,c0,d0,e0,f0,g0],
rowType=[RecordType(BIGINT sum_vcol_marker, INTEGER a, VARCHAR(2147483647) c,
VARCHAR(2147483647) d, DOUBLE e, BIGINT f, INTEGER g, INTEGER a0,
VARCHAR(2147483647) c0, VARCHAR(2147483647) d0, DOUBLE e0, BIGINT f0 [...]
+ +- Calc(select=[sum_vcol_marker, a, c, d, e, f, g],
where=[>(sum_vcol_marker, 0)])
+ +- GroupAggregate(groupBy=[a, c, d, e, f, g], select=[a, c, d, e, f,
g, SUM_RETRACT(vcol_marker) AS sum_vcol_marker])
+ +- Exchange(distribution=[hash[a, c, d, e, f, g]])
+ +- Union(all=[true], union=[a, c, d, e, f, g, vcol_marker])
+ :- Calc(select=[a, c, d, e, CAST(123:BIGINT) AS f, CAST(456)
AS g, 1:BIGINT AS vcol_marker])
+ : +- GroupAggregate(groupBy=[a, b, c, d, e], select=[a, b,
c, d, e])
+ : +- Exchange(distribution=[hash[a, b, c, d, e]])
+ : +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]],
fields=[a, b, c, d, e])
+ +- Calc(select=[a, c, d, e, CAST(123:BIGINT) AS f, CAST(456)
AS g, -1:BIGINT AS vcol_marker])
+ +- GroupAggregate(groupBy=[a, b, c, d, e], select=[a, b,
c, d, e])
+ +- Exchange(distribution=[hash[a, b, c, d, e]])
+ +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]],
fields=[a, b, c, d, e])
]]>
</Resource>
</TestCase>
- <TestCase
name="testPartialInsertWithComplexReorderAndComputedColumn[isBatch: false]">
+ <TestCase name="testPartialInsertWithExceptAll[isBatch: true]">
<Resource name="sql">
- <![CDATA[INSERT INTO partitioned_sink (e,a,g,f,c,d) SELECT
e,a,456,123,c,d FROM MyTable GROUP BY a,b,c,d,e]]>
+ <![CDATA[INSERT INTO partitioned_sink (e,a,g,f,c,d) SELECT
e,a,456,123,c,d FROM MyTable GROUP BY a,b,c,d,e EXCEPT ALL SELECT
e,a,789,456,c,d FROM MyTable GROUP BY a,b,c,d,e ]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalSink(table=[default_catalog.default_database.partitioned_sink],
fields=[a, c, d, e, f, g])
-+- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], f=[123:BIGINT], g=[456])
- +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c, d, e)]]])
++- LogicalProject(a=[$0], c=[$1], d=[$2], e=[$3], f=[CAST($4):BIGINT],
g=[CAST($5):INTEGER])
+ +- LogicalMinus(all=[true])
+ :- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[123],
EXPR$5=[456])
+ : +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+ +- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[456],
EXPR$5=[789])
+ +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c,
d, e, f, g])
-+- Calc(select=[a, c, d, e, 123:BIGINT AS f, 456 AS g])
- +- GroupAggregate(groupBy=[a, b, c, d, e], select=[a, b, c, d, e])
- +- Exchange(distribution=[hash[a, b, c, d, e]])
++- Sort(orderBy=[c ASC, d ASC])
+ +- Calc(select=[a0 AS a, c0 AS c, d0 AS d, e0 AS e, f0 AS f, g0 AS g])
+ +-
Correlate(invocation=[org$apache$flink$table$planner$functions$tablefunctions$ReplicateRows$9ac09a0a3aed959a450e9fbe5b75ac6b($0,
$1, $2, $3, $4, $5, $6)],
correlate=[table(ReplicateRows(sum_vcol_marker,a,c,d,e,f,g))],
select=[sum_vcol_marker,a,c,d,e,f,g,a0,c0,d0,e0,f0,g0],
rowType=[RecordType(BIGINT sum_vcol_marker, INTEGER a, VARCHAR(2147483647) c,
VARCHAR(2147483647) d, DOUBLE e, BIGINT f, INTEGER g, INTEGER a0,
VARCHAR(2147483647) c0, VARCHAR(2147483647) d0, DOUBLE e0, BIGINT [...]
+ +- Calc(select=[sum_vcol_marker, a, c, d, e, f, g],
where=[>(sum_vcol_marker, 0)])
+ +- HashAggregate(isMerge=[true], groupBy=[a, c, d, e, f, g],
select=[a, c, d, e, f, g, Final_SUM(sum$0) AS sum_vcol_marker])
+ +- Exchange(distribution=[hash[a, c, d, e, f, g]])
+ +- LocalHashAggregate(groupBy=[a, c, d, e, f, g], select=[a,
c, d, e, f, g, Partial_SUM(vcol_marker) AS sum$0])
+ +- Union(all=[true], union=[a, c, d, e, f, g,
vcol_marker])
+ :- Calc(select=[a, c, d, e, CAST(123:BIGINT) AS f,
CAST(456) AS g, 1:BIGINT AS vcol_marker])
+ : +- HashAggregate(isMerge=[true], groupBy=[a, b, c,
d, e], select=[a, b, c, d, e])
+ : +- Exchange(distribution=[hash[a, b, c, d, e]])
+ : +- LocalHashAggregate(groupBy=[a, b, c, d,
e], select=[a, b, c, d, e])
+ : +-
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e])
+ +- Calc(select=[a, c, d, e, CAST(123:BIGINT) AS f,
CAST(456) AS g, -1:BIGINT AS vcol_marker])
+ +- HashAggregate(isMerge=[true], groupBy=[a, b, c,
d, e], select=[a, b, c, d, e])
+ +- Exchange(distribution=[hash[a, b, c, d, e]])
+ +- LocalHashAggregate(groupBy=[a, b, c, d,
e], select=[a, b, c, d, e])
+ +-
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testPartialInsertWithIntersectAll[isBatch: false]">
+ <Resource name="sql">
+ <![CDATA[INSERT INTO partitioned_sink (e,a,g,f,c,d) SELECT
e,a,456,123,c,d FROM MyTable GROUP BY a,b,c,d,e INTERSECT ALL SELECT
e,a,789,456,c,d FROM MyTable GROUP BY a,b,c,d,e ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.partitioned_sink],
fields=[a, c, d, e, f, g])
++- LogicalProject(a=[$0], c=[$1], d=[$2], e=[$3], f=[CAST($4):BIGINT],
g=[CAST($5):INTEGER])
+ +- LogicalIntersect(all=[true])
+ :- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[123],
EXPR$5=[456])
+ : +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+ +- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[456],
EXPR$5=[789])
+ +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c,
d, e, f, g])
++- Calc(select=[a0 AS a, c0 AS c, d0 AS d, e0 AS e, f0 AS f, g0 AS g])
+ +-
Correlate(invocation=[org$apache$flink$table$planner$functions$tablefunctions$ReplicateRows$9ac09a0a3aed959a450e9fbe5b75ac6b($0,
$1, $2, $3, $4, $5, $6)], correlate=[table(ReplicateRows($f0,a,c,d,e,f,g))],
select=[$f0,a,c,d,e,f,g,a0,c0,d0,e0,f0,g0], rowType=[RecordType(BIGINT $f0,
INTEGER a, VARCHAR(2147483647) c, VARCHAR(2147483647) d, DOUBLE e, BIGINT f,
INTEGER g, INTEGER a0, VARCHAR(2147483647) c0, VARCHAR(2147483647) d0, DOUBLE
e0, BIGINT f0, INTEGER g0)], joinType=[INNER])
+ +- Calc(select=[IF(>(vcol_left_cnt, vcol_right_cnt), vcol_right_cnt,
vcol_left_cnt) AS $f0, a, c, d, e, f, g], where=[AND(>=(vcol_left_cnt, 1),
>=(vcol_right_cnt, 1))])
+ +- GroupAggregate(groupBy=[a, c, d, e, f, g], select=[a, c, d, e, f,
g, COUNT_RETRACT(vcol_left_marker) AS vcol_left_cnt,
COUNT_RETRACT(vcol_right_marker) AS vcol_right_cnt])
+ +- Exchange(distribution=[hash[a, c, d, e, f, g]])
+ +- Union(all=[true], union=[a, c, d, e, f, g, vcol_left_marker,
vcol_right_marker])
+ :- Calc(select=[a, c, d, e, CAST(123:BIGINT) AS f, CAST(456)
AS g, true AS vcol_left_marker, null:BOOLEAN AS vcol_right_marker])
+ : +- GroupAggregate(groupBy=[a, b, c, d, e], select=[a, b,
c, d, e])
+ : +- Exchange(distribution=[hash[a, b, c, d, e]])
+ : +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]],
fields=[a, b, c, d, e])
+ +- Calc(select=[a, c, d, e, CAST(456:BIGINT) AS f, CAST(789)
AS g, null:BOOLEAN AS vcol_left_marker, true AS vcol_right_marker])
+ +- GroupAggregate(groupBy=[a, b, c, d, e], select=[a, b,
c, d, e])
+ +- Exchange(distribution=[hash[a, b, c, d, e]])
+ +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]],
fields=[a, b, c, d, e])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testPartialInsertWithIntersectAll[isBatch: true]">
+ <Resource name="sql">
+ <![CDATA[INSERT INTO partitioned_sink (e,a,g,f,c,d) SELECT
e,a,456,123,c,d FROM MyTable GROUP BY a,b,c,d,e INTERSECT ALL SELECT
e,a,789,456,c,d FROM MyTable GROUP BY a,b,c,d,e ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.partitioned_sink],
fields=[a, c, d, e, f, g])
++- LogicalProject(a=[$0], c=[$1], d=[$2], e=[$3], f=[CAST($4):BIGINT],
g=[CAST($5):INTEGER])
+ +- LogicalIntersect(all=[true])
+ :- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[123],
EXPR$5=[456])
+ : +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+ +- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[456],
EXPR$5=[789])
+ +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c,
d, e, f, g])
++- Sort(orderBy=[c ASC, d ASC])
+ +- Calc(select=[a0 AS a, c0 AS c, d0 AS d, e0 AS e, f0 AS f, g0 AS g])
+ +-
Correlate(invocation=[org$apache$flink$table$planner$functions$tablefunctions$ReplicateRows$9ac09a0a3aed959a450e9fbe5b75ac6b($0,
$1, $2, $3, $4, $5, $6)], correlate=[table(ReplicateRows($f0,a,c,d,e,f,g))],
select=[$f0,a,c,d,e,f,g,a0,c0,d0,e0,f0,g0], rowType=[RecordType(BIGINT $f0,
INTEGER a, VARCHAR(2147483647) c, VARCHAR(2147483647) d, DOUBLE e, BIGINT f,
INTEGER g, INTEGER a0, VARCHAR(2147483647) c0, VARCHAR(2147483647) d0, DOUBLE
e0, BIGINT f0, INTEGER g0)], joinType=[INNER])
+ +- Calc(select=[IF(>(vcol_left_cnt, vcol_right_cnt), vcol_right_cnt,
vcol_left_cnt) AS $f0, a, c, d, e, f, g], where=[AND(>=(vcol_left_cnt, 1),
>=(vcol_right_cnt, 1))])
+ +- HashAggregate(isMerge=[true], groupBy=[a, c, d, e, f, g],
select=[a, c, d, e, f, g, Final_COUNT(count$0) AS vcol_left_cnt,
Final_COUNT(count$1) AS vcol_right_cnt])
+ +- Exchange(distribution=[hash[a, c, d, e, f, g]])
+ +- LocalHashAggregate(groupBy=[a, c, d, e, f, g], select=[a,
c, d, e, f, g, Partial_COUNT(vcol_left_marker) AS count$0,
Partial_COUNT(vcol_right_marker) AS count$1])
+ +- Union(all=[true], union=[a, c, d, e, f, g,
vcol_left_marker, vcol_right_marker])
+ :- Calc(select=[a, c, d, e, CAST(123:BIGINT) AS f,
CAST(456) AS g, true AS vcol_left_marker, null:BOOLEAN AS vcol_right_marker])
+ : +- HashAggregate(isMerge=[true], groupBy=[a, b, c,
d, e], select=[a, b, c, d, e])
+ : +- Exchange(distribution=[hash[a, b, c, d, e]])
+ : +- LocalHashAggregate(groupBy=[a, b, c, d,
e], select=[a, b, c, d, e])
+ : +-
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e])
+ +- Calc(select=[a, c, d, e, CAST(456:BIGINT) AS f,
CAST(789) AS g, null:BOOLEAN AS vcol_left_marker, true AS vcol_right_marker])
+ +- HashAggregate(isMerge=[true], groupBy=[a, b, c,
d, e], select=[a, b, c, d, e])
+ +- Exchange(distribution=[hash[a, b, c, d, e]])
+ +- LocalHashAggregate(groupBy=[a, b, c, d,
e], select=[a, b, c, d, e])
+ +-
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testPartialInsertWithOrderBy[isBatch: false]">
+ <Resource name="sql">
+ <![CDATA[INSERT INTO partitioned_sink (e,a,g,f,c,d) SELECT
e,a,456,123,c,d FROM MyTable ORDER BY a,e,c,d]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.partitioned_sink],
fields=[a, c, d, e, f, g])
++- LogicalProject(a=[$0], c=[$1], d=[$2], e=[$3], f=[CAST($4):BIGINT],
g=[CAST($5):INTEGER])
+ +- LogicalSort(sort0=[$0], sort1=[$3], sort2=[$1], sort3=[$2],
dir0=[ASC-nulls-first], dir1=[ASC-nulls-first], dir2=[ASC-nulls-first],
dir3=[ASC-nulls-first])
+ +- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[123],
EXPR$5=[456])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c,
d, e, f, g])
++- Calc(select=[a, c, d, e, CAST(123:BIGINT) AS f, CAST(456) AS g])
+ +- Sort(orderBy=[a ASC, e ASC, c ASC, d ASC])
+ +- Exchange(distribution=[single])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testPartialInsertWithOrderBy[isBatch: true]">
+ <Resource name="sql">
+ <![CDATA[INSERT INTO partitioned_sink (e,a,g,f,c,d) SELECT
e,a,456,123,c,d FROM MyTable ORDER BY a,e,c,d]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.partitioned_sink],
fields=[a, c, d, e, f, g])
++- LogicalProject(a=[$0], c=[$1], d=[$2], e=[$3], f=[CAST($4):BIGINT],
g=[CAST($5):INTEGER])
+ +- LogicalSort(sort0=[$0], sort1=[$3], sort2=[$1], sort3=[$2],
dir0=[ASC-nulls-first], dir1=[ASC-nulls-first], dir2=[ASC-nulls-first],
dir3=[ASC-nulls-first])
+ +- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[123],
EXPR$5=[456])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c,
d, e, f, g])
++- Calc(select=[a, c, d, e, CAST(123:BIGINT) AS f, CAST(456) AS g])
+ +- Sort(orderBy=[a ASC, e ASC, c ASC, d ASC])
+ +- Exchange(distribution=[single])
+- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e])
]]>
</Resource>
</TestCase>
+ <TestCase name="testPartialInsertWithUnion[isBatch: false]">
+ <Resource name="sql">
+ <![CDATA[INSERT INTO partitioned_sink (e,a,g,f,c,d) SELECT
e,a,456,123,c,d FROM MyTable GROUP BY a,b,c,d,e UNION SELECT e,a,789,456,c,d
FROM MyTable GROUP BY a,b,c,d,e ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.partitioned_sink],
fields=[a, c, d, e, f, g])
++- LogicalProject(a=[$0], c=[$1], d=[$2], e=[$3], f=[CAST($4):BIGINT],
g=[CAST($5):INTEGER])
+ +- LogicalUnion(all=[false])
+ :- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[123],
EXPR$5=[456])
+ : +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+ +- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[456],
EXPR$5=[789])
+ +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c,
d, e, f, g])
++- Calc(select=[a, c, d, e, CAST(EXPR$4) AS f, CAST(EXPR$5) AS g])
+ +- GroupAggregate(groupBy=[a, c, d, e, EXPR$4, EXPR$5], select=[a, c, d, e,
EXPR$4, EXPR$5])
+ +- Exchange(distribution=[hash[a, c, d, e, EXPR$4, EXPR$5]])
+ +- Union(all=[true], union=[a, c, d, e, EXPR$4, EXPR$5])
+ :- Calc(select=[a, c, d, e, 123 AS EXPR$4, 456 AS EXPR$5])
+ : +- GroupAggregate(groupBy=[a, b, c, d, e], select=[a, b, c, d,
e])
+ : +- Exchange(distribution=[hash[a, b, c, d, e]])
+ : +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]],
fields=[a, b, c, d, e])
+ +- Calc(select=[a, c, d, e, 456 AS EXPR$4, 789 AS EXPR$5])
+ +- GroupAggregate(groupBy=[a, b, c, d, e], select=[a, b, c, d,
e])
+ +- Exchange(distribution=[hash[a, b, c, d, e]])
+ +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]],
fields=[a, b, c, d, e])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testPartialInsertWithUnion[isBatch: true]">
+ <Resource name="sql">
+ <![CDATA[INSERT INTO partitioned_sink (e,a,g,f,c,d) SELECT
e,a,456,123,c,d FROM MyTable GROUP BY a,b,c,d,e UNION SELECT e,a,789,456,c,d
FROM MyTable GROUP BY a,b,c,d,e ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.partitioned_sink],
fields=[a, c, d, e, f, g])
++- LogicalProject(a=[$0], c=[$1], d=[$2], e=[$3], f=[CAST($4):BIGINT],
g=[CAST($5):INTEGER])
+ +- LogicalUnion(all=[false])
+ :- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[123],
EXPR$5=[456])
+ : +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+ +- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[456],
EXPR$5=[789])
+ +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c,
d, e, f, g])
++- Sort(orderBy=[c ASC, d ASC])
+ +- Calc(select=[a, c, d, e, CAST(EXPR$4) AS f, CAST(EXPR$5) AS g])
+ +- HashAggregate(isMerge=[true], groupBy=[a, c, d, e, EXPR$4, EXPR$5],
select=[a, c, d, e, EXPR$4, EXPR$5])
+ +- Exchange(distribution=[hash[a, c, d, e, EXPR$4, EXPR$5]])
+ +- LocalHashAggregate(groupBy=[a, c, d, e, EXPR$4, EXPR$5],
select=[a, c, d, e, EXPR$4, EXPR$5])
+ +- Union(all=[true], union=[a, c, d, e, EXPR$4, EXPR$5])
+ :- Calc(select=[a, c, d, e, 123 AS EXPR$4, 456 AS EXPR$5])
+ : +- HashAggregate(isMerge=[true], groupBy=[a, b, c, d, e],
select=[a, b, c, d, e])
+ : +- Exchange(distribution=[hash[a, b, c, d, e]])
+ : +- LocalHashAggregate(groupBy=[a, b, c, d, e],
select=[a, b, c, d, e])
+ : +-
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e])
+ +- Calc(select=[a, c, d, e, 456 AS EXPR$4, 789 AS EXPR$5])
+ +- HashAggregate(isMerge=[true], groupBy=[a, b, c, d, e],
select=[a, b, c, d, e])
+ +- Exchange(distribution=[hash[a, b, c, d, e]])
+ +- LocalHashAggregate(groupBy=[a, b, c, d, e],
select=[a, b, c, d, e])
+ +-
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testPartialInsertWithUnionAll[isBatch: false]">
+ <Resource name="sql">
+ <![CDATA[INSERT INTO partitioned_sink (e,a,g,f,c,d) SELECT
e,a,456,123,c,d FROM MyTable GROUP BY a,b,c,d,e UNION ALL SELECT
e,a,789,456,c,d FROM MyTable GROUP BY a,b,c,d,e ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.partitioned_sink],
fields=[a, c, d, e, f, g])
++- LogicalProject(a=[$0], c=[$1], d=[$2], e=[$3], f=[CAST($4):BIGINT],
g=[CAST($5):INTEGER])
+ +- LogicalUnion(all=[true])
+ :- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[123],
EXPR$5=[456])
+ : +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+ +- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[456],
EXPR$5=[789])
+ +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c,
d, e, f, g])
++- Union(all=[true], union=[a, c, d, e, f, g])
+ :- Calc(select=[a, c, d, e, CAST(123:BIGINT) AS f, CAST(456) AS g])
+ : +- GroupAggregate(groupBy=[a, b, c, d, e], select=[a, b, c, d, e])
+ : +- Exchange(distribution=[hash[a, b, c, d, e]])
+ : +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]],
fields=[a, b, c, d, e])
+ +- Calc(select=[a, c, d, e, CAST(456:BIGINT) AS f, CAST(789) AS g])
+ +- GroupAggregate(groupBy=[a, b, c, d, e], select=[a, b, c, d, e])
+ +- Exchange(distribution=[hash[a, b, c, d, e]])
+ +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]],
fields=[a, b, c, d, e])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testPartialInsertWithUnionAll[isBatch: true]">
+ <Resource name="sql">
+ <![CDATA[INSERT INTO partitioned_sink (e,a,g,f,c,d) SELECT
e,a,456,123,c,d FROM MyTable GROUP BY a,b,c,d,e UNION ALL SELECT
e,a,789,456,c,d FROM MyTable GROUP BY a,b,c,d,e ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.partitioned_sink],
fields=[a, c, d, e, f, g])
++- LogicalProject(a=[$0], c=[$1], d=[$2], e=[$3], f=[CAST($4):BIGINT],
g=[CAST($5):INTEGER])
+ +- LogicalUnion(all=[true])
+ :- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[123],
EXPR$5=[456])
+ : +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+ +- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[456],
EXPR$5=[789])
+ +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c,
d, e, f, g])
++- Sort(orderBy=[c ASC, d ASC])
+ +- Union(all=[true], union=[a, c, d, e, f, g])
+ :- Calc(select=[a, c, d, e, CAST(123:BIGINT) AS f, CAST(456) AS g])
+ : +- HashAggregate(isMerge=[true], groupBy=[a, b, c, d, e], select=[a,
b, c, d, e])
+ : +- Exchange(distribution=[hash[a, b, c, d, e]])
+ : +- LocalHashAggregate(groupBy=[a, b, c, d, e], select=[a, b, c,
d, e])
+ : +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]],
fields=[a, b, c, d, e])
+ +- Calc(select=[a, c, d, e, CAST(456:BIGINT) AS f, CAST(789) AS g])
+ +- HashAggregate(isMerge=[true], groupBy=[a, b, c, d, e], select=[a,
b, c, d, e])
+ +- Exchange(distribution=[hash[a, b, c, d, e]])
+ +- LocalHashAggregate(groupBy=[a, b, c, d, e], select=[a, b, c,
d, e])
+ +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]],
fields=[a, b, c, d, e])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testPartialInsertWithUnionAllNested[isBatch: true]">
+ <Resource name="sql">
+ <![CDATA[INSERT INTO partitioned_sink (e,a,g,f,c,d) SELECT
e,a,456,123,c,d FROM MyTable GROUP BY a,b,c,d,e UNION ALL SELECT
e,a,789,456,c,d FROM MyTable GROUP BY a,b,c,d,e UNION ALL SELECT
e,a,123,456,c,d FROM MyTable GROUP BY a,b,c,d,e ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.partitioned_sink],
fields=[a, c, d, e, f, g])
++- LogicalProject(a=[$0], c=[$1], d=[$2], e=[$3], f=[CAST($4):BIGINT],
g=[CAST($5):INTEGER])
+ +- LogicalUnion(all=[true])
+ :- LogicalUnion(all=[true])
+ : :- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[123],
EXPR$5=[456])
+ : : +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+ : +- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[456],
EXPR$5=[789])
+ : +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+ +- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[456],
EXPR$5=[123])
+ +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c,
d, e, f, g])
++- Sort(orderBy=[c ASC, d ASC])
+ +- Union(all=[true], union=[a, c, d, e, f, g])
+ :- Union(all=[true], union=[a, c, d, e, f, g])
+ : :- Calc(select=[a, c, d, e, CAST(123:BIGINT) AS f, CAST(456) AS g])
+ : : +- HashAggregate(isMerge=[true], groupBy=[a, b, c, d, e],
select=[a, b, c, d, e])
+ : : +- Exchange(distribution=[hash[a, b, c, d, e]])
+ : : +- LocalHashAggregate(groupBy=[a, b, c, d, e], select=[a, b,
c, d, e])
+ : : +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]],
fields=[a, b, c, d, e])
+ : +- Calc(select=[a, c, d, e, CAST(456:BIGINT) AS f, CAST(789) AS g])
+ : +- HashAggregate(isMerge=[true], groupBy=[a, b, c, d, e],
select=[a, b, c, d, e])
+ : +- Exchange(distribution=[hash[a, b, c, d, e]])
+ : +- LocalHashAggregate(groupBy=[a, b, c, d, e], select=[a, b,
c, d, e])
+ : +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]],
fields=[a, b, c, d, e])
+ +- Calc(select=[a, c, d, e, CAST(456:BIGINT) AS f, CAST(123) AS g])
+ +- HashAggregate(isMerge=[true], groupBy=[a, b, c, d, e], select=[a,
b, c, d, e])
+ +- Exchange(distribution=[hash[a, b, c, d, e]])
+ +- LocalHashAggregate(groupBy=[a, b, c, d, e], select=[a, b, c,
d, e])
+ +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]],
fields=[a, b, c, d, e])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testPartialInsertWithUnionAllNested[isBatch: false]">
+ <Resource name="sql">
+ <![CDATA[INSERT INTO partitioned_sink (e,a,g,f,c,d) SELECT
e,a,456,123,c,d FROM MyTable GROUP BY a,b,c,d,e UNION ALL SELECT
e,a,789,456,c,d FROM MyTable GROUP BY a,b,c,d,e UNION ALL SELECT
e,a,123,456,c,d FROM MyTable GROUP BY a,b,c,d,e ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.partitioned_sink],
fields=[a, c, d, e, f, g])
++- LogicalProject(a=[$0], c=[$1], d=[$2], e=[$3], f=[CAST($4):BIGINT],
g=[CAST($5):INTEGER])
+ +- LogicalUnion(all=[true])
+ :- LogicalUnion(all=[true])
+ : :- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[123],
EXPR$5=[456])
+ : : +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+ : +- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[456],
EXPR$5=[789])
+ : +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+ +- LogicalProject(a=[$0], c=[$2], d=[$3], e=[$4], EXPR$4=[456],
EXPR$5=[123])
+ +- LogicalAggregate(group=[{0, 1, 2, 3, 4}])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c,
d, e, f, g])
++- Union(all=[true], union=[a, c, d, e, f, g])
+ :- Union(all=[true], union=[a, c, d, e, f, g])
+ : :- Calc(select=[a, c, d, e, CAST(123:BIGINT) AS f, CAST(456) AS g])
+ : : +- GroupAggregate(groupBy=[a, b, c, d, e], select=[a, b, c, d, e])
+ : : +- Exchange(distribution=[hash[a, b, c, d, e]])
+ : : +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]],
fields=[a, b, c, d, e])
+ : +- Calc(select=[a, c, d, e, CAST(456:BIGINT) AS f, CAST(789) AS g])
+ : +- GroupAggregate(groupBy=[a, b, c, d, e], select=[a, b, c, d, e])
+ : +- Exchange(distribution=[hash[a, b, c, d, e]])
+ : +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]],
fields=[a, b, c, d, e])
+ +- Calc(select=[a, c, d, e, CAST(456:BIGINT) AS f, CAST(123) AS g])
+ +- GroupAggregate(groupBy=[a, b, c, d, e], select=[a, b, c, d, e])
+ +- Exchange(distribution=[hash[a, b, c, d, e]])
+ +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [TestTableSource(a, b, c, d, e)]]],
fields=[a, b, c, d, e])
+]]>
+ </Resource>
+ </TestCase>
</Root>
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/PartialInsertTest.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/PartialInsertTest.scala
index fb6eac1..b16ce64 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/PartialInsertTest.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/PartialInsertTest.scala
@@ -73,6 +73,49 @@ class PartialInsertTest(isBatch: Boolean) extends
TableTestBase {
util.verifyRelPlanInsert("INSERT INTO partitioned_sink (e,a,g,f,c,d) " +
"SELECT e,a,456,123,c,d FROM MyTable GROUP BY a,b,c,d,e")
}
+
+ @Test
+ def testPartialInsertWithUnion(): Unit = {
+ testPartialInsertWithSetOperator("UNION")
+ }
+
+ @Test
+ def testPartialInsertWithUnionAll(): Unit = {
+ testPartialInsertWithSetOperator("UNION ALL")
+ }
+
+ @Test
+ def testPartialInsertWithIntersectAll(): Unit = {
+ testPartialInsertWithSetOperator("INTERSECT ALL")
+ }
+
+ @Test
+ def testPartialInsertWithExceptAll(): Unit = {
+ testPartialInsertWithSetOperator("EXCEPT ALL")
+ }
+
+ private def testPartialInsertWithSetOperator(operator: String): Unit = {
+ util.verifyRelPlanInsert("INSERT INTO partitioned_sink (e,a,g,f,c,d) " +
+ "SELECT e,a,456,123,c,d FROM MyTable GROUP BY a,b,c,d,e " +
+ operator + " " +
+ "SELECT e,a,789,456,c,d FROM MyTable GROUP BY a,b,c,d,e ")
+ }
+
+ @Test
+ def testPartialInsertWithUnionAllNested(): Unit = {
+ util.verifyRelPlanInsert("INSERT INTO partitioned_sink (e,a,g,f,c,d) " +
+ "SELECT e,a,456,123,c,d FROM MyTable GROUP BY a,b,c,d,e " +
+ "UNION ALL " +
+ "SELECT e,a,789,456,c,d FROM MyTable GROUP BY a,b,c,d,e " +
+ "UNION ALL " +
+ "SELECT e,a,123,456,c,d FROM MyTable GROUP BY a,b,c,d,e ")
+ }
+
+ @Test
+ def testPartialInsertWithOrderBy(): Unit = {
+ util.verifyRelPlanInsert("INSERT INTO partitioned_sink (e,a,g,f,c,d) " +
+ "SELECT e,a,456,123,c,d FROM MyTable ORDER BY a,e,c,d")
+ }
}
object PartialInsertTest {