This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0d0a8977d56 [FLINK-39558][table] `LogicalUnnestRule`: use Calcite `Uncollect` rowType instead of `LogicalType` round-trip
0d0a8977d56 is described below
commit 0d0a8977d561b628bcf1dc7aa84ca9b4c8e58a61
Author: Jim Hughes <[email protected]>
AuthorDate: Thu May 7 17:12:32 2026 -0400
[FLINK-39558][table] `LogicalUnnestRule`: use Calcite `Uncollect` rowType instead of `LogicalType` round-trip
---
.../plan/rules/logical/LogicalUnnestRule.java | 58 +--------
.../exec/batch/CorrelateBatchRestoreTest.java | 4 +-
.../nodes/exec/common/CorrelateTestPrograms.java | 47 +++++++
.../nodes/exec/stream/CorrelateRestoreTest.java | 2 +
.../table/planner/catalog/JavaCatalogTableTest.xml | 4 +-
.../table/planner/plan/batch/sql/UnnestTest.xml | 136 +++++++++++++++-----
.../plan/rules/logical/LogicalUnnestRuleTest.xml | 133 ++++++++++++++-----
.../planner/plan/stream/sql/MultiJoinTest.xml | 4 +-
.../table/planner/plan/stream/sql/UnnestTest.xml | 138 ++++++++++++++------
.../plan/correlate-cross-join-unnest-map.json | 137 ++++++++++++++++++++
...orrelate-cross-join-unnest-primitive-array.json | 130 +++++++++++++++++++
.../plan/correlate-cross-join-unnest-map.json | 141 +++++++++++++++++++++
.../savepoint/_metadata | Bin 0 -> 8333 bytes
...orrelate-cross-join-unnest-primitive-array.json | 133 +++++++++++++++++++
.../savepoint/_metadata | Bin 0 -> 7099 bytes
.../table/planner/plan/common/UnnestTestBase.scala | 20 +++
.../planner/runtime/batch/sql/UnnestITCase.scala | 38 ++++++
.../planner/runtime/stream/sql/UnnestITCase.scala | 37 ++++++
18 files changed, 1001 insertions(+), 161 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRule.java
index a0805e4b110..65b9750a2eb 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRule.java
@@ -19,11 +19,7 @@
package org.apache.flink.table.planner.plan.rules.logical;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
-import org.apache.flink.table.planner.utils.ShortcutUtils;
-import org.apache.flink.table.runtime.functions.table.UnnestRowsFunctionBase;
-import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.shaded.guava33.com.google.common.collect.ImmutableList;
@@ -33,22 +29,15 @@ import org.apache.calcite.plan.RelRule;
import org.apache.calcite.plan.hep.HepRelVertex;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Correlate;
-import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.core.Uncollect;
import org.apache.calcite.rel.logical.LogicalCorrelate;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.logical.LogicalTableFunctionScan;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.immutables.value.Value;
import java.util.Collections;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.table.types.logical.utils.LogicalTypeUtils.toRowType;
/**
* Planner rule that rewrites UNNEST to explode function.
@@ -106,10 +95,7 @@ public class LogicalUnnestRule extends RelRule<LogicalUnnestRule.LogicalUnnestRu
relNode = convert(getRel(hepRelVertex), correlate);
}
if (relNode instanceof LogicalProject) {
- final LogicalProject logicalProject =
- correlate.getJoinType() == JoinRelType.LEFT
- ? getLogicalProjectWithAdjustedNullability((LogicalProject) relNode)
- : (LogicalProject) relNode;
+ LogicalProject logicalProject = (LogicalProject) relNode;
return logicalProject.copy(
logicalProject.getTraitSet(),
ImmutableList.of(convert(getRel(logicalProject.getInput()), correlate)));
@@ -123,13 +109,6 @@ public class LogicalUnnestRule extends RelRule<LogicalUnnestRule.LogicalUnnestRu
if (relNode instanceof Uncollect) {
Uncollect uncollect = (Uncollect) relNode;
RelOptCluster cluster = correlate.getCluster();
- FlinkTypeFactory typeFactory = ShortcutUtils.unwrapTypeFactory(cluster);
- RelDataType relDataType =
- (RelDataType)
- ((Map.Entry) uncollect.getInput().getRowType().getFieldList().get(0))
- .getValue();
- LogicalType logicalType = FlinkTypeFactory.toLogicalType(relDataType);
-
BridgingSqlFunction sqlFunction =
BridgingSqlFunction.of(
cluster,
@@ -140,11 +119,7 @@ public class LogicalUnnestRule extends RelRule<LogicalUnnestRule.LogicalUnnestRu
RexNode rexCall =
cluster.getRexBuilder()
.makeCall(
- typeFactory.createFieldTypeFromLogicalType(
- toRowType(
- UnnestRowsFunctionBase.getUnnestedType(
- logicalType,
- uncollect.withOrdinality))),
+ uncollect.getRowType(),
sqlFunction,
((LogicalProject) getRel(uncollect.getInput())).getProjects());
return new LogicalTableFunctionScan(
@@ -167,35 +142,6 @@ public class LogicalUnnestRule extends RelRule<LogicalUnnestRule.LogicalUnnestRu
return rel;
}
- /**
- * If unnesting type is {@code NOT NULL} however at the same time {@code LEFT JOIN} makes it
- * nullable, this method adjusts nullability by inserting extra {@code CAST}.
- */
- private LogicalProject getLogicalProjectWithAdjustedNullability(LogicalProject logicalProject) {
- final RelOptCluster cluster = logicalProject.getCluster();
- FlinkTypeFactory typeFactory = (FlinkTypeFactory) cluster.getTypeFactory();
- RexBuilder rexBuilder = cluster.getRexBuilder();
- final RelDataType rowType = logicalProject.getRowType();
- return logicalProject.copy(
- logicalProject.getTraitSet(),
- logicalProject.getInput(),
- logicalProject.getProjects().stream()
- .map(
- t -> {
- if (t.getType().isNullable()) {
- return t;
- }
- return rexBuilder.makeCast(
- createNullableType(typeFactory, t.getType()), t);
- })
- .collect(Collectors.toList()),
- rowType.isNullable() ? rowType : createNullableType(typeFactory, rowType));
- }
-
- private static RelDataType createNullableType(FlinkTypeFactory typeFactory, RelDataType type) {
- return typeFactory.createTypeWithNullability(type, true);
- }
-
/** Rule configuration. */
@Value.Immutable(singleton = false)
public interface LogicalUnnestRuleConfig extends RelRule.Config {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/CorrelateBatchRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/CorrelateBatchRestoreTest.java
index 32325b16df2..0d6aac85430 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/CorrelateBatchRestoreTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/CorrelateBatchRestoreTest.java
@@ -39,6 +39,8 @@ public class CorrelateBatchRestoreTest extends BatchRestoreTestBase {
CorrelateTestPrograms.CORRELATE_SYSTEM_FUNC,
CorrelateTestPrograms.CORRELATE_JOIN_FILTER,
CorrelateTestPrograms.CORRELATE_LEFT_JOIN,
- CorrelateTestPrograms.CORRELATE_CROSS_JOIN_UNNEST);
+ CorrelateTestPrograms.CORRELATE_CROSS_JOIN_UNNEST,
+ CorrelateTestPrograms.CORRELATE_CROSS_JOIN_UNNEST_PRIMITIVE_ARRAY,
+ CorrelateTestPrograms.CORRELATE_CROSS_JOIN_UNNEST_MAP);
}
}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CorrelateTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CorrelateTestPrograms.java
index cb1127fa457..44bf5b351b1 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CorrelateTestPrograms.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CorrelateTestPrograms.java
@@ -25,6 +25,8 @@ import org.apache.flink.table.test.program.SourceTestStep;
import org.apache.flink.table.test.program.TableTestProgram;
import org.apache.flink.types.Row;
+import java.util.Map;
+
/** {@link TableTestProgram} definitions for testing {@link StreamExecCorrelate}. */
public class CorrelateTestPrograms {
@@ -203,6 +205,51 @@ public class CorrelateTestPrograms {
"INSERT INTO sink_t SELECT (SELECT name, nested FROM source_t, UNNEST(arr) AS T(nested)) FROM source_t")
.build();
+ public static final TableTestProgram CORRELATE_CROSS_JOIN_UNNEST_PRIMITIVE_ARRAY =
+ TableTestProgram.of(
+ "correlate-cross-join-unnest-primitive-array",
+ "validate correlate with cross join and unnest of primitive array")
+ .setupTableSource(
+ SourceTestStep.newBuilder("source_t")
+ .addSchema("id INT", "vals ARRAY<INT NOT NULL>")
+ .producedBeforeRestore(
+ Row.of(1, new Integer[] {10, 20}),
+ Row.of(2, new Integer[] {30}))
+ .producedAfterRestore(Row.of(3, new Integer[] {40, 50}))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema("id INT", "val INT")
+ .consumedBeforeRestore("+I[1, 10]", "+I[1, 20]", "+I[2, 30]")
+ .consumedAfterRestore("+I[3, 40]", "+I[3, 50]")
+ .build())
+ .runSql(
+ "INSERT INTO sink_t SELECT id, val FROM source_t CROSS JOIN UNNEST(vals) AS u(val)")
+ .build();
+
+ public static final TableTestProgram CORRELATE_CROSS_JOIN_UNNEST_MAP =
+ TableTestProgram.of(
+ "correlate-cross-join-unnest-map",
+ "validate correlate with cross join and unnest of map")
+ .setupTableSource(
+ SourceTestStep.newBuilder("source_t")
+ .addSchema("id INT", "m MAP<STRING, INT>")
+ .producedBeforeRestore(
+ Row.of(1, Map.of("a", 10, "b", 20)),
+ Row.of(2, Map.of("c", 30)))
+ .producedAfterRestore(Row.of(3, Map.of("d", 40)))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema("id INT", "k STRING", "v INT")
+ .consumedBeforeRestore(
+ "+I[1, a, 10]", "+I[1, b, 20]", "+I[2, c, 30]")
+ .consumedAfterRestore("+I[3, d, 40]")
+ .build())
+ .runSql(
+ "INSERT INTO sink_t SELECT id, k, v FROM source_t CROSS JOIN UNNEST(m) AS u(k, v)")
+ .build();
+
public static final TableTestProgram CORRELATE_WITH_LITERAL_AGG =
TableTestProgram.of(
"correlate-with-literal-agg",
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateRestoreTest.java
index 4ce3c0b1bf7..38d6e468ae9 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateRestoreTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateRestoreTest.java
@@ -40,6 +40,8 @@ public class CorrelateRestoreTest extends RestoreTestBase {
CorrelateTestPrograms.CORRELATE_JOIN_FILTER,
CorrelateTestPrograms.CORRELATE_LEFT_JOIN,
CorrelateTestPrograms.CORRELATE_CROSS_JOIN_UNNEST,
+ CorrelateTestPrograms.CORRELATE_CROSS_JOIN_UNNEST_PRIMITIVE_ARRAY,
+ CorrelateTestPrograms.CORRELATE_CROSS_JOIN_UNNEST_MAP,
CorrelateTestPrograms.CORRELATE_WITH_LITERAL_AGG);
}
}
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.xml
index 3cb0ff64a9a..b4b39fd3742 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.xml
@@ -167,8 +167,8 @@ LogicalProject(order_id=[$0], customer_id=[$1],
product_id=[$2], ts=[$3])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[order_id, customer_id, f0 AS product_id, ts])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor2.product_ids)],
correlate=[table($UNNEST_ROWS$1($cor2.product_ids))],
select=[order_id,customer_id,product_id,product_ids,ts,f0],
rowType=[RecordType(INTEGER order_id, INTEGER customer_id, INTEGER product_id,
INTEGER ARRAY product_ids, TIMESTAMP_LTZ(3) *ROWTIME* ts, INTEGER f0)],
joinType=[INNER])
+Calc(select=[order_id, customer_id, product_ids0 AS product_id, ts])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor2.product_ids)],
correlate=[table($UNNEST_ROWS$1($cor2.product_ids))],
select=[order_id,customer_id,product_id,product_ids,ts,product_ids0],
rowType=[RecordType(INTEGER order_id, INTEGER customer_id, INTEGER product_id,
INTEGER ARRAY product_ids, TIMESTAMP_LTZ(3) *ROWTIME* ts, INTEGER
product_ids0)], joinType=[INNER])
+- WatermarkAssigner(rowtime=[ts], watermark=[ts])
+- TableSourceScan(table=[[cat, default, t]], fields=[order_id,
customer_id, product_id, product_ids, ts])
]]>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml
index c0c753fbe87..5e0da95bb07 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml
@@ -33,8 +33,8 @@ LogicalProject(a=[$0], s=[$3])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[a, f0 AS s])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.c)],
correlate=[table($UNNEST_ROWS$1($cor0.c))], select=[a,b,c,f0],
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) ARRAY c,
VARCHAR(2147483647) f0)], joinType=[INNER])
+Calc(select=[a, c0 AS s])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.c)],
correlate=[table($UNNEST_ROWS$1($cor0.c))], select=[a,b,c,c0],
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) ARRAY c,
VARCHAR(2147483647) c0)], joinType=[INNER])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c])
]]>
</Resource>
@@ -56,8 +56,8 @@ LogicalProject(a=[$0], b=[$1], v=[$4])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[a, b, f1 AS v])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.c)],
correlate=[table($UNNEST_ROWS$1($cor0.c))], select=[a,b,c,f0,f1],
rowType=[RecordType(INTEGER a, BIGINT b, (VARCHAR(2147483647),
VARCHAR(2147483647)) MAP c, VARCHAR(2147483647) f0, VARCHAR(2147483647) f1)],
joinType=[INNER])
+Calc(select=[a, b, VALUE AS v])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.c)],
correlate=[table($UNNEST_ROWS$1($cor0.c))], select=[a,b,c,KEY,VALUE],
rowType=[RecordType(INTEGER a, BIGINT b, (VARCHAR(2147483647),
VARCHAR(2147483647)) MAP c, VARCHAR(2147483647) KEY, VARCHAR(2147483647)
VALUE)], joinType=[INNER])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c])
]]>
</Resource>
@@ -117,8 +117,8 @@ LogicalProject(a=[$0], s=[$2])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[a, f0 AS s])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.set)],
correlate=[table($UNNEST_ROWS$1($cor0.set))], select=[a,set,f0],
rowType=[RecordType(INTEGER a, VARCHAR(2147483647) MULTISET set,
VARCHAR(2147483647) f0)], joinType=[LEFT])
+Calc(select=[a, set0 AS s])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.set)],
correlate=[table($UNNEST_ROWS$1($cor0.set))], select=[a,set,set0],
rowType=[RecordType(INTEGER a, VARCHAR(2147483647) MULTISET set,
VARCHAR(2147483647) set0)], joinType=[LEFT])
+- SortAggregate(isMerge=[true], groupBy=[a], select=[a, Final_COLLECT(set)
AS set])
+- Exchange(distribution=[forward])
+- Sort(orderBy=[a ASC])
@@ -148,8 +148,8 @@ LogicalProject(bd_name=[$3])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Calc(select=[f0 AS bd_name])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)],
correlate=[table($UNNEST_ROWS$1($cor0.business_data))],
select=[business_data,nested,nested_array,f0],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) f0)], joinType=[INNER])
+Calc(select=[business_data0 AS bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)],
correlate=[table($UNNEST_ROWS$1($cor0.business_data))],
select=[business_data,nested,nested_array,business_data0],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) business_data0)], joinType=[INNER])
+- LegacyTableSourceScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]], fields=[business_data, nested, nested_array])
]]>
</Resource>
@@ -171,9 +171,30 @@ LogicalProject(bd_name=[$3])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Calc(select=[bd_name])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)],
correlate=[table($UNNEST_ROWS$1($cor0.business_data))],
select=[business_data,nested,nested_array,bd_name],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) bd_name)], joinType=[LEFT])
+Calc(select=[business_data0 AS bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)],
correlate=[table($UNNEST_ROWS$1($cor0.business_data))],
select=[business_data,nested,nested_array,business_data0],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) business_data0)], joinType=[LEFT])
+- LegacyTableSourceScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]], fields=[business_data, nested, nested_array])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testNullMismatchLeftJoinNoAliasList">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM nested_not_null LEFT JOIN
UNNEST(nested_not_null.business_data) AS exploded_bd ON TRUE]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(business_data=[$0], nested=[$1], nested_array=[$2],
exploded_bd=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left],
requiredColumns=[{0}])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
+ +- Uncollect
+ +- LogicalProject(business_data=[$cor0.business_data])
+ +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)],
correlate=[table($UNNEST_ROWS$1($cor0.business_data))],
select=[business_data,nested,nested_array,business_data0],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) business_data0)], joinType=[LEFT])
++- LegacyTableSourceScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]], fields=[business_data, nested, nested_array])
]]>
</Resource>
</TestCase>
@@ -194,8 +215,8 @@ LogicalProject(bd_name=[$3])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Calc(select=[bd_name])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.nested.data)],
correlate=[table($UNNEST_ROWS$1($cor0.nested.data))],
select=[business_data,nested,nested_array,bd_name],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) bd_name)], joinType=[LEFT])
+Calc(select=[data AS bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.nested.data)],
correlate=[table($UNNEST_ROWS$1($cor0.nested.data))],
select=[business_data,nested,nested_array,data],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) data)], joinType=[LEFT])
+- LegacyTableSourceScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]], fields=[business_data, nested, nested_array])
]]>
</Resource>
@@ -217,9 +238,54 @@ LogicalProject(bd_name=[$3])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Calc(select=[bd_name])
-+- Correlate(invocation=[$UNNEST_ROWS$1(ITEM($cor0.nested_array, 0).data)],
correlate=[table($UNNEST_ROWS$1(ITEM($cor0.nested_array, 0).data))],
select=[business_data,nested,nested_array,bd_name],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) bd_name)], joinType=[LEFT])
+Calc(select=[EXPR$0 AS bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1(ITEM($cor0.nested_array, 0).data)],
correlate=[table($UNNEST_ROWS$1(ITEM($cor0.nested_array, 0).data))],
select=[business_data,nested,nested_array,EXPR$0],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) EXPR$0)], joinType=[LEFT])
+- LegacyTableSourceScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]], fields=[business_data, nested, nested_array])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testNullMismatchLeftJoinOnPredicate">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM nested_not_null LEFT JOIN
UNNEST(nested_not_null.business_data) AS exploded_bd ON exploded_bd <>
'debug']]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(business_data=[$0], nested=[$1], nested_array=[$2],
exploded_bd=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left],
requiredColumns=[{0}])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
+ +- LogicalFilter(condition=[<>($0, _UTF-16LE'debug')])
+ +- Uncollect
+ +- LogicalProject(business_data=[$cor0.business_data])
+ +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)],
correlate=[table($UNNEST_ROWS$1($cor0.business_data))],
select=[business_data,nested,nested_array,business_data0],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) business_data0)], joinType=[LEFT], condition=[<>($0,
_UTF-16LE'debug')])
++- LegacyTableSourceScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]], fields=[business_data, nested, nested_array])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testNullMismatchLeftJoinWithOrdinalityOnPredicate">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM nested_not_null LEFT JOIN
UNNEST(nested_not_null.business_data) WITH ORDINALITY AS v(bd_name, ord) ON
v.bd_name <> 'debug']]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(business_data=[$0], nested=[$1], nested_array=[$2],
bd_name=[$3], ord=[$4])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left],
requiredColumns=[{0}])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
+ +- LogicalFilter(condition=[<>($0, _UTF-16LE'debug')])
+ +- LogicalProject(bd_name=[$0], ord=[$1])
+ +- Uncollect(withOrdinality=[true])
+ +- LogicalProject(business_data=[$cor0.business_data])
+ +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.business_data)],
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.business_data))],
select=[business_data,nested,nested_array,EXPR$0,ORDINALITY],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) EXPR$0, INTEGER ORDINALITY)], joinType=[LEFT], condition=
[...]
++- LegacyTableSourceScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]], fields=[business_data, nested, nested_array])
]]>
</Resource>
</TestCase>
@@ -240,8 +306,8 @@ LogicalProject(bd_name=[$3])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Calc(select=[f0 AS bd_name])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)],
correlate=[table($UNNEST_ROWS$1($cor0.business_data))],
select=[business_data,nested,nested_array,f0],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) f0)], joinType=[INNER])
+Calc(select=[business_data0 AS bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)],
correlate=[table($UNNEST_ROWS$1($cor0.business_data))],
select=[business_data,nested,nested_array,business_data0],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) business_data0)], joinType=[INNER])
+- LegacyTableSourceScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]], fields=[business_data, nested, nested_array])
]]>
</Resource>
@@ -263,8 +329,8 @@ LogicalProject(bd_name=[$3])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Calc(select=[f0 AS bd_name])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.nested.data)],
correlate=[table($UNNEST_ROWS$1($cor0.nested.data))],
select=[business_data,nested,nested_array,f0],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) f0)], joinType=[INNER])
+Calc(select=[data AS bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.nested.data)],
correlate=[table($UNNEST_ROWS$1($cor0.nested.data))],
select=[business_data,nested,nested_array,data],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) data)], joinType=[INNER])
+- LegacyTableSourceScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]], fields=[business_data, nested, nested_array])
]]>
</Resource>
@@ -296,8 +362,8 @@ LogicalProject(b=[$0], s=[$2])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[b, f0 AS s])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.set)],
correlate=[table($UNNEST_ROWS$1($cor0.set))], select=[b,set,f0],
rowType=[RecordType(BIGINT b, BIGINT MULTISET set, BIGINT f0)],
joinType=[INNER])
+Calc(select=[b, set0 AS s])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.set)],
correlate=[table($UNNEST_ROWS$1($cor0.set))], select=[b,set,set0],
rowType=[RecordType(BIGINT b, BIGINT MULTISET set, BIGINT set0)],
joinType=[INNER])
+- SortWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$,
rowtime, 3000)], select=[b, Final_COLLECT(set) AS set])
+- Exchange(distribution=[forward])
+- Sort(orderBy=[b ASC, assignedWindow$ ASC])
@@ -327,8 +393,8 @@ LogicalProject(a=[$0], s=[$3])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[a, f0 AS s])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.c)],
correlate=[table($UNNEST_ROWS$1($cor0.c))], select=[a,b,c,f0],
rowType=[RecordType(INTEGER a, INTEGER ARRAY b, INTEGER ARRAY ARRAY c, INTEGER
ARRAY f0)], joinType=[INNER])
+Calc(select=[a, c0 AS s])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.c)],
correlate=[table($UNNEST_ROWS$1($cor0.c))], select=[a,b,c,c0],
rowType=[RecordType(INTEGER a, INTEGER ARRAY b, INTEGER ARRAY ARRAY c, INTEGER
ARRAY c0)], joinType=[INNER])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c])
]]>
</Resource>
@@ -432,8 +498,8 @@ LogicalProject(a=[$0], b=[$1], s=[$3])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[a, b, f0 AS s])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.b)],
correlate=[table($UNNEST_ROWS$1($cor0.b))], select=[a,b,c,f0],
rowType=[RecordType(INTEGER a, INTEGER ARRAY b, INTEGER ARRAY ARRAY c, INTEGER
f0)], joinType=[INNER])
+Calc(select=[a, b, b0 AS s])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.b)],
correlate=[table($UNNEST_ROWS$1($cor0.b))], select=[a,b,c,b0],
rowType=[RecordType(INTEGER a, INTEGER ARRAY b, INTEGER ARRAY ARRAY c, INTEGER
b0)], joinType=[INNER])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c])
]]>
</Resource>
@@ -511,8 +577,8 @@ LogicalProject(a=[$0], number=[$2], ordinality=[$3])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[a, EXPR$0 AS number, ORDINALITY AS ordinality])
-+- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b)],
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b))],
select=[a,b,EXPR$0,ORDINALITY], rowType=[RecordType(INTEGER a, INTEGER ARRAY b,
INTEGER EXPR$0, INTEGER ORDINALITY)], joinType=[INNER])
+Calc(select=[a, b0 AS number, ORDINALITY AS ordinality])
++- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b)],
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b))],
select=[a,b,b0,ORDINALITY], rowType=[RecordType(INTEGER a, INTEGER ARRAY b,
INTEGER b0, INTEGER ORDINALITY)], joinType=[INNER])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b])
]]>
</Resource>
@@ -544,9 +610,9 @@ LogicalProject(id=[$0], array_val=[$2], array_pos=[$3],
elem=[$4], element_pos=[
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[id, EXPR$0 AS array_val, ORDINALITY AS array_pos, EXPR$00 AS
elem, ORDINALITY0 AS element_pos])
-+- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor1.array_val)],
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor1.array_val))],
select=[id,nested_array,EXPR$0,ORDINALITY,EXPR$00,ORDINALITY0],
rowType=[RecordType(INTEGER id, INTEGER ARRAY ARRAY nested_array, INTEGER ARRAY
EXPR$0, INTEGER ORDINALITY, INTEGER EXPR$00, INTEGER ORDINALITY0)],
joinType=[INNER])
- +-
Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.nested_array)],
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.nested_array))],
select=[id,nested_array,EXPR$0,ORDINALITY], rowType=[RecordType(INTEGER id,
INTEGER ARRAY ARRAY nested_array, INTEGER ARRAY EXPR$0, INTEGER ORDINALITY)],
joinType=[INNER])
+Calc(select=[id, nested_array0 AS array_val, ORDINALITY AS array_pos,
array_val AS elem, ORDINALITY0 AS element_pos])
++- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor1.array_val)],
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor1.array_val))],
select=[id,nested_array,nested_array0,ORDINALITY,array_val,ORDINALITY0],
rowType=[RecordType(INTEGER id, INTEGER ARRAY ARRAY nested_array, INTEGER ARRAY
nested_array0, INTEGER ORDINALITY, INTEGER array_val, INTEGER ORDINALITY0)],
joinType=[INNER])
+ +-
Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.nested_array)],
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.nested_array))],
select=[id,nested_array,nested_array0,ORDINALITY], rowType=[RecordType(INTEGER
id, INTEGER ARRAY ARRAY nested_array, INTEGER ARRAY nested_array0, INTEGER
ORDINALITY)], joinType=[INNER])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[id, nested_array])
]]>
</Resource>
@@ -645,8 +711,8 @@ LogicalProject(id=[$0], k=[$2], v=[$3], pos=[$4])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[id, f0 AS k, f1 AS v, ORDINALITY AS pos])
-+- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.map_data)],
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.map_data))],
select=[id,map_data,f0,f1,ORDINALITY], rowType=[RecordType(INTEGER id,
(VARCHAR(2147483647), VARCHAR(2147483647)) MAP map_data, VARCHAR(2147483647)
f0, VARCHAR(2147483647) f1, INTEGER ORDINALITY)], joinType=[INNER])
+Calc(select=[id, KEY AS k, VALUE AS v, ORDINALITY AS pos])
++- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.map_data)],
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.map_data))],
select=[id,map_data,KEY,VALUE,ORDINALITY], rowType=[RecordType(INTEGER id,
(VARCHAR(2147483647), VARCHAR(2147483647)) MAP map_data, VARCHAR(2147483647)
KEY, VARCHAR(2147483647) VALUE, INTEGER ORDINALITY)], joinType=[INNER])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[id, map_data])
]]>
</Resource>
@@ -674,8 +740,8 @@ LogicalProject(a=[$0], word=[$2], pos=[$3])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[a, EXPR$0 AS word, ORDINALITY AS pos])
-+- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.words)],
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.words))],
select=[a,words,EXPR$0,ORDINALITY], rowType=[RecordType(INTEGER a,
VARCHAR(2147483647) MULTISET words, VARCHAR(2147483647) EXPR$0, INTEGER
ORDINALITY)], joinType=[INNER])
+Calc(select=[a, words0 AS word, ORDINALITY AS pos])
++- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.words)],
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.words))],
select=[a,words,words0,ORDINALITY], rowType=[RecordType(INTEGER a,
VARCHAR(2147483647) MULTISET words, VARCHAR(2147483647) words0, INTEGER
ORDINALITY)], joinType=[INNER])
+- SortAggregate(isMerge=[true], groupBy=[a], select=[a,
Final_COLLECT(words) AS words])
+- Exchange(distribution=[forward])
+- Sort(orderBy=[a ASC])
@@ -711,8 +777,8 @@ LogicalProject(a=[$0], number=[$2], ordinality=[$3])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[a, EXPR$0 AS number, ORDINALITY AS ordinality])
-+- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b)],
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b))],
select=[a,b,EXPR$0,ORDINALITY], rowType=[RecordType(INTEGER a, INTEGER ARRAY b,
INTEGER EXPR$0, INTEGER ORDINALITY)], joinType=[INNER], condition=[AND(>($0,
10), <($1, 3))])
+Calc(select=[a, b0 AS number, ORDINALITY AS ordinality])
++- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b)],
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b))],
select=[a,b,b0,ORDINALITY], rowType=[RecordType(INTEGER a, INTEGER ARRAY b,
INTEGER b0, INTEGER ORDINALITY)], joinType=[INNER], condition=[AND(>($0, 10),
<($1, 3))])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b])
]]>
</Resource>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRuleTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRuleTest.xml
index 930a6d94bf5..0618adc8a79 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRuleTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRuleTest.xml
@@ -37,7 +37,7 @@ LogicalProject(a=[$0], s=[$3])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{2}])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+- LogicalProject(s=[$0])
- +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.c)],
rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+ +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.c)],
rowType=[RecordType(VARCHAR(2147483647) c)])
]]>
</Resource>
</TestCase>
@@ -62,7 +62,7 @@ LogicalProject(a=[$0], b=[$1], v=[$4])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{2}])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+- LogicalProject(k=[$0], v=[$1])
- +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.c)],
rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0, VARCHAR(2147483647)
f1)])
+ +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.c)],
rowType=[RecordType(VARCHAR(2147483647) KEY, VARCHAR(2147483647) VALUE)])
]]>
</Resource>
</TestCase>
@@ -98,7 +98,7 @@ LogicalProject(a=[$0], b=[$1], x=[$2], y=[$3])
: +- LogicalFilter(condition=[<($0, 3)])
: +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+- LogicalProject(x=[$0], y=[$1])
- +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.b)],
rowType=[RecordType:peek_no_expand(INTEGER _1, VARCHAR(2147483647) _2)])
+ +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.b)],
rowType=[RecordType(INTEGER _1, VARCHAR(2147483647) _2)])
]]>
</Resource>
</TestCase>
@@ -132,7 +132,7 @@ LogicalProject(a=[$0], s=[$2])
: +- LogicalProject(a=[$0], b=[$1])
: +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+- LogicalProject(s=[$0])
- +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.set)],
rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+ +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.set)],
rowType=[RecordType(VARCHAR(2147483647) set)])
]]>
</Resource>
</TestCase>
@@ -157,7 +157,7 @@ LogicalProject(bd_name=[$3])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0}])
:- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
+- LogicalProject(bd_name=[$0])
- +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.business_data)],
rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+ +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.business_data)],
rowType=[RecordType(VARCHAR(2147483647) business_data)])
]]>
</Resource>
</TestCase>
@@ -181,8 +181,31 @@ LogicalProject(bd_name=[$3])
LogicalProject(bd_name=[$3])
+- LogicalCorrelate(correlation=[$cor0], joinType=[left],
requiredColumns=[{0}])
:- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
- +- LogicalProject(bd_name=[CAST($0):VARCHAR(2147483647) CHARACTER SET
"UTF-16LE"])
- +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.business_data)],
rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+ +- LogicalProject(bd_name=[$0])
+ +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.business_data)],
rowType=[RecordType(VARCHAR(2147483647) business_data)])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testNullMismatchLeftJoinNoAliasList">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM nested_not_null LEFT JOIN
UNNEST(nested_not_null.business_data) AS exploded_bd ON TRUE]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(business_data=[$0], nested=[$1], nested_array=[$2],
exploded_bd=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left],
requiredColumns=[{0}])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
+ +- Uncollect
+ +- LogicalProject(business_data=[$cor0.business_data])
+ +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(business_data=[$0], nested=[$1], nested_array=[$2],
exploded_bd=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left],
requiredColumns=[{0}])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
+ +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.business_data)],
rowType=[RecordType(VARCHAR(2147483647) business_data)])
]]>
</Resource>
</TestCase>
@@ -206,8 +229,8 @@ LogicalProject(bd_name=[$3])
LogicalProject(bd_name=[$3])
+- LogicalCorrelate(correlation=[$cor0], joinType=[left],
requiredColumns=[{1}])
:- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
- +- LogicalProject(bd_name=[CAST($0):VARCHAR(2147483647) CHARACTER SET
"UTF-16LE"])
- +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.nested.data)],
rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+ +- LogicalProject(bd_name=[$0])
+ +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.nested.data)],
rowType=[RecordType(VARCHAR(2147483647) data)])
]]>
</Resource>
</TestCase>
@@ -231,8 +254,60 @@ LogicalProject(bd_name=[$3])
LogicalProject(bd_name=[$3])
+- LogicalCorrelate(correlation=[$cor0], joinType=[left],
requiredColumns=[{2}])
:- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
- +- LogicalProject(bd_name=[CAST($0):VARCHAR(2147483647) CHARACTER SET
"UTF-16LE"])
- +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1(ITEM($cor0.nested_array,
0).data)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+ +- LogicalProject(bd_name=[$0])
+ +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1(ITEM($cor0.nested_array,
0).data)], rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testNullMismatchLeftJoinOnPredicate">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM nested_not_null LEFT JOIN
UNNEST(nested_not_null.business_data) AS exploded_bd ON exploded_bd <>
'debug']]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(business_data=[$0], nested=[$1], nested_array=[$2],
exploded_bd=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left],
requiredColumns=[{0}])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
+ +- LogicalFilter(condition=[<>($0, _UTF-16LE'debug')])
+ +- Uncollect
+ +- LogicalProject(business_data=[$cor0.business_data])
+ +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(business_data=[$0], nested=[$1], nested_array=[$2],
exploded_bd=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left],
requiredColumns=[{0}])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
+ +- LogicalFilter(condition=[<>($0, _UTF-16LE'debug')])
+ +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.business_data)],
rowType=[RecordType(VARCHAR(2147483647) business_data)])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testNullMismatchLeftJoinWithOrdinalityOnPredicate">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM nested_not_null LEFT JOIN
UNNEST(nested_not_null.business_data) WITH ORDINALITY AS v(bd_name, ord) ON
v.bd_name <> 'debug']]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(business_data=[$0], nested=[$1], nested_array=[$2],
bd_name=[$3], ord=[$4])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left],
requiredColumns=[{0}])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
+ +- LogicalFilter(condition=[<>($0, _UTF-16LE'debug')])
+ +- LogicalProject(bd_name=[$0], ord=[$1])
+ +- Uncollect(withOrdinality=[true])
+ +- LogicalProject(business_data=[$cor0.business_data])
+ +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(business_data=[$0], nested=[$1], nested_array=[$2],
bd_name=[$3], ord=[$4])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left],
requiredColumns=[{0}])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
+ +- LogicalFilter(condition=[<>($0, _UTF-16LE'debug')])
+ +- LogicalProject(bd_name=[$0], ord=[$1])
+ +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.business_data)],
rowType=[RecordType(VARCHAR(2147483647) business_data, INTEGER ORDINALITY)])
]]>
</Resource>
</TestCase>
@@ -257,7 +332,7 @@ LogicalProject(bd_name=[$3])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0}])
:- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
+- LogicalProject(bd_name=[$0])
- +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.business_data)],
rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+ +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.business_data)],
rowType=[RecordType(VARCHAR(2147483647) business_data)])
]]>
</Resource>
</TestCase>
@@ -282,7 +357,7 @@ LogicalProject(bd_name=[$3])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{1}])
:- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
+- LogicalProject(bd_name=[$0])
- +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.nested.data)],
rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+ +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.nested.data)],
rowType=[RecordType(VARCHAR(2147483647) data)])
]]>
</Resource>
</TestCase>
@@ -321,7 +396,7 @@ LogicalProject(b=[$0], s=[$2])
: +- LogicalProject(b=[$1], $f1=[$TUMBLE($3, 3000:INTERVAL SECOND)])
: +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+- LogicalProject(s=[$0])
- +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.set)],
rowType=[RecordType:peek_no_expand(BIGINT f0)])
+ +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.set)],
rowType=[RecordType(BIGINT set)])
]]>
</Resource>
</TestCase>
@@ -346,7 +421,7 @@ LogicalProject(a=[$0], s=[$3])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{2}])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+- LogicalProject(s=[$0])
- +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.c)],
rowType=[RecordType:peek_no_expand(INTEGER ARRAY f0)])
+ +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.c)],
rowType=[RecordType(INTEGER ARRAY c)])
]]>
</Resource>
</TestCase>
@@ -380,7 +455,7 @@ LogicalProject(b=[$0], id=[$2], point=[$3])
: +- LogicalProject(b=[$1], c=[$2])
: +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+- LogicalProject(id=[$0], point=[$1])
- +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.set)],
rowType=[RecordType:peek_no_expand(INTEGER _1, VARCHAR(2147483647) _2)])
+ +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.set)],
rowType=[RecordType(INTEGER _1, VARCHAR(2147483647) _2)])
]]>
</Resource>
</TestCase>
@@ -407,7 +482,7 @@ LogicalProject(a=[$0], b=[$1], s=[$2], t=[$3])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{1}])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+- LogicalProject(s=[$0], t=[$1])
- +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.b)],
rowType=[RecordType:peek_no_expand(INTEGER _1, VARCHAR(2147483647) _2)])
+ +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.b)],
rowType=[RecordType(INTEGER _1, VARCHAR(2147483647) _2)])
]]>
</Resource>
</TestCase>
@@ -432,7 +507,7 @@ LogicalProject(a=[$0], b=[$1], _1=[$2], _2=[$3])
+- LogicalFilter(condition=[>($2, 1)])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{1}])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
- +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.b)],
rowType=[RecordType:peek_no_expand(INTEGER _1, VARCHAR(2147483647) _2)])
+ +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.b)],
rowType=[RecordType(INTEGER _1, VARCHAR(2147483647) _2)])
]]>
</Resource>
</TestCase>
@@ -457,7 +532,7 @@ LogicalProject(a=[$0], b=[$1], s=[$3])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{1}])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+- LogicalProject(s=[$0])
- +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.b)],
rowType=[RecordType:peek_no_expand(INTEGER f0)])
+ +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.b)],
rowType=[RecordType(INTEGER b)])
]]>
</Resource>
</TestCase>
@@ -499,7 +574,7 @@ LogicalProject(a=[$0], b1=[$1], b2=[$2])
:- LogicalProject(a=[$0], b=[$1])
: +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+- LogicalProject(b1=[$0], b2=[$1])
- +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.b)],
rowType=[RecordType:peek_no_expand(INTEGER _1, VARCHAR(2147483647) _2)])
+ +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.b)],
rowType=[RecordType(INTEGER _1, VARCHAR(2147483647) _2)])
]]>
</Resource>
</TestCase>
@@ -547,7 +622,7 @@ LogicalProject(a=[$0], number=[$2], ordinality=[$3])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{1}])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+- LogicalProject(number=[$0], ordinality=[$1])
- +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b)],
rowType=[RecordType:peek_no_expand(INTEGER EXPR$0, INTEGER ORDINALITY)])
+ +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b)],
rowType=[RecordType(INTEGER b, INTEGER ORDINALITY)])
]]>
</Resource>
</TestCase>
@@ -583,9 +658,9 @@ LogicalProject(id=[$0], array_val=[$2], array_pos=[$3],
elem=[$4], element_pos=[
:- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{1}])
: :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
: +- LogicalProject(array_val=[$0], array_pos=[$1])
- : +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.nested_array)],
rowType=[RecordType:peek_no_expand(INTEGER ARRAY EXPR$0, INTEGER ORDINALITY)])
+ : +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.nested_array)],
rowType=[RecordType(INTEGER ARRAY nested_array, INTEGER ORDINALITY)])
+- LogicalProject(elem=[$0], element_pos=[$1])
- +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor1.array_val)],
rowType=[RecordType:peek_no_expand(INTEGER EXPR$0, INTEGER ORDINALITY)])
+ +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor1.array_val)],
rowType=[RecordType(INTEGER array_val, INTEGER ORDINALITY)])
]]>
</Resource>
</TestCase>
@@ -612,7 +687,7 @@ LogicalProject(a=[$0], b=[$1], s=[$2], t=[$3], o=[$4])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{1}])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+- LogicalProject(s=[$0], t=[$1], o=[$2])
- +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b)],
rowType=[RecordType:peek_no_expand(INTEGER _1, VARCHAR(2147483647) _2, INTEGER
ORDINALITY)])
+ +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b)],
rowType=[RecordType(INTEGER _1, VARCHAR(2147483647) _2, INTEGER ORDINALITY)])
]]>
</Resource>
</TestCase>
@@ -637,7 +712,7 @@ LogicalProject(a=[$0], b=[$1], _1=[$2], _2=[$3],
ORDINALITY=[$4])
+- LogicalFilter(condition=[>($2, 1)])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{1}])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
- +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b)],
rowType=[RecordType:peek_no_expand(INTEGER _1, VARCHAR(2147483647) _2, INTEGER
ORDINALITY)])
+ +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b)],
rowType=[RecordType(INTEGER _1, VARCHAR(2147483647) _2, INTEGER ORDINALITY)])
]]>
</Resource>
</TestCase>
@@ -673,7 +748,7 @@ LogicalProject(a=[$0], number=[$1], ordinality=[$2])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{1}])
:- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+- LogicalProject(number=[$0], ordinality=[$1])
- +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b)],
rowType=[RecordType:peek_no_expand(INTEGER EXPR$0, INTEGER ORDINALITY)])
+ +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b)],
rowType=[RecordType(INTEGER b, INTEGER ORDINALITY)])
]]>
</Resource>
</TestCase>
@@ -698,7 +773,7 @@ LogicalProject(id=[$0], k=[$2], v=[$3], pos=[$4])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{1}])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+- LogicalProject(k=[$0], v=[$1], pos=[$2])
- +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.map_data)],
rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0, VARCHAR(2147483647)
f1, INTEGER ORDINALITY)])
+ +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.map_data)],
rowType=[RecordType(VARCHAR(2147483647) KEY, VARCHAR(2147483647) VALUE,
INTEGER ORDINALITY)])
]]>
</Resource>
</TestCase>
@@ -731,7 +806,7 @@ LogicalProject(a=[$0], word=[$2], pos=[$3])
: +- LogicalProject(a=[$0], c=[$2])
: +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+- LogicalProject(word=[$0], pos=[$1])
- +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.words)],
rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) EXPR$0, INTEGER
ORDINALITY)])
+ +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.words)],
rowType=[RecordType(VARCHAR(2147483647) words, INTEGER ORDINALITY)])
]]>
</Resource>
</TestCase>
@@ -763,7 +838,7 @@ LogicalProject(a=[$0], number=[$2], ordinality=[$3])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{1}])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+- LogicalProject(number=[$0], ordinality=[$1])
- +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b)],
rowType=[RecordType:peek_no_expand(INTEGER EXPR$0, INTEGER ORDINALITY)])
+ +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b)],
rowType=[RecordType(INTEGER b, INTEGER ORDINALITY)])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml
index 3a5e888ca25..1372c8ec0a7 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml
@@ -659,8 +659,8 @@
LogicalSink(table=[default_catalog.default_database.UnnestSink], targetColumns=[
<Resource name="optimized rel plan">
<![CDATA[
Sink(table=[default_catalog.default_database.UnnestSink],
targetColumns=[[0],[1],[2],[3],[4]], fields=[detail_id, element_data,
data_value_id, user_id, order_id])
-+- Calc(select=[detail_id, TRIM(BOTH, ' ', REGEXP_REPLACE(f0, '[\[\]\"]', ''))
AS element_data, ARRAY_POSITION(split(REGEXP_REPLACE(data, '^\["|"\]$', ''),
'", "'), f0) AS data_value_id, user_id, order_id])
- +- Correlate(invocation=[$UNNEST_ROWS$1(split(REGEXP_REPLACE($cor0.data,
_UTF-16LE'^\["|"\]$', _UTF-16LE''), _UTF-16LE'", "'))],
correlate=[table($UNNEST_ROWS$1(split(REGEXP_REPLACE($cor0.data, '^\["|"\]$',
''), '", "')))],
select=[detail_id,description,user_id,data,timestamp,order_id,user_id0,product,payment_id,price,user_id1,location,user_id2,f0],
rowType=[RecordType(VARCHAR(2147483647) detail_id, VARCHAR(2147483647)
description, VARCHAR(2147483647) user_id, VARCHAR(2147483647) data [...]
++- Calc(select=[detail_id, TRIM(BOTH, ' ', REGEXP_REPLACE(EXPR$0, '[\[\]\"]',
'')) AS element_data, ARRAY_POSITION(split(REGEXP_REPLACE(data, '^\["|"\]$',
''), '", "'), EXPR$0) AS data_value_id, user_id, order_id])
+ +- Correlate(invocation=[$UNNEST_ROWS$1(split(REGEXP_REPLACE($cor0.data,
_UTF-16LE'^\["|"\]$', _UTF-16LE''), _UTF-16LE'", "'))],
correlate=[table($UNNEST_ROWS$1(split(REGEXP_REPLACE($cor0.data, '^\["|"\]$',
''), '", "')))],
select=[detail_id,description,user_id,data,timestamp,order_id,user_id0,product,payment_id,price,user_id1,location,user_id2,EXPR$0],
rowType=[RecordType(VARCHAR(2147483647) detail_id, VARCHAR(2147483647)
description, VARCHAR(2147483647) user_id, VARCHAR(2147483647) [...]
+- Calc(select=[detail_id, description, user_id, data, timestamp,
order_id, user_id0, product, payment_id, price, user_id1, location, user_id2],
where=[IS NULL(location)])
+- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER, LEFT],
inputUniqueKeys=[(detail_id), (order_id), (payment_id), noUniqueKey],
joinConditions=[=(user_id0, user_id), =(user_id1, user_id), =(user_id2,
user_id)],
select=[detail_id,description,user_id,data,timestamp,order_id,user_id0,product,payment_id,price,user_id1,location,user_id2],
rowType=[RecordType(VARCHAR(2147483647) detail_id, VARCHAR(2147483647)
description, VARCHAR(2147483647) user_id, VARCHAR(2147483647) da [...]
:- Exchange(distribution=[hash[user_id]])
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml
index b925b5ce5c7..138d56d8c12 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml
@@ -33,8 +33,8 @@ LogicalProject(a=[$0], s=[$3])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[a, f0 AS s])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.c)],
correlate=[table($UNNEST_ROWS$1($cor0.c))], select=[a,b,c,f0],
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) ARRAY c,
VARCHAR(2147483647) f0)], joinType=[INNER])
+Calc(select=[a, c0 AS s])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.c)],
correlate=[table($UNNEST_ROWS$1($cor0.c))], select=[a,b,c,c0],
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) ARRAY c,
VARCHAR(2147483647) c0)], joinType=[INNER])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c])
]]>
</Resource>
@@ -56,8 +56,8 @@ LogicalProject(a=[$0], b=[$1], v=[$4])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[a, b, f1 AS v])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.c)],
correlate=[table($UNNEST_ROWS$1($cor0.c))], select=[a,b,c,f0,f1],
rowType=[RecordType(INTEGER a, BIGINT b, (VARCHAR(2147483647),
VARCHAR(2147483647)) MAP c, VARCHAR(2147483647) f0, VARCHAR(2147483647) f1)],
joinType=[INNER])
+Calc(select=[a, b, VALUE AS v])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.c)],
correlate=[table($UNNEST_ROWS$1($cor0.c))], select=[a,b,c,KEY,VALUE],
rowType=[RecordType(INTEGER a, BIGINT b, (VARCHAR(2147483647),
VARCHAR(2147483647)) MAP c, VARCHAR(2147483647) KEY, VARCHAR(2147483647)
VALUE)], joinType=[INNER])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c])
]]>
</Resource>
@@ -117,8 +117,8 @@ LogicalProject(a=[$0], s=[$2])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[a, f0 AS s])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.set)],
correlate=[table($UNNEST_ROWS$1($cor0.set))], select=[a,set,f0],
rowType=[RecordType(INTEGER a, VARCHAR(2147483647) MULTISET set,
VARCHAR(2147483647) f0)], joinType=[LEFT])
+Calc(select=[a, set0 AS s])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.set)],
correlate=[table($UNNEST_ROWS$1($cor0.set))], select=[a,set,set0],
rowType=[RecordType(INTEGER a, VARCHAR(2147483647) MULTISET set,
VARCHAR(2147483647) set0)], joinType=[LEFT])
+- GroupAggregate(groupBy=[a], select=[a, COLLECT(b) AS set])
+- Exchange(distribution=[hash[a]])
+- Calc(select=[a, b], where=[(a < 5)])
@@ -143,8 +143,8 @@ LogicalProject(bd_name=[$3])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Calc(select=[f0 AS bd_name])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)],
correlate=[table($UNNEST_ROWS$1($cor0.business_data))],
select=[business_data,nested,nested_array,f0],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) f0)], joinType=[INNER])
+Calc(select=[business_data0 AS bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)],
correlate=[table($UNNEST_ROWS$1($cor0.business_data))],
select=[business_data,nested,nested_array,business_data0],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) business_data0)], joinType=[INNER])
+- LegacyTableSourceScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]], fields=[business_data, nested, nested_array])
]]>
</Resource>
@@ -166,9 +166,30 @@ LogicalProject(bd_name=[$3])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Calc(select=[bd_name])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)],
correlate=[table($UNNEST_ROWS$1($cor0.business_data))],
select=[business_data,nested,nested_array,bd_name],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) bd_name)], joinType=[LEFT])
+Calc(select=[business_data0 AS bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)],
correlate=[table($UNNEST_ROWS$1($cor0.business_data))],
select=[business_data,nested,nested_array,business_data0],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) business_data0)], joinType=[LEFT])
+- LegacyTableSourceScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]], fields=[business_data, nested, nested_array])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testNullMismatchLeftJoinNoAliasList">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM nested_not_null LEFT JOIN
UNNEST(nested_not_null.business_data) AS exploded_bd ON TRUE]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(business_data=[$0], nested=[$1], nested_array=[$2],
exploded_bd=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left],
requiredColumns=[{0}])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
+ +- Uncollect
+ +- LogicalProject(business_data=[$cor0.business_data])
+ +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)],
correlate=[table($UNNEST_ROWS$1($cor0.business_data))],
select=[business_data,nested,nested_array,business_data0],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) business_data0)], joinType=[LEFT])
++- LegacyTableSourceScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]], fields=[business_data, nested, nested_array])
]]>
</Resource>
</TestCase>
@@ -189,8 +210,8 @@ LogicalProject(bd_name=[$3])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Calc(select=[bd_name])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.nested.data)],
correlate=[table($UNNEST_ROWS$1($cor0.nested.data))],
select=[business_data,nested,nested_array,bd_name],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) bd_name)], joinType=[LEFT])
+Calc(select=[data AS bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.nested.data)],
correlate=[table($UNNEST_ROWS$1($cor0.nested.data))],
select=[business_data,nested,nested_array,data],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) data)], joinType=[LEFT])
+- LegacyTableSourceScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]], fields=[business_data, nested, nested_array])
]]>
</Resource>
@@ -212,9 +233,54 @@ LogicalProject(bd_name=[$3])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Calc(select=[bd_name])
-+- Correlate(invocation=[$UNNEST_ROWS$1(ITEM($cor0.nested_array, 0).data)],
correlate=[table($UNNEST_ROWS$1(ITEM($cor0.nested_array, 0).data))],
select=[business_data,nested,nested_array,bd_name],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) bd_name)], joinType=[LEFT])
+Calc(select=[EXPR$0 AS bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1(ITEM($cor0.nested_array, 0).data)],
correlate=[table($UNNEST_ROWS$1(ITEM($cor0.nested_array, 0).data))],
select=[business_data,nested,nested_array,EXPR$0],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) EXPR$0)], joinType=[LEFT])
+- LegacyTableSourceScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]], fields=[business_data, nested, nested_array])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testNullMismatchLeftJoinOnPredicate">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM nested_not_null LEFT JOIN
UNNEST(nested_not_null.business_data) AS exploded_bd ON exploded_bd <>
'debug']]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(business_data=[$0], nested=[$1], nested_array=[$2],
exploded_bd=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left],
requiredColumns=[{0}])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
+ +- LogicalFilter(condition=[<>($0, _UTF-16LE'debug')])
+ +- Uncollect
+ +- LogicalProject(business_data=[$cor0.business_data])
+ +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)],
correlate=[table($UNNEST_ROWS$1($cor0.business_data))],
select=[business_data,nested,nested_array,business_data0],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) business_data0)], joinType=[LEFT], condition=[<>($0,
_UTF-16LE'debug')])
++- LegacyTableSourceScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]], fields=[business_data, nested, nested_array])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testNullMismatchLeftJoinWithOrdinalityOnPredicate">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM nested_not_null LEFT JOIN
UNNEST(nested_not_null.business_data) WITH ORDINALITY AS v(bd_name, ord) ON
v.bd_name <> 'debug']]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(business_data=[$0], nested=[$1], nested_array=[$2],
bd_name=[$3], ord=[$4])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left],
requiredColumns=[{0}])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
+ +- LogicalFilter(condition=[<>($0, _UTF-16LE'debug')])
+ +- LogicalProject(bd_name=[$0], ord=[$1])
+ +- Uncollect(withOrdinality=[true])
+ +- LogicalProject(business_data=[$cor0.business_data])
+ +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.business_data)],
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.business_data))],
select=[business_data,nested,nested_array,EXPR$0,ORDINALITY],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) EXPR$0, INTEGER ORDINALITY)], joinType=[LEFT], condition=
[...]
++- LegacyTableSourceScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]], fields=[business_data, nested, nested_array])
]]>
</Resource>
</TestCase>
@@ -235,8 +301,8 @@ LogicalProject(bd_name=[$3])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Calc(select=[f0 AS bd_name])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)],
correlate=[table($UNNEST_ROWS$1($cor0.business_data))],
select=[business_data,nested,nested_array,f0],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) f0)], joinType=[INNER])
+Calc(select=[business_data0 AS bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)],
correlate=[table($UNNEST_ROWS$1($cor0.business_data))],
select=[business_data,nested,nested_array,business_data0],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) business_data0)], joinType=[INNER])
+- LegacyTableSourceScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]], fields=[business_data, nested, nested_array])
]]>
</Resource>
@@ -258,8 +324,8 @@ LogicalProject(bd_name=[$3])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Calc(select=[f0 AS bd_name])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.nested.data)],
correlate=[table($UNNEST_ROWS$1($cor0.nested.data))],
select=[business_data,nested,nested_array,f0],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) f0)], joinType=[INNER])
+Calc(select=[data AS bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.nested.data)],
correlate=[table($UNNEST_ROWS$1($cor0.nested.data))],
select=[business_data,nested,nested_array,data],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) data)], joinType=[INNER])
+- LegacyTableSourceScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]], fields=[business_data, nested, nested_array])
]]>
</Resource>
@@ -291,8 +357,8 @@ LogicalProject(b=[$0], s=[$2])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[b, f0 AS s])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.set)],
correlate=[table($UNNEST_ROWS$1($cor0.set))], select=[b,set,f0],
rowType=[RecordType(BIGINT b, BIGINT MULTISET set, BIGINT f0)],
joinType=[INNER])
+Calc(select=[b, set0 AS s])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.set)],
correlate=[table($UNNEST_ROWS$1($cor0.set))], select=[b,set,set0],
rowType=[RecordType(BIGINT b, BIGINT MULTISET set, BIGINT set0)],
joinType=[INNER])
+- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$,
rowtime, 3000)], select=[b, COLLECT(b) AS set])
+- Exchange(distribution=[hash[b]])
+- Calc(select=[b, rowtime], where=[(b < 3)])
@@ -317,8 +383,8 @@ LogicalProject(a=[$0], s=[$3])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[a, f0 AS s])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.c)],
correlate=[table($UNNEST_ROWS$1($cor0.c))], select=[a,b,c,f0],
rowType=[RecordType(INTEGER a, INTEGER ARRAY b, INTEGER ARRAY ARRAY c, INTEGER
ARRAY f0)], joinType=[INNER])
+Calc(select=[a, c0 AS s])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.c)],
correlate=[table($UNNEST_ROWS$1($cor0.c))], select=[a,b,c,c0],
rowType=[RecordType(INTEGER a, INTEGER ARRAY b, INTEGER ARRAY ARRAY c, INTEGER
ARRAY c0)], joinType=[INNER])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c])
]]>
</Resource>
@@ -417,8 +483,8 @@ LogicalProject(a=[$0], b=[$1], s=[$3])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[a, b, f0 AS s])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.b)],
correlate=[table($UNNEST_ROWS$1($cor0.b))], select=[a,b,c,f0],
rowType=[RecordType(INTEGER a, INTEGER ARRAY b, INTEGER ARRAY ARRAY c, INTEGER
f0)], joinType=[INNER])
+Calc(select=[a, b, b0 AS s])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.b)],
correlate=[table($UNNEST_ROWS$1($cor0.b))], select=[a,b,c,b0],
rowType=[RecordType(INTEGER a, INTEGER ARRAY b, INTEGER ARRAY ARRAY c, INTEGER
b0)], joinType=[INNER])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c])
]]>
</Resource>
@@ -445,7 +511,7 @@ LogicalProject(a=[$0])
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[a])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.b)],
correlate=[table($UNNEST_ROWS$1($cor0.b))], select=[a,b,f0],
rowType=[RecordType(INTEGER a, INTEGER ARRAY b, INTEGER f0)], joinType=[INNER])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.b)],
correlate=[table($UNNEST_ROWS$1($cor0.b))], select=[a,b,b0],
rowType=[RecordType(INTEGER a, INTEGER ARRAY b, INTEGER b0)], joinType=[INNER])
+- TableSourceScan(table=[[default_catalog, default_database, T2]],
fields=[a, b])
]]>
</Resource>
@@ -523,8 +589,8 @@ LogicalProject(a=[$0], number=[$2], ordinality=[$3])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[a, EXPR$0 AS number, ORDINALITY AS ordinality])
-+- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b)],
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b))],
select=[a,b,EXPR$0,ORDINALITY], rowType=[RecordType(INTEGER a, INTEGER ARRAY b,
INTEGER EXPR$0, INTEGER ORDINALITY)], joinType=[INNER])
+Calc(select=[a, b0 AS number, ORDINALITY AS ordinality])
++- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b)],
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b))],
select=[a,b,b0,ORDINALITY], rowType=[RecordType(INTEGER a, INTEGER ARRAY b,
INTEGER b0, INTEGER ORDINALITY)], joinType=[INNER])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b])
]]>
</Resource>
@@ -556,9 +622,9 @@ LogicalProject(id=[$0], array_val=[$2], array_pos=[$3],
elem=[$4], element_pos=[
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[id, EXPR$0 AS array_val, ORDINALITY AS array_pos, EXPR$00 AS
elem, ORDINALITY0 AS element_pos])
-+- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor1.array_val)],
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor1.array_val))],
select=[id,nested_array,EXPR$0,ORDINALITY,EXPR$00,ORDINALITY0],
rowType=[RecordType(INTEGER id, INTEGER ARRAY ARRAY nested_array, INTEGER ARRAY
EXPR$0, INTEGER ORDINALITY, INTEGER EXPR$00, INTEGER ORDINALITY0)],
joinType=[INNER])
- +-
Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.nested_array)],
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.nested_array))],
select=[id,nested_array,EXPR$0,ORDINALITY], rowType=[RecordType(INTEGER id,
INTEGER ARRAY ARRAY nested_array, INTEGER ARRAY EXPR$0, INTEGER ORDINALITY)],
joinType=[INNER])
+Calc(select=[id, nested_array0 AS array_val, ORDINALITY AS array_pos,
array_val AS elem, ORDINALITY0 AS element_pos])
++- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor1.array_val)],
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor1.array_val))],
select=[id,nested_array,nested_array0,ORDINALITY,array_val,ORDINALITY0],
rowType=[RecordType(INTEGER id, INTEGER ARRAY ARRAY nested_array, INTEGER ARRAY
nested_array0, INTEGER ORDINALITY, INTEGER array_val, INTEGER ORDINALITY0)],
joinType=[INNER])
+ +-
Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.nested_array)],
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.nested_array))],
select=[id,nested_array,nested_array0,ORDINALITY], rowType=[RecordType(INTEGER
id, INTEGER ARRAY ARRAY nested_array, INTEGER ARRAY nested_array0, INTEGER
ORDINALITY)], joinType=[INNER])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[id, nested_array])
]]>
</Resource>
@@ -657,8 +723,8 @@ LogicalProject(id=[$0], k=[$2], v=[$3], pos=[$4])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[id, f0 AS k, f1 AS v, ORDINALITY AS pos])
-+- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.map_data)],
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.map_data))],
select=[id,map_data,f0,f1,ORDINALITY], rowType=[RecordType(INTEGER id,
(VARCHAR(2147483647), VARCHAR(2147483647)) MAP map_data, VARCHAR(2147483647)
f0, VARCHAR(2147483647) f1, INTEGER ORDINALITY)], joinType=[INNER])
+Calc(select=[id, KEY AS k, VALUE AS v, ORDINALITY AS pos])
++- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.map_data)],
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.map_data))],
select=[id,map_data,KEY,VALUE,ORDINALITY], rowType=[RecordType(INTEGER id,
(VARCHAR(2147483647), VARCHAR(2147483647)) MAP map_data, VARCHAR(2147483647)
KEY, VARCHAR(2147483647) VALUE, INTEGER ORDINALITY)], joinType=[INNER])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[id, map_data])
]]>
</Resource>
@@ -686,8 +752,8 @@ LogicalProject(a=[$0], word=[$2], pos=[$3])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[a, EXPR$0 AS word, ORDINALITY AS pos])
-+- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.words)],
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.words))],
select=[a,words,EXPR$0,ORDINALITY], rowType=[RecordType(INTEGER a,
VARCHAR(2147483647) MULTISET words, VARCHAR(2147483647) EXPR$0, INTEGER
ORDINALITY)], joinType=[INNER])
+Calc(select=[a, words0 AS word, ORDINALITY AS pos])
++- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.words)],
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.words))],
select=[a,words,words0,ORDINALITY], rowType=[RecordType(INTEGER a,
VARCHAR(2147483647) MULTISET words, VARCHAR(2147483647) words0, INTEGER
ORDINALITY)], joinType=[INNER])
+- GroupAggregate(groupBy=[a], select=[a, COLLECT(c) AS words])
+- Exchange(distribution=[hash[a]])
+- Calc(select=[a, c])
@@ -718,8 +784,8 @@ LogicalProject(a=[$0], number=[$2], ordinality=[$3])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[a, EXPR$0 AS number, ORDINALITY AS ordinality])
-+- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b)],
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b))],
select=[a,b,EXPR$0,ORDINALITY], rowType=[RecordType(INTEGER a, INTEGER ARRAY b,
INTEGER EXPR$0, INTEGER ORDINALITY)], joinType=[INNER], condition=[AND(>($0,
10), <($1, 3))])
+Calc(select=[a, b0 AS number, ORDINALITY AS ordinality])
++- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b)],
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b))],
select=[a,b,b0,ORDINALITY], rowType=[RecordType(INTEGER a, INTEGER ARRAY b,
INTEGER b0, INTEGER ORDINALITY)], joinType=[INNER], condition=[AND(>($0, 10),
<($1, 3))])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b])
]]>
</Resource>
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-cross-join-unnest-map/plan/correlate-cross-join-unnest-map.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-cross-join-unnest-map/plan/correlate-cross-join-unnest-map.json
new file mode 100644
index 00000000000..313b3d3bf07
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-cross-join-unnest-map/plan/correlate-cross-join-unnest-map.json
@@ -0,0 +1,137 @@
+{
+ "flinkVersion" : "2.4",
+ "nodes" : [ {
+ "id" : 5,
+ "type" : "batch-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`source_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "id",
+ "dataType" : "INT"
+ }, {
+ "name" : "m",
+ "dataType" : "MAP<VARCHAR(2147483647), INT>"
+ } ]
+ }
+ }
+ }
+ },
+ "outputType" : "ROW<`id` INT, `m` MAP<VARCHAR(2147483647), INT>>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, source_t]], fields=[id, m])",
+ "dynamicFilteringDataListenerID" : "6757d5d5-a915-4774-88a6-40cb2d2fb0ba"
+ }, {
+ "id" : 6,
+ "type" : "batch-exec-correlate_1",
+ "joinType" : "INNER",
+ "functionCall" : {
+ "kind" : "CALL",
+ "internalName" : "$UNNEST_ROWS$1",
+ "operands" : [ {
+ "kind" : "FIELD_ACCESS",
+ "name" : "m",
+ "expr" : {
+ "kind" : "CORREL_VARIABLE",
+ "correl" : "$cor0",
+ "type" : "ROW<`id` INT, `m` MAP<VARCHAR(2147483647), INT>, `k`
VARCHAR(2147483647), `v` INT> NOT NULL"
+ }
+ } ],
+ "type" : "ROW<`f0` VARCHAR(2147483647), `f1` INT> NOT NULL"
+ },
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`id` INT, `m` MAP<VARCHAR(2147483647), INT>, `f0`
VARCHAR(2147483647), `f1` INT>",
+ "description" : "Correlate(invocation=[$UNNEST_ROWS$1($cor0.m)],
correlate=[table($UNNEST_ROWS$1($cor0.m))], select=[id,m,f0,f1],
rowType=[RecordType(INTEGER id, (VARCHAR(2147483647), INTEGER) MAP m,
VARCHAR(2147483647) f0, INTEGER f1)], joinType=[INNER])"
+ }, {
+ "id" : 7,
+ "type" : "batch-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "INT"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`id` INT, `k` VARCHAR(2147483647), `v` INT>",
+ "description" : "Calc(select=[id, f0 AS k, f1 AS v])"
+ }, {
+ "id" : 8,
+ "type" : "batch-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.type-length-enforcer" : "IGNORE"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "id",
+ "dataType" : "INT"
+ }, {
+ "name" : "k",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "v",
+ "dataType" : "INT"
+ } ]
+ }
+ }
+ }
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "BLOCKING",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`id` INT, `k` VARCHAR(2147483647), `v` INT>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t],
fields=[id, k, v])"
+ } ],
+ "edges" : [ {
+ "source" : 5,
+ "target" : 6,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 6,
+ "target" : 7,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 7,
+ "target" : 8,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-cross-join-unnest-primitive-array/plan/correlate-cross-join-unnest-primitive-array.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-cross-join-unnest-primitive-array/plan/correlate-cross-join-unnest-primitive-array.json
new file mode 100644
index 00000000000..a1f1fa6ca46
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-cross-join-unnest-primitive-array/plan/correlate-cross-join-unnest-primitive-array.json
@@ -0,0 +1,130 @@
+{
+ "flinkVersion" : "2.4",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "batch-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`source_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "id",
+ "dataType" : "INT"
+ }, {
+ "name" : "vals",
+ "dataType" : "ARRAY<INT NOT NULL>"
+ } ]
+ }
+ }
+ }
+ },
+ "outputType" : "ROW<`id` INT, `vals` ARRAY<INT NOT NULL>>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, source_t]], fields=[id, vals])",
+ "dynamicFilteringDataListenerID" : "a0d0a3b5-53f1-46b7-ae4d-3b4fe7d9145d"
+ }, {
+ "id" : 2,
+ "type" : "batch-exec-correlate_1",
+ "joinType" : "INNER",
+ "functionCall" : {
+ "kind" : "CALL",
+ "internalName" : "$UNNEST_ROWS$1",
+ "operands" : [ {
+ "kind" : "FIELD_ACCESS",
+ "name" : "vals",
+ "expr" : {
+ "kind" : "CORREL_VARIABLE",
+ "correl" : "$cor0",
+ "type" : "ROW<`id` INT, `vals` ARRAY<INT NOT NULL>, `val` INT NOT
NULL> NOT NULL"
+ }
+ } ],
+ "type" : "ROW<`f0` INT NOT NULL>"
+ },
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`id` INT, `vals` ARRAY<INT NOT NULL>, `f0` INT NOT
NULL>",
+ "description" : "Correlate(invocation=[$UNNEST_ROWS$1($cor0.vals)],
correlate=[table($UNNEST_ROWS$1($cor0.vals))], select=[id,vals,f0],
rowType=[RecordType(INTEGER id, INTEGER ARRAY vals, INTEGER f0)],
joinType=[INNER])"
+ }, {
+ "id" : 3,
+ "type" : "batch-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "INT NOT NULL"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`id` INT, `val` INT NOT NULL>",
+ "description" : "Calc(select=[id, f0 AS val])"
+ }, {
+ "id" : 4,
+ "type" : "batch-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.type-length-enforcer" : "IGNORE"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "id",
+ "dataType" : "INT"
+ }, {
+ "name" : "val",
+ "dataType" : "INT"
+ } ]
+ }
+ }
+ }
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "BLOCKING",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`id` INT, `val` INT NOT NULL>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t],
fields=[id, val])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 3,
+ "target" : 4,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-cross-join-unnest-map/plan/correlate-cross-join-unnest-map.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-cross-join-unnest-map/plan/correlate-cross-join-unnest-map.json
new file mode 100644
index 00000000000..a80c3aa0bb6
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-cross-join-unnest-map/plan/correlate-cross-join-unnest-map.json
@@ -0,0 +1,141 @@
+{
+ "flinkVersion" : "2.4",
+ "nodes" : [ {
+ "id" : 26,
+ "type" : "stream-exec-table-source-scan_2",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`source_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "id",
+ "dataType" : "INT"
+ }, {
+ "name" : "m",
+ "dataType" : "MAP<VARCHAR(2147483647), INT>"
+ } ]
+ }
+ }
+ }
+ },
+ "outputType" : "ROW<`id` INT, `m` MAP<VARCHAR(2147483647), INT>>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, source_t]], fields=[id, m])"
+ }, {
+ "id" : 27,
+ "type" : "stream-exec-correlate_1",
+ "joinType" : "INNER",
+ "functionCall" : {
+ "kind" : "CALL",
+ "internalName" : "$UNNEST_ROWS$1",
+ "operands" : [ {
+ "kind" : "FIELD_ACCESS",
+ "name" : "m",
+ "expr" : {
+ "kind" : "CORREL_VARIABLE",
+ "correl" : "$cor0",
+ "type" : "ROW<`id` INT, `m` MAP<VARCHAR(2147483647), INT>, `k`
VARCHAR(2147483647), `v` INT> NOT NULL"
+ }
+ } ],
+ "type" : "ROW<`f0` VARCHAR(2147483647), `f1` INT> NOT NULL"
+ },
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`id` INT, `m` MAP<VARCHAR(2147483647), INT>, `f0`
VARCHAR(2147483647), `f1` INT>",
+ "description" : "Correlate(invocation=[$UNNEST_ROWS$1($cor0.m)],
correlate=[table($UNNEST_ROWS$1($cor0.m))], select=[id,m,f0,f1],
rowType=[RecordType(INTEGER id, (VARCHAR(2147483647), INTEGER) MAP m,
VARCHAR(2147483647) f0, INTEGER f1)], joinType=[INNER])"
+ }, {
+ "id" : 28,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "INT"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`id` INT, `k` VARCHAR(2147483647), `v` INT>",
+ "description" : "Calc(select=[id, f0 AS k, f1 AS v])"
+ }, {
+ "id" : 29,
+ "type" : "stream-exec-sink_2",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.rowtime-inserter" : "ENABLED",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "id",
+ "dataType" : "INT"
+ }, {
+ "name" : "k",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "v",
+ "dataType" : "INT"
+ } ]
+ }
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "upsertMaterializeStrategy" : "VALUE",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`id` INT, `k` VARCHAR(2147483647), `v` INT>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t],
fields=[id, k, v])"
+ } ],
+ "edges" : [ {
+ "source" : 26,
+ "target" : 27,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 27,
+ "target" : 28,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 28,
+ "target" : 29,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-cross-join-unnest-map/savepoint/_metadata
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-cross-join-unnest-map/savepoint/_metadata
new file mode 100644
index 00000000000..d64328abf13
Binary files /dev/null and
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-cross-join-unnest-map/savepoint/_metadata
differ
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-cross-join-unnest-primitive-array/plan/correlate-cross-join-unnest-primitive-array.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-cross-join-unnest-primitive-array/plan/correlate-cross-join-unnest-primitive-array.json
new file mode 100644
index 00000000000..dc72585856a
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-cross-join-unnest-primitive-array/plan/correlate-cross-join-unnest-primitive-array.json
@@ -0,0 +1,133 @@
+{
+ "flinkVersion" : "2.4",
+ "nodes" : [ {
+ "id" : 22,
+ "type" : "stream-exec-table-source-scan_2",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`source_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "id",
+ "dataType" : "INT"
+ }, {
+ "name" : "vals",
+ "dataType" : "ARRAY<INT NOT NULL>"
+ } ]
+ }
+ }
+ }
+ },
+ "outputType" : "ROW<`id` INT, `vals` ARRAY<INT NOT NULL>>",
+ "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[id, vals])"
+ }, {
+ "id" : 23,
+ "type" : "stream-exec-correlate_1",
+ "joinType" : "INNER",
+ "functionCall" : {
+ "kind" : "CALL",
+ "internalName" : "$UNNEST_ROWS$1",
+ "operands" : [ {
+ "kind" : "FIELD_ACCESS",
+ "name" : "vals",
+ "expr" : {
+ "kind" : "CORREL_VARIABLE",
+ "correl" : "$cor0",
+ "type" : "ROW<`id` INT, `vals` ARRAY<INT NOT NULL>, `val` INT NOT NULL> NOT NULL"
+ }
+ } ],
+ "type" : "ROW<`f0` INT NOT NULL>"
+ },
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`id` INT, `vals` ARRAY<INT NOT NULL>, `f0` INT NOT NULL>",
+ "description" : "Correlate(invocation=[$UNNEST_ROWS$1($cor0.vals)], correlate=[table($UNNEST_ROWS$1($cor0.vals))], select=[id,vals,f0], rowType=[RecordType(INTEGER id, INTEGER ARRAY vals, INTEGER f0)], joinType=[INNER])"
+ }, {
+ "id" : 24,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "INT NOT NULL"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`id` INT, `val` INT NOT NULL>",
+ "description" : "Calc(select=[id, f0 AS val])"
+ }, {
+ "id" : 25,
+ "type" : "stream-exec-sink_2",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.rowtime-inserter" : "ENABLED",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "id",
+ "dataType" : "INT"
+ }, {
+ "name" : "val",
+ "dataType" : "INT"
+ } ]
+ }
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`id` INT, `val` INT NOT NULL>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[id, val])"
+ } ],
+ "edges" : [ {
+ "source" : 22,
+ "target" : 23,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 23,
+ "target" : 24,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 24,
+ "target" : 25,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-cross-join-unnest-primitive-array/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-cross-join-unnest-primitive-array/savepoint/_metadata
new file mode 100644
index 00000000000..af368bb1100
Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-cross-join-unnest-primitive-array/savepoint/_metadata differ
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/UnnestTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/UnnestTestBase.scala
index d1ee04db109..2634b0dc301 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/UnnestTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/UnnestTestBase.scala
@@ -282,6 +282,26 @@ abstract class UnnestTestBase(withExecPlan: Boolean) extends TableTestBase {
"SELECT bd_name FROM nested_not_null LEFT JOIN UNNEST(nested_not_null.nested_array[0].data) AS exploded_bd(bd_name) ON TRUE")
}
+  @Test
+  def testNullMismatchLeftJoinNoAliasList(): Unit = {
+    util.verifyRelPlan(
+      "SELECT * FROM nested_not_null LEFT JOIN UNNEST(nested_not_null.business_data) AS exploded_bd ON TRUE")
+  }
+
+  @Test
+  def testNullMismatchLeftJoinOnPredicate(): Unit = {
+    util.verifyRelPlan(
+      "SELECT * FROM nested_not_null LEFT JOIN UNNEST(nested_not_null.business_data) AS exploded_bd ON exploded_bd <> 'debug'")
+  }
+
+  @Test
+  def testNullMismatchLeftJoinWithOrdinalityOnPredicate(): Unit = {
+    util.verifyRelPlan(
+      "SELECT * FROM nested_not_null LEFT JOIN " +
+        "UNNEST(nested_not_null.business_data) WITH ORDINALITY AS v(bd_name, ord) " +
+        "ON v.bd_name <> 'debug'")
+  }
+
def verifyPlan(sql: String): Unit = {
if (withExecPlan) {
util.verifyExecPlan(sql)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnnestITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnnestITCase.scala
index 669c9962487..09587cc5f61 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnnestITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnnestITCase.scala
@@ -587,4 +587,42 @@ class UnnestITCase extends BatchTestBase {
Seq(row(1, 12, "45.6", 1), row(2, 13, "41.6", 1))
)
}
+
+  @Test
+  def testLeftJoinUnnestNoAliasList(): Unit = {
+    val data = List(
+      row(1, Array(10, 20)),
+      row(2, Array.empty[Int]),
+      row(3, Array(30))
+    )
+    registerCollection(
+      "T",
+      data,
+      new RowTypeInfo(Types.INT, Types.PRIMITIVE_ARRAY(Types.INT)),
+      "id, vals")
+
+    checkResult(
+      "SELECT id, exploded_val FROM T LEFT JOIN UNNEST(T.vals) AS exploded_val ON TRUE",
+      Seq(row(1, 10), row(1, 20), row(2, null), row(3, 30))
+    )
+  }
+
+  @Test
+  def testLeftJoinUnnestOnPredicate(): Unit = {
+    val data = List(
+      row(1, Array(1, 50, 100)),
+      row(2, Array(1, 2)),
+      row(3, Array.empty[Int])
+    )
+    registerCollection(
+      "T",
+      data,
+      new RowTypeInfo(Types.INT, Types.PRIMITIVE_ARRAY(Types.INT)),
+      "id, vals")
+
+    checkResult(
+      "SELECT id, exploded_val FROM T LEFT JOIN UNNEST(T.vals) AS exploded_val ON exploded_val > 10",
+      Seq(row(1, 50), row(1, 100), row(2, null), row(3, null))
+    )
+  }
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/UnnestITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/UnnestITCase.scala
index ec8e29e24c0..17f0093ec39 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/UnnestITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/UnnestITCase.scala
@@ -782,4 +782,41 @@ class UnnestITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mo
val expectedFieldNames = List("x", "y", "ord")
assertThat(fieldNames).isEqualTo(expectedFieldNames)
}
+
+  @TestTemplate
+  def testLeftJoinUnnestNoAliasList(): Unit = {
+    val data = List(
+      (1, Array(10, 20)),
+      (2, Array.empty[Int]),
+      (3, Array(30))
+    )
+    assertUnnest(
+      testData = data,
+      typeInfo = createTypeInformation[(Int, Array[Int])],
+      sqlQuery = "SELECT id, exploded_val FROM T LEFT JOIN UNNEST(T.vals) AS exploded_val ON TRUE",
+      expectedResults = List("1,10", "1,20", "2,null", "3,30"),
+      isRetract = false,
+      fieldNames = 'id,
+      'vals
+    )
+  }
+
+  @TestTemplate
+  def testLeftJoinUnnestOnPredicate(): Unit = {
+    val data = List(
+      (1, Array(1, 50, 100)),
+      (2, Array(1, 2)),
+      (3, Array.empty[Int])
+    )
+    assertUnnest(
+      testData = data,
+      typeInfo = createTypeInformation[(Int, Array[Int])],
+      sqlQuery =
+        "SELECT id, exploded_val FROM T LEFT JOIN UNNEST(T.vals) AS exploded_val ON exploded_val > 10",
+      expectedResults = List("1,50", "1,100", "2,null", "3,null"),
+      isRetract = false,
+      fieldNames = 'id,
+      'vals
+    )
+  }
}