This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d0a8977d56 [FLINK-39558][table] `LogicalUnnestRule`: use Calcite `Uncollect` rowType instead of `LogicalType` round-trip
0d0a8977d56 is described below

commit 0d0a8977d561b628bcf1dc7aa84ca9b4c8e58a61
Author: Jim Hughes <[email protected]>
AuthorDate: Thu May 7 17:12:32 2026 -0400

    [FLINK-39558][table] `LogicalUnnestRule`: use Calcite `Uncollect` rowType instead of `LogicalType` round-trip
---
 .../plan/rules/logical/LogicalUnnestRule.java      |  58 +--------
 .../exec/batch/CorrelateBatchRestoreTest.java      |   4 +-
 .../nodes/exec/common/CorrelateTestPrograms.java   |  47 +++++++
 .../nodes/exec/stream/CorrelateRestoreTest.java    |   2 +
 .../table/planner/catalog/JavaCatalogTableTest.xml |   4 +-
 .../table/planner/plan/batch/sql/UnnestTest.xml    | 136 +++++++++++++++-----
 .../plan/rules/logical/LogicalUnnestRuleTest.xml   | 133 ++++++++++++++-----
 .../planner/plan/stream/sql/MultiJoinTest.xml      |   4 +-
 .../table/planner/plan/stream/sql/UnnestTest.xml   | 138 ++++++++++++++------
 .../plan/correlate-cross-join-unnest-map.json      | 137 ++++++++++++++++++++
 ...orrelate-cross-join-unnest-primitive-array.json | 130 +++++++++++++++++++
 .../plan/correlate-cross-join-unnest-map.json      | 141 +++++++++++++++++++++
 .../savepoint/_metadata                            | Bin 0 -> 8333 bytes
 ...orrelate-cross-join-unnest-primitive-array.json | 133 +++++++++++++++++++
 .../savepoint/_metadata                            | Bin 0 -> 7099 bytes
 .../table/planner/plan/common/UnnestTestBase.scala |  20 +++
 .../planner/runtime/batch/sql/UnnestITCase.scala   |  38 ++++++
 .../planner/runtime/stream/sql/UnnestITCase.scala  |  37 ++++++
 18 files changed, 1001 insertions(+), 161 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRule.java
index a0805e4b110..65b9750a2eb 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRule.java
@@ -19,11 +19,7 @@
 package org.apache.flink.table.planner.plan.rules.logical;
 
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
-import org.apache.flink.table.planner.utils.ShortcutUtils;
-import org.apache.flink.table.runtime.functions.table.UnnestRowsFunctionBase;
-import org.apache.flink.table.types.logical.LogicalType;
 
 import org.apache.flink.shaded.guava33.com.google.common.collect.ImmutableList;
 
@@ -33,22 +29,15 @@ import org.apache.calcite.plan.RelRule;
 import org.apache.calcite.plan.hep.HepRelVertex;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Correlate;
-import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.core.Uncollect;
 import org.apache.calcite.rel.logical.LogicalCorrelate;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.logical.LogicalTableFunctionScan;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
 import org.immutables.value.Value;
 
 import java.util.Collections;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-import static 
org.apache.flink.table.types.logical.utils.LogicalTypeUtils.toRowType;
 
 /**
  * Planner rule that rewrites UNNEST to explode function.
@@ -106,10 +95,7 @@ public class LogicalUnnestRule extends 
RelRule<LogicalUnnestRule.LogicalUnnestRu
             relNode = convert(getRel(hepRelVertex), correlate);
         }
         if (relNode instanceof LogicalProject) {
-            final LogicalProject logicalProject =
-                    correlate.getJoinType() == JoinRelType.LEFT
-                            ? 
getLogicalProjectWithAdjustedNullability((LogicalProject) relNode)
-                            : (LogicalProject) relNode;
+            LogicalProject logicalProject = (LogicalProject) relNode;
             return logicalProject.copy(
                     logicalProject.getTraitSet(),
                     
ImmutableList.of(convert(getRel(logicalProject.getInput()), correlate)));
@@ -123,13 +109,6 @@ public class LogicalUnnestRule extends 
RelRule<LogicalUnnestRule.LogicalUnnestRu
         if (relNode instanceof Uncollect) {
             Uncollect uncollect = (Uncollect) relNode;
             RelOptCluster cluster = correlate.getCluster();
-            FlinkTypeFactory typeFactory = 
ShortcutUtils.unwrapTypeFactory(cluster);
-            RelDataType relDataType =
-                    (RelDataType)
-                            ((Map.Entry) 
uncollect.getInput().getRowType().getFieldList().get(0))
-                                    .getValue();
-            LogicalType logicalType = 
FlinkTypeFactory.toLogicalType(relDataType);
-
             BridgingSqlFunction sqlFunction =
                     BridgingSqlFunction.of(
                             cluster,
@@ -140,11 +119,7 @@ public class LogicalUnnestRule extends 
RelRule<LogicalUnnestRule.LogicalUnnestRu
             RexNode rexCall =
                     cluster.getRexBuilder()
                             .makeCall(
-                                    typeFactory.createFieldTypeFromLogicalType(
-                                            toRowType(
-                                                    
UnnestRowsFunctionBase.getUnnestedType(
-                                                            logicalType,
-                                                            
uncollect.withOrdinality))),
+                                    uncollect.getRowType(),
                                     sqlFunction,
                                     ((LogicalProject) 
getRel(uncollect.getInput())).getProjects());
             return new LogicalTableFunctionScan(
@@ -167,35 +142,6 @@ public class LogicalUnnestRule extends 
RelRule<LogicalUnnestRule.LogicalUnnestRu
         return rel;
     }
 
-    /**
-     * If unnesting type is {@code NOT NULL} however at the same time {@code 
LEFT JOIN} makes it
-     * nullable, this method adjusts nullability by inserting extra {@code 
CAST}.
-     */
-    private LogicalProject 
getLogicalProjectWithAdjustedNullability(LogicalProject logicalProject) {
-        final RelOptCluster cluster = logicalProject.getCluster();
-        FlinkTypeFactory typeFactory = (FlinkTypeFactory) 
cluster.getTypeFactory();
-        RexBuilder rexBuilder = cluster.getRexBuilder();
-        final RelDataType rowType = logicalProject.getRowType();
-        return logicalProject.copy(
-                logicalProject.getTraitSet(),
-                logicalProject.getInput(),
-                logicalProject.getProjects().stream()
-                        .map(
-                                t -> {
-                                    if (t.getType().isNullable()) {
-                                        return t;
-                                    }
-                                    return rexBuilder.makeCast(
-                                            createNullableType(typeFactory, 
t.getType()), t);
-                                })
-                        .collect(Collectors.toList()),
-                rowType.isNullable() ? rowType : 
createNullableType(typeFactory, rowType));
-    }
-
-    private static RelDataType createNullableType(FlinkTypeFactory 
typeFactory, RelDataType type) {
-        return typeFactory.createTypeWithNullability(type, true);
-    }
-
     /** Rule configuration. */
     @Value.Immutable(singleton = false)
     public interface LogicalUnnestRuleConfig extends RelRule.Config {
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/CorrelateBatchRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/CorrelateBatchRestoreTest.java
index 32325b16df2..0d6aac85430 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/CorrelateBatchRestoreTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/CorrelateBatchRestoreTest.java
@@ -39,6 +39,8 @@ public class CorrelateBatchRestoreTest extends 
BatchRestoreTestBase {
                 CorrelateTestPrograms.CORRELATE_SYSTEM_FUNC,
                 CorrelateTestPrograms.CORRELATE_JOIN_FILTER,
                 CorrelateTestPrograms.CORRELATE_LEFT_JOIN,
-                CorrelateTestPrograms.CORRELATE_CROSS_JOIN_UNNEST);
+                CorrelateTestPrograms.CORRELATE_CROSS_JOIN_UNNEST,
+                
CorrelateTestPrograms.CORRELATE_CROSS_JOIN_UNNEST_PRIMITIVE_ARRAY,
+                CorrelateTestPrograms.CORRELATE_CROSS_JOIN_UNNEST_MAP);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CorrelateTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CorrelateTestPrograms.java
index cb1127fa457..44bf5b351b1 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CorrelateTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CorrelateTestPrograms.java
@@ -25,6 +25,8 @@ import org.apache.flink.table.test.program.SourceTestStep;
 import org.apache.flink.table.test.program.TableTestProgram;
 import org.apache.flink.types.Row;
 
+import java.util.Map;
+
 /** {@link TableTestProgram} definitions for testing {@link 
StreamExecCorrelate}. */
 public class CorrelateTestPrograms {
 
@@ -203,6 +205,51 @@ public class CorrelateTestPrograms {
                             "INSERT INTO sink_t SELECT (SELECT name, nested 
FROM source_t, UNNEST(arr) AS T(nested)) FROM source_t")
                     .build();
 
+    public static final TableTestProgram 
CORRELATE_CROSS_JOIN_UNNEST_PRIMITIVE_ARRAY =
+            TableTestProgram.of(
+                            "correlate-cross-join-unnest-primitive-array",
+                            "validate correlate with cross join and unnest of 
primitive array")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema("id INT", "vals ARRAY<INT NOT 
NULL>")
+                                    .producedBeforeRestore(
+                                            Row.of(1, new Integer[] {10, 20}),
+                                            Row.of(2, new Integer[] {30}))
+                                    .producedAfterRestore(Row.of(3, new 
Integer[] {40, 50}))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("id INT", "val INT")
+                                    .consumedBeforeRestore("+I[1, 10]", "+I[1, 
20]", "+I[2, 30]")
+                                    .consumedAfterRestore("+I[3, 40]", "+I[3, 
50]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT id, val FROM source_t 
CROSS JOIN UNNEST(vals) AS u(val)")
+                    .build();
+
+    public static final TableTestProgram CORRELATE_CROSS_JOIN_UNNEST_MAP =
+            TableTestProgram.of(
+                            "correlate-cross-join-unnest-map",
+                            "validate correlate with cross join and unnest of 
map")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema("id INT", "m MAP<STRING, INT>")
+                                    .producedBeforeRestore(
+                                            Row.of(1, Map.of("a", 10, "b", 
20)),
+                                            Row.of(2, Map.of("c", 30)))
+                                    .producedAfterRestore(Row.of(3, 
Map.of("d", 40)))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("id INT", "k STRING", "v INT")
+                                    .consumedBeforeRestore(
+                                            "+I[1, a, 10]", "+I[1, b, 20]", 
"+I[2, c, 30]")
+                                    .consumedAfterRestore("+I[3, d, 40]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT id, k, v FROM source_t 
CROSS JOIN UNNEST(m) AS u(k, v)")
+                    .build();
+
     public static final TableTestProgram CORRELATE_WITH_LITERAL_AGG =
             TableTestProgram.of(
                             "correlate-with-literal-agg",
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateRestoreTest.java
index 4ce3c0b1bf7..38d6e468ae9 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateRestoreTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateRestoreTest.java
@@ -40,6 +40,8 @@ public class CorrelateRestoreTest extends RestoreTestBase {
                 CorrelateTestPrograms.CORRELATE_JOIN_FILTER,
                 CorrelateTestPrograms.CORRELATE_LEFT_JOIN,
                 CorrelateTestPrograms.CORRELATE_CROSS_JOIN_UNNEST,
+                
CorrelateTestPrograms.CORRELATE_CROSS_JOIN_UNNEST_PRIMITIVE_ARRAY,
+                CorrelateTestPrograms.CORRELATE_CROSS_JOIN_UNNEST_MAP,
                 CorrelateTestPrograms.CORRELATE_WITH_LITERAL_AGG);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.xml
index 3cb0ff64a9a..b4b39fd3742 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.xml
@@ -167,8 +167,8 @@ LogicalProject(order_id=[$0], customer_id=[$1], 
product_id=[$2], ts=[$3])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[order_id, customer_id, f0 AS product_id, ts])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor2.product_ids)], 
correlate=[table($UNNEST_ROWS$1($cor2.product_ids))], 
select=[order_id,customer_id,product_id,product_ids,ts,f0], 
rowType=[RecordType(INTEGER order_id, INTEGER customer_id, INTEGER product_id, 
INTEGER ARRAY product_ids, TIMESTAMP_LTZ(3) *ROWTIME* ts, INTEGER f0)], 
joinType=[INNER])
+Calc(select=[order_id, customer_id, product_ids0 AS product_id, ts])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor2.product_ids)], 
correlate=[table($UNNEST_ROWS$1($cor2.product_ids))], 
select=[order_id,customer_id,product_id,product_ids,ts,product_ids0], 
rowType=[RecordType(INTEGER order_id, INTEGER customer_id, INTEGER product_id, 
INTEGER ARRAY product_ids, TIMESTAMP_LTZ(3) *ROWTIME* ts, INTEGER 
product_ids0)], joinType=[INNER])
    +- WatermarkAssigner(rowtime=[ts], watermark=[ts])
       +- TableSourceScan(table=[[cat, default, t]], fields=[order_id, 
customer_id, product_id, product_ids, ts])
 ]]>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml
index c0c753fbe87..5e0da95bb07 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml
@@ -33,8 +33,8 @@ LogicalProject(a=[$0], s=[$3])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[a, f0 AS s])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.c)], 
correlate=[table($UNNEST_ROWS$1($cor0.c))], select=[a,b,c,f0], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) ARRAY c, 
VARCHAR(2147483647) f0)], joinType=[INNER])
+Calc(select=[a, c0 AS s])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.c)], 
correlate=[table($UNNEST_ROWS$1($cor0.c))], select=[a,b,c,c0], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) ARRAY c, 
VARCHAR(2147483647) c0)], joinType=[INNER])
    +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c])
 ]]>
     </Resource>
@@ -56,8 +56,8 @@ LogicalProject(a=[$0], b=[$1], v=[$4])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[a, b, f1 AS v])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.c)], 
correlate=[table($UNNEST_ROWS$1($cor0.c))], select=[a,b,c,f0,f1], 
rowType=[RecordType(INTEGER a, BIGINT b, (VARCHAR(2147483647), 
VARCHAR(2147483647)) MAP c, VARCHAR(2147483647) f0, VARCHAR(2147483647) f1)], 
joinType=[INNER])
+Calc(select=[a, b, VALUE AS v])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.c)], 
correlate=[table($UNNEST_ROWS$1($cor0.c))], select=[a,b,c,KEY,VALUE], 
rowType=[RecordType(INTEGER a, BIGINT b, (VARCHAR(2147483647), 
VARCHAR(2147483647)) MAP c, VARCHAR(2147483647) KEY, VARCHAR(2147483647) 
VALUE)], joinType=[INNER])
    +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c])
 ]]>
     </Resource>
@@ -117,8 +117,8 @@ LogicalProject(a=[$0], s=[$2])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[a, f0 AS s])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.set)], 
correlate=[table($UNNEST_ROWS$1($cor0.set))], select=[a,set,f0], 
rowType=[RecordType(INTEGER a, VARCHAR(2147483647) MULTISET set, 
VARCHAR(2147483647) f0)], joinType=[LEFT])
+Calc(select=[a, set0 AS s])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.set)], 
correlate=[table($UNNEST_ROWS$1($cor0.set))], select=[a,set,set0], 
rowType=[RecordType(INTEGER a, VARCHAR(2147483647) MULTISET set, 
VARCHAR(2147483647) set0)], joinType=[LEFT])
    +- SortAggregate(isMerge=[true], groupBy=[a], select=[a, Final_COLLECT(set) 
AS set])
       +- Exchange(distribution=[forward])
          +- Sort(orderBy=[a ASC])
@@ -148,8 +148,8 @@ LogicalProject(bd_name=[$3])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Calc(select=[f0 AS bd_name])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)], 
correlate=[table($UNNEST_ROWS$1($cor0.business_data))], 
select=[business_data,nested,nested_array,f0], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) f0)], joinType=[INNER])
+Calc(select=[business_data0 AS bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)], 
correlate=[table($UNNEST_ROWS$1($cor0.business_data))], 
select=[business_data,nested,nested_array,business_data0], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) business_data0)], joinType=[INNER])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]], fields=[business_data, nested, nested_array])
 ]]>
     </Resource>
@@ -171,9 +171,30 @@ LogicalProject(bd_name=[$3])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Calc(select=[bd_name])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)], 
correlate=[table($UNNEST_ROWS$1($cor0.business_data))], 
select=[business_data,nested,nested_array,bd_name], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) bd_name)], joinType=[LEFT])
+Calc(select=[business_data0 AS bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)], 
correlate=[table($UNNEST_ROWS$1($cor0.business_data))], 
select=[business_data,nested,nested_array,business_data0], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) business_data0)], joinType=[LEFT])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]], fields=[business_data, nested, nested_array])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testNullMismatchLeftJoinNoAliasList">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM nested_not_null LEFT JOIN 
UNNEST(nested_not_null.business_data) AS exploded_bd ON TRUE]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(business_data=[$0], nested=[$1], nested_array=[$2], 
exploded_bd=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], 
requiredColumns=[{0}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
+   +- Uncollect
+      +- LogicalProject(business_data=[$cor0.business_data])
+         +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)], 
correlate=[table($UNNEST_ROWS$1($cor0.business_data))], 
select=[business_data,nested,nested_array,business_data0], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) business_data0)], joinType=[LEFT])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]], fields=[business_data, nested, nested_array])
 ]]>
     </Resource>
   </TestCase>
@@ -194,8 +215,8 @@ LogicalProject(bd_name=[$3])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Calc(select=[bd_name])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.nested.data)], 
correlate=[table($UNNEST_ROWS$1($cor0.nested.data))], 
select=[business_data,nested,nested_array,bd_name], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) bd_name)], joinType=[LEFT])
+Calc(select=[data AS bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.nested.data)], 
correlate=[table($UNNEST_ROWS$1($cor0.nested.data))], 
select=[business_data,nested,nested_array,data], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) data)], joinType=[LEFT])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]], fields=[business_data, nested, nested_array])
 ]]>
     </Resource>
@@ -217,9 +238,54 @@ LogicalProject(bd_name=[$3])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Calc(select=[bd_name])
-+- Correlate(invocation=[$UNNEST_ROWS$1(ITEM($cor0.nested_array, 0).data)], 
correlate=[table($UNNEST_ROWS$1(ITEM($cor0.nested_array, 0).data))], 
select=[business_data,nested,nested_array,bd_name], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) bd_name)], joinType=[LEFT])
+Calc(select=[EXPR$0 AS bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1(ITEM($cor0.nested_array, 0).data)], 
correlate=[table($UNNEST_ROWS$1(ITEM($cor0.nested_array, 0).data))], 
select=[business_data,nested,nested_array,EXPR$0], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) EXPR$0)], joinType=[LEFT])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]], fields=[business_data, nested, nested_array])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testNullMismatchLeftJoinOnPredicate">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM nested_not_null LEFT JOIN 
UNNEST(nested_not_null.business_data) AS exploded_bd ON exploded_bd <> 
'debug']]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(business_data=[$0], nested=[$1], nested_array=[$2], 
exploded_bd=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], 
requiredColumns=[{0}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
+   +- LogicalFilter(condition=[<>($0, _UTF-16LE'debug')])
+      +- Uncollect
+         +- LogicalProject(business_data=[$cor0.business_data])
+            +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)], 
correlate=[table($UNNEST_ROWS$1($cor0.business_data))], 
select=[business_data,nested,nested_array,business_data0], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) business_data0)], joinType=[LEFT], condition=[<>($0, 
_UTF-16LE'debug')])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]], fields=[business_data, nested, nested_array])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testNullMismatchLeftJoinWithOrdinalityOnPredicate">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM nested_not_null LEFT JOIN 
UNNEST(nested_not_null.business_data) WITH ORDINALITY AS v(bd_name, ord) ON 
v.bd_name <> 'debug']]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(business_data=[$0], nested=[$1], nested_array=[$2], 
bd_name=[$3], ord=[$4])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], 
requiredColumns=[{0}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
+   +- LogicalFilter(condition=[<>($0, _UTF-16LE'debug')])
+      +- LogicalProject(bd_name=[$0], ord=[$1])
+         +- Uncollect(withOrdinality=[true])
+            +- LogicalProject(business_data=[$cor0.business_data])
+               +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.business_data)], 
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.business_data))], 
select=[business_data,nested,nested_array,EXPR$0,ORDINALITY], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) EXPR$0, INTEGER ORDINALITY)], joinType=[LEFT], condition= 
[...]
++- LegacyTableSourceScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]], fields=[business_data, nested, nested_array])
 ]]>
     </Resource>
   </TestCase>
@@ -240,8 +306,8 @@ LogicalProject(bd_name=[$3])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Calc(select=[f0 AS bd_name])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)], 
correlate=[table($UNNEST_ROWS$1($cor0.business_data))], 
select=[business_data,nested,nested_array,f0], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) f0)], joinType=[INNER])
+Calc(select=[business_data0 AS bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)], 
correlate=[table($UNNEST_ROWS$1($cor0.business_data))], 
select=[business_data,nested,nested_array,business_data0], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) business_data0)], joinType=[INNER])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]], fields=[business_data, nested, nested_array])
 ]]>
     </Resource>
@@ -263,8 +329,8 @@ LogicalProject(bd_name=[$3])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Calc(select=[f0 AS bd_name])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.nested.data)], 
correlate=[table($UNNEST_ROWS$1($cor0.nested.data))], 
select=[business_data,nested,nested_array,f0], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) f0)], joinType=[INNER])
+Calc(select=[data AS bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.nested.data)], 
correlate=[table($UNNEST_ROWS$1($cor0.nested.data))], 
select=[business_data,nested,nested_array,data], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) data)], joinType=[INNER])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]], fields=[business_data, nested, nested_array])
 ]]>
     </Resource>
@@ -296,8 +362,8 @@ LogicalProject(b=[$0], s=[$2])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[b, f0 AS s])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.set)], 
correlate=[table($UNNEST_ROWS$1($cor0.set))], select=[b,set,f0], 
rowType=[RecordType(BIGINT b, BIGINT MULTISET set, BIGINT f0)], 
joinType=[INNER])
+Calc(select=[b, set0 AS s])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.set)], 
correlate=[table($UNNEST_ROWS$1($cor0.set))], select=[b,set,set0], 
rowType=[RecordType(BIGINT b, BIGINT MULTISET set, BIGINT set0)], 
joinType=[INNER])
    +- SortWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, 
rowtime, 3000)], select=[b, Final_COLLECT(set) AS set])
       +- Exchange(distribution=[forward])
          +- Sort(orderBy=[b ASC, assignedWindow$ ASC])
@@ -327,8 +393,8 @@ LogicalProject(a=[$0], s=[$3])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[a, f0 AS s])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.c)], 
correlate=[table($UNNEST_ROWS$1($cor0.c))], select=[a,b,c,f0], 
rowType=[RecordType(INTEGER a, INTEGER ARRAY b, INTEGER ARRAY ARRAY c, INTEGER 
ARRAY f0)], joinType=[INNER])
+Calc(select=[a, c0 AS s])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.c)], 
correlate=[table($UNNEST_ROWS$1($cor0.c))], select=[a,b,c,c0], 
rowType=[RecordType(INTEGER a, INTEGER ARRAY b, INTEGER ARRAY ARRAY c, INTEGER 
ARRAY c0)], joinType=[INNER])
    +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c])
 ]]>
     </Resource>
@@ -432,8 +498,8 @@ LogicalProject(a=[$0], b=[$1], s=[$3])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[a, b, f0 AS s])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.b)], 
correlate=[table($UNNEST_ROWS$1($cor0.b))], select=[a,b,c,f0], 
rowType=[RecordType(INTEGER a, INTEGER ARRAY b, INTEGER ARRAY ARRAY c, INTEGER 
f0)], joinType=[INNER])
+Calc(select=[a, b, b0 AS s])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.b)], 
correlate=[table($UNNEST_ROWS$1($cor0.b))], select=[a,b,c,b0], 
rowType=[RecordType(INTEGER a, INTEGER ARRAY b, INTEGER ARRAY ARRAY c, INTEGER 
b0)], joinType=[INNER])
    +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c])
 ]]>
     </Resource>
@@ -511,8 +577,8 @@ LogicalProject(a=[$0], number=[$2], ordinality=[$3])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[a, EXPR$0 AS number, ORDINALITY AS ordinality])
-+- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b)], 
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b))], 
select=[a,b,EXPR$0,ORDINALITY], rowType=[RecordType(INTEGER a, INTEGER ARRAY b, 
INTEGER EXPR$0, INTEGER ORDINALITY)], joinType=[INNER])
+Calc(select=[a, b0 AS number, ORDINALITY AS ordinality])
++- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b)], 
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b))], 
select=[a,b,b0,ORDINALITY], rowType=[RecordType(INTEGER a, INTEGER ARRAY b, 
INTEGER b0, INTEGER ORDINALITY)], joinType=[INNER])
    +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b])
 ]]>
     </Resource>
@@ -544,9 +610,9 @@ LogicalProject(id=[$0], array_val=[$2], array_pos=[$3], 
elem=[$4], element_pos=[
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[id, EXPR$0 AS array_val, ORDINALITY AS array_pos, EXPR$00 AS 
elem, ORDINALITY0 AS element_pos])
-+- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor1.array_val)], 
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor1.array_val))], 
select=[id,nested_array,EXPR$0,ORDINALITY,EXPR$00,ORDINALITY0], 
rowType=[RecordType(INTEGER id, INTEGER ARRAY ARRAY nested_array, INTEGER ARRAY 
EXPR$0, INTEGER ORDINALITY, INTEGER EXPR$00, INTEGER ORDINALITY0)], 
joinType=[INNER])
-   +- 
Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.nested_array)], 
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.nested_array))], 
select=[id,nested_array,EXPR$0,ORDINALITY], rowType=[RecordType(INTEGER id, 
INTEGER ARRAY ARRAY nested_array, INTEGER ARRAY EXPR$0, INTEGER ORDINALITY)], 
joinType=[INNER])
+Calc(select=[id, nested_array0 AS array_val, ORDINALITY AS array_pos, 
array_val AS elem, ORDINALITY0 AS element_pos])
++- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor1.array_val)], 
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor1.array_val))], 
select=[id,nested_array,nested_array0,ORDINALITY,array_val,ORDINALITY0], 
rowType=[RecordType(INTEGER id, INTEGER ARRAY ARRAY nested_array, INTEGER ARRAY 
nested_array0, INTEGER ORDINALITY, INTEGER array_val, INTEGER ORDINALITY0)], 
joinType=[INNER])
+   +- 
Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.nested_array)], 
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.nested_array))], 
select=[id,nested_array,nested_array0,ORDINALITY], rowType=[RecordType(INTEGER 
id, INTEGER ARRAY ARRAY nested_array, INTEGER ARRAY nested_array0, INTEGER 
ORDINALITY)], joinType=[INNER])
       +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[id, nested_array])
 ]]>
     </Resource>
@@ -645,8 +711,8 @@ LogicalProject(id=[$0], k=[$2], v=[$3], pos=[$4])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[id, f0 AS k, f1 AS v, ORDINALITY AS pos])
-+- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.map_data)], 
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.map_data))], 
select=[id,map_data,f0,f1,ORDINALITY], rowType=[RecordType(INTEGER id, 
(VARCHAR(2147483647), VARCHAR(2147483647)) MAP map_data, VARCHAR(2147483647) 
f0, VARCHAR(2147483647) f1, INTEGER ORDINALITY)], joinType=[INNER])
+Calc(select=[id, KEY AS k, VALUE AS v, ORDINALITY AS pos])
++- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.map_data)], 
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.map_data))], 
select=[id,map_data,KEY,VALUE,ORDINALITY], rowType=[RecordType(INTEGER id, 
(VARCHAR(2147483647), VARCHAR(2147483647)) MAP map_data, VARCHAR(2147483647) 
KEY, VARCHAR(2147483647) VALUE, INTEGER ORDINALITY)], joinType=[INNER])
    +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[id, map_data])
 ]]>
     </Resource>
@@ -674,8 +740,8 @@ LogicalProject(a=[$0], word=[$2], pos=[$3])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[a, EXPR$0 AS word, ORDINALITY AS pos])
-+- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.words)], 
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.words))], 
select=[a,words,EXPR$0,ORDINALITY], rowType=[RecordType(INTEGER a, 
VARCHAR(2147483647) MULTISET words, VARCHAR(2147483647) EXPR$0, INTEGER 
ORDINALITY)], joinType=[INNER])
+Calc(select=[a, words0 AS word, ORDINALITY AS pos])
++- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.words)], 
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.words))], 
select=[a,words,words0,ORDINALITY], rowType=[RecordType(INTEGER a, 
VARCHAR(2147483647) MULTISET words, VARCHAR(2147483647) words0, INTEGER 
ORDINALITY)], joinType=[INNER])
    +- SortAggregate(isMerge=[true], groupBy=[a], select=[a, 
Final_COLLECT(words) AS words])
       +- Exchange(distribution=[forward])
          +- Sort(orderBy=[a ASC])
@@ -711,8 +777,8 @@ LogicalProject(a=[$0], number=[$2], ordinality=[$3])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[a, EXPR$0 AS number, ORDINALITY AS ordinality])
-+- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b)], 
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b))], 
select=[a,b,EXPR$0,ORDINALITY], rowType=[RecordType(INTEGER a, INTEGER ARRAY b, 
INTEGER EXPR$0, INTEGER ORDINALITY)], joinType=[INNER], condition=[AND(>($0, 
10), <($1, 3))])
+Calc(select=[a, b0 AS number, ORDINALITY AS ordinality])
++- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b)], 
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b))], 
select=[a,b,b0,ORDINALITY], rowType=[RecordType(INTEGER a, INTEGER ARRAY b, 
INTEGER b0, INTEGER ORDINALITY)], joinType=[INNER], condition=[AND(>($0, 10), 
<($1, 3))])
    +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b])
 ]]>
     </Resource>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRuleTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRuleTest.xml
index 930a6d94bf5..0618adc8a79 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRuleTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRuleTest.xml
@@ -37,7 +37,7 @@ LogicalProject(a=[$0], s=[$3])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{2}])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
    +- LogicalProject(s=[$0])
-      +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.c)], 
rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+      +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.c)], 
rowType=[RecordType(VARCHAR(2147483647) c)])
 ]]>
     </Resource>
   </TestCase>
@@ -62,7 +62,7 @@ LogicalProject(a=[$0], b=[$1], v=[$4])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{2}])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
    +- LogicalProject(k=[$0], v=[$1])
-      +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.c)], 
rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0, VARCHAR(2147483647) 
f1)])
+      +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.c)], 
rowType=[RecordType(VARCHAR(2147483647) KEY, VARCHAR(2147483647) VALUE)])
 ]]>
     </Resource>
   </TestCase>
@@ -98,7 +98,7 @@ LogicalProject(a=[$0], b=[$1], x=[$2], y=[$3])
       :  +- LogicalFilter(condition=[<($0, 3)])
       :     +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
       +- LogicalProject(x=[$0], y=[$1])
-         +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.b)], 
rowType=[RecordType:peek_no_expand(INTEGER _1, VARCHAR(2147483647) _2)])
+         +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.b)], 
rowType=[RecordType(INTEGER _1, VARCHAR(2147483647) _2)])
 ]]>
     </Resource>
   </TestCase>
@@ -132,7 +132,7 @@ LogicalProject(a=[$0], s=[$2])
       :  +- LogicalProject(a=[$0], b=[$1])
       :     +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
       +- LogicalProject(s=[$0])
-         +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.set)], 
rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+         +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.set)], 
rowType=[RecordType(VARCHAR(2147483647) set)])
 ]]>
     </Resource>
   </TestCase>
@@ -157,7 +157,7 @@ LogicalProject(bd_name=[$3])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0}])
    :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
    +- LogicalProject(bd_name=[$0])
-      +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.business_data)], 
rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+      +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.business_data)], 
rowType=[RecordType(VARCHAR(2147483647) business_data)])
 ]]>
     </Resource>
   </TestCase>
@@ -181,8 +181,31 @@ LogicalProject(bd_name=[$3])
 LogicalProject(bd_name=[$3])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[left], 
requiredColumns=[{0}])
    :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
-   +- LogicalProject(bd_name=[CAST($0):VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"])
-      +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.business_data)], 
rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+   +- LogicalProject(bd_name=[$0])
+      +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.business_data)], 
rowType=[RecordType(VARCHAR(2147483647) business_data)])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testNullMismatchLeftJoinNoAliasList">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM nested_not_null LEFT JOIN 
UNNEST(nested_not_null.business_data) AS exploded_bd ON TRUE]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(business_data=[$0], nested=[$1], nested_array=[$2], 
exploded_bd=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], 
requiredColumns=[{0}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
+   +- Uncollect
+      +- LogicalProject(business_data=[$cor0.business_data])
+         +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(business_data=[$0], nested=[$1], nested_array=[$2], 
exploded_bd=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], 
requiredColumns=[{0}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
+   +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.business_data)], 
rowType=[RecordType(VARCHAR(2147483647) business_data)])
 ]]>
     </Resource>
   </TestCase>
@@ -206,8 +229,8 @@ LogicalProject(bd_name=[$3])
 LogicalProject(bd_name=[$3])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[left], 
requiredColumns=[{1}])
    :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
-   +- LogicalProject(bd_name=[CAST($0):VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"])
-      +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.nested.data)], 
rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+   +- LogicalProject(bd_name=[$0])
+      +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.nested.data)], 
rowType=[RecordType(VARCHAR(2147483647) data)])
 ]]>
     </Resource>
   </TestCase>
@@ -231,8 +254,60 @@ LogicalProject(bd_name=[$3])
 LogicalProject(bd_name=[$3])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[left], 
requiredColumns=[{2}])
    :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
-   +- LogicalProject(bd_name=[CAST($0):VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"])
-      +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1(ITEM($cor0.nested_array, 
0).data)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+   +- LogicalProject(bd_name=[$0])
+      +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1(ITEM($cor0.nested_array, 
0).data)], rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testNullMismatchLeftJoinOnPredicate">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM nested_not_null LEFT JOIN 
UNNEST(nested_not_null.business_data) AS exploded_bd ON exploded_bd <> 
'debug']]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(business_data=[$0], nested=[$1], nested_array=[$2], 
exploded_bd=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], 
requiredColumns=[{0}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
+   +- LogicalFilter(condition=[<>($0, _UTF-16LE'debug')])
+      +- Uncollect
+         +- LogicalProject(business_data=[$cor0.business_data])
+            +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(business_data=[$0], nested=[$1], nested_array=[$2], 
exploded_bd=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], 
requiredColumns=[{0}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
+   +- LogicalFilter(condition=[<>($0, _UTF-16LE'debug')])
+      +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.business_data)], 
rowType=[RecordType(VARCHAR(2147483647) business_data)])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testNullMismatchLeftJoinWithOrdinalityOnPredicate">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM nested_not_null LEFT JOIN 
UNNEST(nested_not_null.business_data) WITH ORDINALITY AS v(bd_name, ord) ON 
v.bd_name <> 'debug']]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(business_data=[$0], nested=[$1], nested_array=[$2], 
bd_name=[$3], ord=[$4])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], 
requiredColumns=[{0}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
+   +- LogicalFilter(condition=[<>($0, _UTF-16LE'debug')])
+      +- LogicalProject(bd_name=[$0], ord=[$1])
+         +- Uncollect(withOrdinality=[true])
+            +- LogicalProject(business_data=[$cor0.business_data])
+               +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(business_data=[$0], nested=[$1], nested_array=[$2], 
bd_name=[$3], ord=[$4])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], 
requiredColumns=[{0}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
+   +- LogicalFilter(condition=[<>($0, _UTF-16LE'debug')])
+      +- LogicalProject(bd_name=[$0], ord=[$1])
+         +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.business_data)],
 rowType=[RecordType(VARCHAR(2147483647) business_data, INTEGER ORDINALITY)])
 ]]>
     </Resource>
   </TestCase>
@@ -257,7 +332,7 @@ LogicalProject(bd_name=[$3])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0}])
    :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
    +- LogicalProject(bd_name=[$0])
-      +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.business_data)], 
rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+      +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.business_data)], 
rowType=[RecordType(VARCHAR(2147483647) business_data)])
 ]]>
     </Resource>
   </TestCase>
@@ -282,7 +357,7 @@ LogicalProject(bd_name=[$3])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{1}])
    :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
    +- LogicalProject(bd_name=[$0])
-      +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.nested.data)], 
rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+      +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.nested.data)], 
rowType=[RecordType(VARCHAR(2147483647) data)])
 ]]>
     </Resource>
   </TestCase>
@@ -321,7 +396,7 @@ LogicalProject(b=[$0], s=[$2])
       :     +- LogicalProject(b=[$1], $f1=[$TUMBLE($3, 3000:INTERVAL SECOND)])
       :        +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
       +- LogicalProject(s=[$0])
-         +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.set)], 
rowType=[RecordType:peek_no_expand(BIGINT f0)])
+         +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.set)], 
rowType=[RecordType(BIGINT set)])
 ]]>
     </Resource>
   </TestCase>
@@ -346,7 +421,7 @@ LogicalProject(a=[$0], s=[$3])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{2}])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
    +- LogicalProject(s=[$0])
-      +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.c)], 
rowType=[RecordType:peek_no_expand(INTEGER ARRAY f0)])
+      +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.c)], 
rowType=[RecordType(INTEGER ARRAY c)])
 ]]>
     </Resource>
   </TestCase>
@@ -380,7 +455,7 @@ LogicalProject(b=[$0], id=[$2], point=[$3])
       :  +- LogicalProject(b=[$1], c=[$2])
       :     +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
       +- LogicalProject(id=[$0], point=[$1])
-         +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.set)], 
rowType=[RecordType:peek_no_expand(INTEGER _1, VARCHAR(2147483647) _2)])
+         +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.set)], 
rowType=[RecordType(INTEGER _1, VARCHAR(2147483647) _2)])
 ]]>
     </Resource>
   </TestCase>
@@ -407,7 +482,7 @@ LogicalProject(a=[$0], b=[$1], s=[$2], t=[$3])
    +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{1}])
       :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
       +- LogicalProject(s=[$0], t=[$1])
-         +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.b)], 
rowType=[RecordType:peek_no_expand(INTEGER _1, VARCHAR(2147483647) _2)])
+         +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.b)], 
rowType=[RecordType(INTEGER _1, VARCHAR(2147483647) _2)])
 ]]>
     </Resource>
   </TestCase>
@@ -432,7 +507,7 @@ LogicalProject(a=[$0], b=[$1], _1=[$2], _2=[$3])
 +- LogicalFilter(condition=[>($2, 1)])
    +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{1}])
       :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
-      +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.b)], 
rowType=[RecordType:peek_no_expand(INTEGER _1, VARCHAR(2147483647) _2)])
+      +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.b)], 
rowType=[RecordType(INTEGER _1, VARCHAR(2147483647) _2)])
 ]]>
     </Resource>
   </TestCase>
@@ -457,7 +532,7 @@ LogicalProject(a=[$0], b=[$1], s=[$3])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{1}])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
    +- LogicalProject(s=[$0])
-      +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.b)], 
rowType=[RecordType:peek_no_expand(INTEGER f0)])
+      +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.b)], 
rowType=[RecordType(INTEGER b)])
 ]]>
     </Resource>
   </TestCase>
@@ -499,7 +574,7 @@ LogicalProject(a=[$0], b1=[$1], b2=[$2])
             :- LogicalProject(a=[$0], b=[$1])
             :  +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
             +- LogicalProject(b1=[$0], b2=[$1])
-               +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.b)], 
rowType=[RecordType:peek_no_expand(INTEGER _1, VARCHAR(2147483647) _2)])
+               +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.b)], 
rowType=[RecordType(INTEGER _1, VARCHAR(2147483647) _2)])
 ]]>
     </Resource>
   </TestCase>
@@ -547,7 +622,7 @@ LogicalProject(a=[$0], number=[$2], ordinality=[$3])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{1}])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
    +- LogicalProject(number=[$0], ordinality=[$1])
-      +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b)], 
rowType=[RecordType:peek_no_expand(INTEGER EXPR$0, INTEGER ORDINALITY)])
+      +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b)], 
rowType=[RecordType(INTEGER b, INTEGER ORDINALITY)])
 ]]>
     </Resource>
   </TestCase>
@@ -583,9 +658,9 @@ LogicalProject(id=[$0], array_val=[$2], array_pos=[$3], 
elem=[$4], element_pos=[
    :- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{1}])
    :  :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
    :  +- LogicalProject(array_val=[$0], array_pos=[$1])
-   :     +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.nested_array)],
 rowType=[RecordType:peek_no_expand(INTEGER ARRAY EXPR$0, INTEGER ORDINALITY)])
+   :     +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.nested_array)],
 rowType=[RecordType(INTEGER ARRAY nested_array, INTEGER ORDINALITY)])
    +- LogicalProject(elem=[$0], element_pos=[$1])
-      +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor1.array_val)],
 rowType=[RecordType:peek_no_expand(INTEGER EXPR$0, INTEGER ORDINALITY)])
+      +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor1.array_val)],
 rowType=[RecordType(INTEGER array_val, INTEGER ORDINALITY)])
 ]]>
     </Resource>
   </TestCase>
@@ -612,7 +687,7 @@ LogicalProject(a=[$0], b=[$1], s=[$2], t=[$3], o=[$4])
    +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{1}])
       :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
       +- LogicalProject(s=[$0], t=[$1], o=[$2])
-         +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b)], 
rowType=[RecordType:peek_no_expand(INTEGER _1, VARCHAR(2147483647) _2, INTEGER 
ORDINALITY)])
+         +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b)], 
rowType=[RecordType(INTEGER _1, VARCHAR(2147483647) _2, INTEGER ORDINALITY)])
 ]]>
     </Resource>
   </TestCase>
@@ -637,7 +712,7 @@ LogicalProject(a=[$0], b=[$1], _1=[$2], _2=[$3], 
ORDINALITY=[$4])
 +- LogicalFilter(condition=[>($2, 1)])
    +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{1}])
       :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
-      +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b)], 
rowType=[RecordType:peek_no_expand(INTEGER _1, VARCHAR(2147483647) _2, INTEGER 
ORDINALITY)])
+      +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b)], 
rowType=[RecordType(INTEGER _1, VARCHAR(2147483647) _2, INTEGER ORDINALITY)])
 ]]>
     </Resource>
   </TestCase>
@@ -673,7 +748,7 @@ LogicalProject(a=[$0], number=[$1], ordinality=[$2])
       +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{1}])
          :- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
          +- LogicalProject(number=[$0], ordinality=[$1])
-            +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b)], 
rowType=[RecordType:peek_no_expand(INTEGER EXPR$0, INTEGER ORDINALITY)])
+            +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b)], 
rowType=[RecordType(INTEGER b, INTEGER ORDINALITY)])
 ]]>
     </Resource>
   </TestCase>
@@ -698,7 +773,7 @@ LogicalProject(id=[$0], k=[$2], v=[$3], pos=[$4])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{1}])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
    +- LogicalProject(k=[$0], v=[$1], pos=[$2])
-      +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.map_data)],
 rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0, VARCHAR(2147483647) 
f1, INTEGER ORDINALITY)])
+      +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.map_data)],
 rowType=[RecordType(VARCHAR(2147483647) KEY, VARCHAR(2147483647) VALUE, 
INTEGER ORDINALITY)])
 ]]>
     </Resource>
   </TestCase>
@@ -731,7 +806,7 @@ LogicalProject(a=[$0], word=[$2], pos=[$3])
    :  +- LogicalProject(a=[$0], c=[$2])
    :     +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
    +- LogicalProject(word=[$0], pos=[$1])
-      +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.words)],
 rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) EXPR$0, INTEGER 
ORDINALITY)])
+      +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.words)],
 rowType=[RecordType(VARCHAR(2147483647) words, INTEGER ORDINALITY)])
 ]]>
     </Resource>
   </TestCase>
@@ -763,7 +838,7 @@ LogicalProject(a=[$0], number=[$2], ordinality=[$3])
    +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{1}])
       :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
       +- LogicalProject(number=[$0], ordinality=[$1])
-         +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b)], 
rowType=[RecordType:peek_no_expand(INTEGER EXPR$0, INTEGER ORDINALITY)])
+         +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b)], 
rowType=[RecordType(INTEGER b, INTEGER ORDINALITY)])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml
index 3a5e888ca25..1372c8ec0a7 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml
@@ -659,8 +659,8 @@ 
LogicalSink(table=[default_catalog.default_database.UnnestSink], targetColumns=[
     <Resource name="optimized rel plan">
       <![CDATA[
 Sink(table=[default_catalog.default_database.UnnestSink], 
targetColumns=[[0],[1],[2],[3],[4]], fields=[detail_id, element_data, 
data_value_id, user_id, order_id])
-+- Calc(select=[detail_id, TRIM(BOTH, ' ', REGEXP_REPLACE(f0, '[\[\]\"]', '')) 
AS element_data, ARRAY_POSITION(split(REGEXP_REPLACE(data, '^\["|"\]$', ''), 
'", "'), f0) AS data_value_id, user_id, order_id])
-   +- Correlate(invocation=[$UNNEST_ROWS$1(split(REGEXP_REPLACE($cor0.data, 
_UTF-16LE'^\["|"\]$', _UTF-16LE''), _UTF-16LE'", "'))], 
correlate=[table($UNNEST_ROWS$1(split(REGEXP_REPLACE($cor0.data, '^\["|"\]$', 
''), '", "')))], 
select=[detail_id,description,user_id,data,timestamp,order_id,user_id0,product,payment_id,price,user_id1,location,user_id2,f0],
 rowType=[RecordType(VARCHAR(2147483647) detail_id, VARCHAR(2147483647) 
description, VARCHAR(2147483647) user_id, VARCHAR(2147483647) data [...]
++- Calc(select=[detail_id, TRIM(BOTH, ' ', REGEXP_REPLACE(EXPR$0, '[\[\]\"]', 
'')) AS element_data, ARRAY_POSITION(split(REGEXP_REPLACE(data, '^\["|"\]$', 
''), '", "'), EXPR$0) AS data_value_id, user_id, order_id])
+   +- Correlate(invocation=[$UNNEST_ROWS$1(split(REGEXP_REPLACE($cor0.data, 
_UTF-16LE'^\["|"\]$', _UTF-16LE''), _UTF-16LE'", "'))], 
correlate=[table($UNNEST_ROWS$1(split(REGEXP_REPLACE($cor0.data, '^\["|"\]$', 
''), '", "')))], 
select=[detail_id,description,user_id,data,timestamp,order_id,user_id0,product,payment_id,price,user_id1,location,user_id2,EXPR$0],
 rowType=[RecordType(VARCHAR(2147483647) detail_id, VARCHAR(2147483647) 
description, VARCHAR(2147483647) user_id, VARCHAR(2147483647)  [...]
       +- Calc(select=[detail_id, description, user_id, data, timestamp, 
order_id, user_id0, product, payment_id, price, user_id1, location, user_id2], 
where=[IS NULL(location)])
          +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER, LEFT], 
inputUniqueKeys=[(detail_id), (order_id), (payment_id), noUniqueKey], 
joinConditions=[=(user_id0, user_id), =(user_id1, user_id), =(user_id2, 
user_id)], 
select=[detail_id,description,user_id,data,timestamp,order_id,user_id0,product,payment_id,price,user_id1,location,user_id2],
 rowType=[RecordType(VARCHAR(2147483647) detail_id, VARCHAR(2147483647) 
description, VARCHAR(2147483647) user_id, VARCHAR(2147483647) da [...]
             :- Exchange(distribution=[hash[user_id]])
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml
index b925b5ce5c7..138d56d8c12 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml
@@ -33,8 +33,8 @@ LogicalProject(a=[$0], s=[$3])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[a, f0 AS s])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.c)], 
correlate=[table($UNNEST_ROWS$1($cor0.c))], select=[a,b,c,f0], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) ARRAY c, 
VARCHAR(2147483647) f0)], joinType=[INNER])
+Calc(select=[a, c0 AS s])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.c)], 
correlate=[table($UNNEST_ROWS$1($cor0.c))], select=[a,b,c,c0], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) ARRAY c, 
VARCHAR(2147483647) c0)], joinType=[INNER])
    +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c])
 ]]>
     </Resource>
@@ -56,8 +56,8 @@ LogicalProject(a=[$0], b=[$1], v=[$4])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[a, b, f1 AS v])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.c)], 
correlate=[table($UNNEST_ROWS$1($cor0.c))], select=[a,b,c,f0,f1], 
rowType=[RecordType(INTEGER a, BIGINT b, (VARCHAR(2147483647), 
VARCHAR(2147483647)) MAP c, VARCHAR(2147483647) f0, VARCHAR(2147483647) f1)], 
joinType=[INNER])
+Calc(select=[a, b, VALUE AS v])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.c)], 
correlate=[table($UNNEST_ROWS$1($cor0.c))], select=[a,b,c,KEY,VALUE], 
rowType=[RecordType(INTEGER a, BIGINT b, (VARCHAR(2147483647), 
VARCHAR(2147483647)) MAP c, VARCHAR(2147483647) KEY, VARCHAR(2147483647) 
VALUE)], joinType=[INNER])
    +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c])
 ]]>
     </Resource>
@@ -117,8 +117,8 @@ LogicalProject(a=[$0], s=[$2])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[a, f0 AS s])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.set)], 
correlate=[table($UNNEST_ROWS$1($cor0.set))], select=[a,set,f0], 
rowType=[RecordType(INTEGER a, VARCHAR(2147483647) MULTISET set, 
VARCHAR(2147483647) f0)], joinType=[LEFT])
+Calc(select=[a, set0 AS s])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.set)], 
correlate=[table($UNNEST_ROWS$1($cor0.set))], select=[a,set,set0], 
rowType=[RecordType(INTEGER a, VARCHAR(2147483647) MULTISET set, 
VARCHAR(2147483647) set0)], joinType=[LEFT])
    +- GroupAggregate(groupBy=[a], select=[a, COLLECT(b) AS set])
       +- Exchange(distribution=[hash[a]])
          +- Calc(select=[a, b], where=[(a < 5)])
@@ -143,8 +143,8 @@ LogicalProject(bd_name=[$3])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Calc(select=[f0 AS bd_name])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)], 
correlate=[table($UNNEST_ROWS$1($cor0.business_data))], 
select=[business_data,nested,nested_array,f0], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) f0)], joinType=[INNER])
+Calc(select=[business_data0 AS bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)], 
correlate=[table($UNNEST_ROWS$1($cor0.business_data))], 
select=[business_data,nested,nested_array,business_data0], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) business_data0)], joinType=[INNER])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]], fields=[business_data, nested, nested_array])
 ]]>
     </Resource>
@@ -166,9 +166,30 @@ LogicalProject(bd_name=[$3])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Calc(select=[bd_name])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)], 
correlate=[table($UNNEST_ROWS$1($cor0.business_data))], 
select=[business_data,nested,nested_array,bd_name], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) bd_name)], joinType=[LEFT])
+Calc(select=[business_data0 AS bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)], 
correlate=[table($UNNEST_ROWS$1($cor0.business_data))], 
select=[business_data,nested,nested_array,business_data0], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) business_data0)], joinType=[LEFT])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]], fields=[business_data, nested, nested_array])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testNullMismatchLeftJoinNoAliasList">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM nested_not_null LEFT JOIN 
UNNEST(nested_not_null.business_data) AS exploded_bd ON TRUE]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(business_data=[$0], nested=[$1], nested_array=[$2], 
exploded_bd=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], 
requiredColumns=[{0}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
+   +- Uncollect
+      +- LogicalProject(business_data=[$cor0.business_data])
+         +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)], 
correlate=[table($UNNEST_ROWS$1($cor0.business_data))], 
select=[business_data,nested,nested_array,business_data0], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) business_data0)], joinType=[LEFT])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]], fields=[business_data, nested, nested_array])
 ]]>
     </Resource>
   </TestCase>
@@ -189,8 +210,8 @@ LogicalProject(bd_name=[$3])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Calc(select=[bd_name])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.nested.data)], 
correlate=[table($UNNEST_ROWS$1($cor0.nested.data))], 
select=[business_data,nested,nested_array,bd_name], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) bd_name)], joinType=[LEFT])
+Calc(select=[data AS bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.nested.data)], 
correlate=[table($UNNEST_ROWS$1($cor0.nested.data))], 
select=[business_data,nested,nested_array,data], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) data)], joinType=[LEFT])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]], fields=[business_data, nested, nested_array])
 ]]>
     </Resource>
@@ -212,9 +233,54 @@ LogicalProject(bd_name=[$3])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Calc(select=[bd_name])
-+- Correlate(invocation=[$UNNEST_ROWS$1(ITEM($cor0.nested_array, 0).data)], 
correlate=[table($UNNEST_ROWS$1(ITEM($cor0.nested_array, 0).data))], 
select=[business_data,nested,nested_array,bd_name], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) bd_name)], joinType=[LEFT])
+Calc(select=[EXPR$0 AS bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1(ITEM($cor0.nested_array, 0).data)], 
correlate=[table($UNNEST_ROWS$1(ITEM($cor0.nested_array, 0).data))], 
select=[business_data,nested,nested_array,EXPR$0], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) EXPR$0)], joinType=[LEFT])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]], fields=[business_data, nested, nested_array])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testNullMismatchLeftJoinOnPredicate">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM nested_not_null LEFT JOIN 
UNNEST(nested_not_null.business_data) AS exploded_bd ON exploded_bd <> 
'debug']]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(business_data=[$0], nested=[$1], nested_array=[$2], 
exploded_bd=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], 
requiredColumns=[{0}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
+   +- LogicalFilter(condition=[<>($0, _UTF-16LE'debug')])
+      +- Uncollect
+         +- LogicalProject(business_data=[$cor0.business_data])
+            +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)], 
correlate=[table($UNNEST_ROWS$1($cor0.business_data))], 
select=[business_data,nested,nested_array,business_data0], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) business_data0)], joinType=[LEFT], condition=[<>($0, 
_UTF-16LE'debug')])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]], fields=[business_data, nested, nested_array])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testNullMismatchLeftJoinWithOrdinalityOnPredicate">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM nested_not_null LEFT JOIN 
UNNEST(nested_not_null.business_data) WITH ORDINALITY AS v(bd_name, ord) ON 
v.bd_name <> 'debug']]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(business_data=[$0], nested=[$1], nested_array=[$2], 
bd_name=[$3], ord=[$4])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], 
requiredColumns=[{0}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
+   +- LogicalFilter(condition=[<>($0, _UTF-16LE'debug')])
+      +- LogicalProject(bd_name=[$0], ord=[$1])
+         +- Uncollect(withOrdinality=[true])
+            +- LogicalProject(business_data=[$cor0.business_data])
+               +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.business_data)], 
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.business_data))], 
select=[business_data,nested,nested_array,EXPR$0,ORDINALITY], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) EXPR$0, INTEGER ORDINALITY)], joinType=[LEFT], condition= 
[...]
++- LegacyTableSourceScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]], fields=[business_data, nested, nested_array])
 ]]>
     </Resource>
   </TestCase>
@@ -235,8 +301,8 @@ LogicalProject(bd_name=[$3])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Calc(select=[f0 AS bd_name])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)], 
correlate=[table($UNNEST_ROWS$1($cor0.business_data))], 
select=[business_data,nested,nested_array,f0], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) f0)], joinType=[INNER])
+Calc(select=[business_data0 AS bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)], 
correlate=[table($UNNEST_ROWS$1($cor0.business_data))], 
select=[business_data,nested,nested_array,business_data0], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) business_data0)], joinType=[INNER])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]], fields=[business_data, nested, nested_array])
 ]]>
     </Resource>
@@ -258,8 +324,8 @@ LogicalProject(bd_name=[$3])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Calc(select=[f0 AS bd_name])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.nested.data)], 
correlate=[table($UNNEST_ROWS$1($cor0.nested.data))], 
select=[business_data,nested,nested_array,f0], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) f0)], joinType=[INNER])
+Calc(select=[data AS bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.nested.data)], 
correlate=[table($UNNEST_ROWS$1($cor0.nested.data))], 
select=[business_data,nested,nested_array,data], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) data)], joinType=[INNER])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]], fields=[business_data, nested, nested_array])
 ]]>
     </Resource>
@@ -291,8 +357,8 @@ LogicalProject(b=[$0], s=[$2])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[b, f0 AS s])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.set)], 
correlate=[table($UNNEST_ROWS$1($cor0.set))], select=[b,set,f0], 
rowType=[RecordType(BIGINT b, BIGINT MULTISET set, BIGINT f0)], 
joinType=[INNER])
+Calc(select=[b, set0 AS s])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.set)], 
correlate=[table($UNNEST_ROWS$1($cor0.set))], select=[b,set,set0], 
rowType=[RecordType(BIGINT b, BIGINT MULTISET set, BIGINT set0)], 
joinType=[INNER])
    +- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, 
rowtime, 3000)], select=[b, COLLECT(b) AS set])
       +- Exchange(distribution=[hash[b]])
          +- Calc(select=[b, rowtime], where=[(b < 3)])
@@ -317,8 +383,8 @@ LogicalProject(a=[$0], s=[$3])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[a, f0 AS s])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.c)], 
correlate=[table($UNNEST_ROWS$1($cor0.c))], select=[a,b,c,f0], 
rowType=[RecordType(INTEGER a, INTEGER ARRAY b, INTEGER ARRAY ARRAY c, INTEGER 
ARRAY f0)], joinType=[INNER])
+Calc(select=[a, c0 AS s])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.c)], 
correlate=[table($UNNEST_ROWS$1($cor0.c))], select=[a,b,c,c0], 
rowType=[RecordType(INTEGER a, INTEGER ARRAY b, INTEGER ARRAY ARRAY c, INTEGER 
ARRAY c0)], joinType=[INNER])
    +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c])
 ]]>
     </Resource>
@@ -417,8 +483,8 @@ LogicalProject(a=[$0], b=[$1], s=[$3])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[a, b, f0 AS s])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.b)], 
correlate=[table($UNNEST_ROWS$1($cor0.b))], select=[a,b,c,f0], 
rowType=[RecordType(INTEGER a, INTEGER ARRAY b, INTEGER ARRAY ARRAY c, INTEGER 
f0)], joinType=[INNER])
+Calc(select=[a, b, b0 AS s])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.b)], 
correlate=[table($UNNEST_ROWS$1($cor0.b))], select=[a,b,c,b0], 
rowType=[RecordType(INTEGER a, INTEGER ARRAY b, INTEGER ARRAY ARRAY c, INTEGER 
b0)], joinType=[INNER])
    +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c])
 ]]>
     </Resource>
@@ -445,7 +511,7 @@ LogicalProject(a=[$0])
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a])
-+- Correlate(invocation=[$UNNEST_ROWS$1($cor0.b)], 
correlate=[table($UNNEST_ROWS$1($cor0.b))], select=[a,b,f0], 
rowType=[RecordType(INTEGER a, INTEGER ARRAY b, INTEGER f0)], joinType=[INNER])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.b)], 
correlate=[table($UNNEST_ROWS$1($cor0.b))], select=[a,b,b0], 
rowType=[RecordType(INTEGER a, INTEGER ARRAY b, INTEGER b0)], joinType=[INNER])
    +- TableSourceScan(table=[[default_catalog, default_database, T2]], 
fields=[a, b])
 ]]>
     </Resource>
@@ -523,8 +589,8 @@ LogicalProject(a=[$0], number=[$2], ordinality=[$3])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[a, EXPR$0 AS number, ORDINALITY AS ordinality])
-+- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b)], 
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b))], 
select=[a,b,EXPR$0,ORDINALITY], rowType=[RecordType(INTEGER a, INTEGER ARRAY b, 
INTEGER EXPR$0, INTEGER ORDINALITY)], joinType=[INNER])
+Calc(select=[a, b0 AS number, ORDINALITY AS ordinality])
++- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b)], 
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b))], 
select=[a,b,b0,ORDINALITY], rowType=[RecordType(INTEGER a, INTEGER ARRAY b, 
INTEGER b0, INTEGER ORDINALITY)], joinType=[INNER])
    +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b])
 ]]>
     </Resource>
@@ -556,9 +622,9 @@ LogicalProject(id=[$0], array_val=[$2], array_pos=[$3], 
elem=[$4], element_pos=[
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[id, EXPR$0 AS array_val, ORDINALITY AS array_pos, EXPR$00 AS 
elem, ORDINALITY0 AS element_pos])
-+- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor1.array_val)], 
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor1.array_val))], 
select=[id,nested_array,EXPR$0,ORDINALITY,EXPR$00,ORDINALITY0], 
rowType=[RecordType(INTEGER id, INTEGER ARRAY ARRAY nested_array, INTEGER ARRAY 
EXPR$0, INTEGER ORDINALITY, INTEGER EXPR$00, INTEGER ORDINALITY0)], 
joinType=[INNER])
-   +- 
Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.nested_array)], 
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.nested_array))], 
select=[id,nested_array,EXPR$0,ORDINALITY], rowType=[RecordType(INTEGER id, 
INTEGER ARRAY ARRAY nested_array, INTEGER ARRAY EXPR$0, INTEGER ORDINALITY)], 
joinType=[INNER])
+Calc(select=[id, nested_array0 AS array_val, ORDINALITY AS array_pos, 
array_val AS elem, ORDINALITY0 AS element_pos])
++- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor1.array_val)], 
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor1.array_val))], 
select=[id,nested_array,nested_array0,ORDINALITY,array_val,ORDINALITY0], 
rowType=[RecordType(INTEGER id, INTEGER ARRAY ARRAY nested_array, INTEGER ARRAY 
nested_array0, INTEGER ORDINALITY, INTEGER array_val, INTEGER ORDINALITY0)], 
joinType=[INNER])
+   +- 
Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.nested_array)], 
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.nested_array))], 
select=[id,nested_array,nested_array0,ORDINALITY], rowType=[RecordType(INTEGER 
id, INTEGER ARRAY ARRAY nested_array, INTEGER ARRAY nested_array0, INTEGER 
ORDINALITY)], joinType=[INNER])
       +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[id, nested_array])
 ]]>
     </Resource>
@@ -657,8 +723,8 @@ LogicalProject(id=[$0], k=[$2], v=[$3], pos=[$4])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[id, f0 AS k, f1 AS v, ORDINALITY AS pos])
-+- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.map_data)], 
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.map_data))], 
select=[id,map_data,f0,f1,ORDINALITY], rowType=[RecordType(INTEGER id, 
(VARCHAR(2147483647), VARCHAR(2147483647)) MAP map_data, VARCHAR(2147483647) 
f0, VARCHAR(2147483647) f1, INTEGER ORDINALITY)], joinType=[INNER])
+Calc(select=[id, KEY AS k, VALUE AS v, ORDINALITY AS pos])
++- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.map_data)], 
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.map_data))], 
select=[id,map_data,KEY,VALUE,ORDINALITY], rowType=[RecordType(INTEGER id, 
(VARCHAR(2147483647), VARCHAR(2147483647)) MAP map_data, VARCHAR(2147483647) 
KEY, VARCHAR(2147483647) VALUE, INTEGER ORDINALITY)], joinType=[INNER])
    +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[id, map_data])
 ]]>
     </Resource>
@@ -686,8 +752,8 @@ LogicalProject(a=[$0], word=[$2], pos=[$3])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[a, EXPR$0 AS word, ORDINALITY AS pos])
-+- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.words)], 
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.words))], 
select=[a,words,EXPR$0,ORDINALITY], rowType=[RecordType(INTEGER a, 
VARCHAR(2147483647) MULTISET words, VARCHAR(2147483647) EXPR$0, INTEGER 
ORDINALITY)], joinType=[INNER])
+Calc(select=[a, words0 AS word, ORDINALITY AS pos])
++- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.words)], 
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.words))], 
select=[a,words,words0,ORDINALITY], rowType=[RecordType(INTEGER a, 
VARCHAR(2147483647) MULTISET words, VARCHAR(2147483647) words0, INTEGER 
ORDINALITY)], joinType=[INNER])
    +- GroupAggregate(groupBy=[a], select=[a, COLLECT(c) AS words])
       +- Exchange(distribution=[hash[a]])
          +- Calc(select=[a, c])
@@ -718,8 +784,8 @@ LogicalProject(a=[$0], number=[$2], ordinality=[$3])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[a, EXPR$0 AS number, ORDINALITY AS ordinality])
-+- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b)], 
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b))], 
select=[a,b,EXPR$0,ORDINALITY], rowType=[RecordType(INTEGER a, INTEGER ARRAY b, 
INTEGER EXPR$0, INTEGER ORDINALITY)], joinType=[INNER], condition=[AND(>($0, 
10), <($1, 3))])
+Calc(select=[a, b0 AS number, ORDINALITY AS ordinality])
++- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b)], 
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b))], 
select=[a,b,b0,ORDINALITY], rowType=[RecordType(INTEGER a, INTEGER ARRAY b, 
INTEGER b0, INTEGER ORDINALITY)], joinType=[INNER], condition=[AND(>($0, 10), 
<($1, 3))])
    +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b])
 ]]>
     </Resource>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-cross-join-unnest-map/plan/correlate-cross-join-unnest-map.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-cross-join-unnest-map/plan/correlate-cross-join-unnest-map.json
new file mode 100644
index 00000000000..313b3d3bf07
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-cross-join-unnest-map/plan/correlate-cross-join-unnest-map.json
@@ -0,0 +1,137 @@
+{
+  "flinkVersion" : "2.4",
+  "nodes" : [ {
+    "id" : 5,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "id",
+              "dataType" : "INT"
+            }, {
+              "name" : "m",
+              "dataType" : "MAP<VARCHAR(2147483647), INT>"
+            } ]
+          }
+        }
+      }
+    },
+    "outputType" : "ROW<`id` INT, `m` MAP<VARCHAR(2147483647), INT>>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[id, m])",
+    "dynamicFilteringDataListenerID" : "6757d5d5-a915-4774-88a6-40cb2d2fb0ba"
+  }, {
+    "id" : 6,
+    "type" : "batch-exec-correlate_1",
+    "joinType" : "INNER",
+    "functionCall" : {
+      "kind" : "CALL",
+      "internalName" : "$UNNEST_ROWS$1",
+      "operands" : [ {
+        "kind" : "FIELD_ACCESS",
+        "name" : "m",
+        "expr" : {
+          "kind" : "CORREL_VARIABLE",
+          "correl" : "$cor0",
+          "type" : "ROW<`id` INT, `m` MAP<VARCHAR(2147483647), INT>, `k` 
VARCHAR(2147483647), `v` INT> NOT NULL"
+        }
+      } ],
+      "type" : "ROW<`f0` VARCHAR(2147483647), `f1` INT> NOT NULL"
+    },
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`id` INT, `m` MAP<VARCHAR(2147483647), INT>, `f0` 
VARCHAR(2147483647), `f1` INT>",
+    "description" : "Correlate(invocation=[$UNNEST_ROWS$1($cor0.m)], 
correlate=[table($UNNEST_ROWS$1($cor0.m))], select=[id,m,f0,f1], 
rowType=[RecordType(INTEGER id, (VARCHAR(2147483647), INTEGER) MAP m, 
VARCHAR(2147483647) f0, INTEGER f1)], joinType=[INNER])"
+  }, {
+    "id" : 7,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "INT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`id` INT, `k` VARCHAR(2147483647), `v` INT>",
+    "description" : "Calc(select=[id, f0 AS k, f1 AS v])"
+  }, {
+    "id" : 8,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "id",
+              "dataType" : "INT"
+            }, {
+              "name" : "k",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "v",
+              "dataType" : "INT"
+            } ]
+          }
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`id` INT, `k` VARCHAR(2147483647), `v` INT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[id, k, v])"
+  } ],
+  "edges" : [ {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 6,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 7,
+    "target" : 8,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-cross-join-unnest-primitive-array/plan/correlate-cross-join-unnest-primitive-array.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-cross-join-unnest-primitive-array/plan/correlate-cross-join-unnest-primitive-array.json
new file mode 100644
index 00000000000..a1f1fa6ca46
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-cross-join-unnest-primitive-array/plan/correlate-cross-join-unnest-primitive-array.json
@@ -0,0 +1,130 @@
+{
+  "flinkVersion" : "2.4",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "id",
+              "dataType" : "INT"
+            }, {
+              "name" : "vals",
+              "dataType" : "ARRAY<INT NOT NULL>"
+            } ]
+          }
+        }
+      }
+    },
+    "outputType" : "ROW<`id` INT, `vals` ARRAY<INT NOT NULL>>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[id, vals])",
+    "dynamicFilteringDataListenerID" : "a0d0a3b5-53f1-46b7-ae4d-3b4fe7d9145d"
+  }, {
+    "id" : 2,
+    "type" : "batch-exec-correlate_1",
+    "joinType" : "INNER",
+    "functionCall" : {
+      "kind" : "CALL",
+      "internalName" : "$UNNEST_ROWS$1",
+      "operands" : [ {
+        "kind" : "FIELD_ACCESS",
+        "name" : "vals",
+        "expr" : {
+          "kind" : "CORREL_VARIABLE",
+          "correl" : "$cor0",
+          "type" : "ROW<`id` INT, `vals` ARRAY<INT NOT NULL>, `val` INT NOT 
NULL> NOT NULL"
+        }
+      } ],
+      "type" : "ROW<`f0` INT NOT NULL>"
+    },
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`id` INT, `vals` ARRAY<INT NOT NULL>, `f0` INT NOT 
NULL>",
+    "description" : "Correlate(invocation=[$UNNEST_ROWS$1($cor0.vals)], 
correlate=[table($UNNEST_ROWS$1($cor0.vals))], select=[id,vals,f0], 
rowType=[RecordType(INTEGER id, INTEGER ARRAY vals, INTEGER f0)], 
joinType=[INNER])"
+  }, {
+    "id" : 3,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "INT NOT NULL"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`id` INT, `val` INT NOT NULL>",
+    "description" : "Calc(select=[id, f0 AS val])"
+  }, {
+    "id" : 4,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "id",
+              "dataType" : "INT"
+            }, {
+              "name" : "val",
+              "dataType" : "INT"
+            } ]
+          }
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`id` INT, `val` INT NOT NULL>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[id, val])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-cross-join-unnest-map/plan/correlate-cross-join-unnest-map.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-cross-join-unnest-map/plan/correlate-cross-join-unnest-map.json
new file mode 100644
index 00000000000..a80c3aa0bb6
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-cross-join-unnest-map/plan/correlate-cross-join-unnest-map.json
@@ -0,0 +1,141 @@
+{
+  "flinkVersion" : "2.4",
+  "nodes" : [ {
+    "id" : 26,
+    "type" : "stream-exec-table-source-scan_2",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "id",
+              "dataType" : "INT"
+            }, {
+              "name" : "m",
+              "dataType" : "MAP<VARCHAR(2147483647), INT>"
+            } ]
+          }
+        }
+      }
+    },
+    "outputType" : "ROW<`id` INT, `m` MAP<VARCHAR(2147483647), INT>>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[id, m])"
+  }, {
+    "id" : 27,
+    "type" : "stream-exec-correlate_1",
+    "joinType" : "INNER",
+    "functionCall" : {
+      "kind" : "CALL",
+      "internalName" : "$UNNEST_ROWS$1",
+      "operands" : [ {
+        "kind" : "FIELD_ACCESS",
+        "name" : "m",
+        "expr" : {
+          "kind" : "CORREL_VARIABLE",
+          "correl" : "$cor0",
+          "type" : "ROW<`id` INT, `m` MAP<VARCHAR(2147483647), INT>, `k` VARCHAR(2147483647), `v` INT> NOT NULL"
+        }
+      } ],
+      "type" : "ROW<`f0` VARCHAR(2147483647), `f1` INT> NOT NULL"
+    },
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`id` INT, `m` MAP<VARCHAR(2147483647), INT>, `f0` VARCHAR(2147483647), `f1` INT>",
+    "description" : "Correlate(invocation=[$UNNEST_ROWS$1($cor0.m)], correlate=[table($UNNEST_ROWS$1($cor0.m))], select=[id,m,f0,f1], rowType=[RecordType(INTEGER id, (VARCHAR(2147483647), INTEGER) MAP m, VARCHAR(2147483647) f0, INTEGER f1)], joinType=[INNER])"
+  }, {
+    "id" : 28,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "INT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`id` INT, `k` VARCHAR(2147483647), `v` INT>",
+    "description" : "Calc(select=[id, f0 AS k, f1 AS v])"
+  }, {
+    "id" : 29,
+    "type" : "stream-exec-sink_2",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "id",
+              "dataType" : "INT"
+            }, {
+              "name" : "k",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "v",
+              "dataType" : "INT"
+            } ]
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "upsertMaterializeStrategy" : "VALUE",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`id` INT, `k` VARCHAR(2147483647), `v` INT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[id, k, v])"
+  } ],
+  "edges" : [ {
+    "source" : 26,
+    "target" : 27,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 27,
+    "target" : 28,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 28,
+    "target" : 29,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-cross-join-unnest-map/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-cross-join-unnest-map/savepoint/_metadata
new file mode 100644
index 00000000000..d64328abf13
Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-cross-join-unnest-map/savepoint/_metadata differ
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-cross-join-unnest-primitive-array/plan/correlate-cross-join-unnest-primitive-array.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-cross-join-unnest-primitive-array/plan/correlate-cross-join-unnest-primitive-array.json
new file mode 100644
index 00000000000..dc72585856a
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-cross-join-unnest-primitive-array/plan/correlate-cross-join-unnest-primitive-array.json
@@ -0,0 +1,133 @@
+{
+  "flinkVersion" : "2.4",
+  "nodes" : [ {
+    "id" : 22,
+    "type" : "stream-exec-table-source-scan_2",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "id",
+              "dataType" : "INT"
+            }, {
+              "name" : "vals",
+              "dataType" : "ARRAY<INT NOT NULL>"
+            } ]
+          }
+        }
+      }
+    },
+    "outputType" : "ROW<`id` INT, `vals` ARRAY<INT NOT NULL>>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[id, vals])"
+  }, {
+    "id" : 23,
+    "type" : "stream-exec-correlate_1",
+    "joinType" : "INNER",
+    "functionCall" : {
+      "kind" : "CALL",
+      "internalName" : "$UNNEST_ROWS$1",
+      "operands" : [ {
+        "kind" : "FIELD_ACCESS",
+        "name" : "vals",
+        "expr" : {
+          "kind" : "CORREL_VARIABLE",
+          "correl" : "$cor0",
+          "type" : "ROW<`id` INT, `vals` ARRAY<INT NOT NULL>, `val` INT NOT NULL> NOT NULL"
+        }
+      } ],
+      "type" : "ROW<`f0` INT NOT NULL>"
+    },
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`id` INT, `vals` ARRAY<INT NOT NULL>, `f0` INT NOT NULL>",
+    "description" : "Correlate(invocation=[$UNNEST_ROWS$1($cor0.vals)], correlate=[table($UNNEST_ROWS$1($cor0.vals))], select=[id,vals,f0], rowType=[RecordType(INTEGER id, INTEGER ARRAY vals, INTEGER f0)], joinType=[INNER])"
+  }, {
+    "id" : 24,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "INT NOT NULL"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`id` INT, `val` INT NOT NULL>",
+    "description" : "Calc(select=[id, f0 AS val])"
+  }, {
+    "id" : 25,
+    "type" : "stream-exec-sink_2",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "id",
+              "dataType" : "INT"
+            }, {
+              "name" : "val",
+              "dataType" : "INT"
+            } ]
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`id` INT, `val` INT NOT NULL>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[id, val])"
+  } ],
+  "edges" : [ {
+    "source" : 22,
+    "target" : 23,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 23,
+    "target" : 24,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 24,
+    "target" : 25,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-cross-join-unnest-primitive-array/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-cross-join-unnest-primitive-array/savepoint/_metadata
new file mode 100644
index 00000000000..af368bb1100
Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-correlate_1/correlate-cross-join-unnest-primitive-array/savepoint/_metadata differ
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/UnnestTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/UnnestTestBase.scala
index d1ee04db109..2634b0dc301 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/UnnestTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/UnnestTestBase.scala
@@ -282,6 +282,26 @@ abstract class UnnestTestBase(withExecPlan: Boolean) extends TableTestBase {
       "SELECT bd_name FROM nested_not_null LEFT JOIN UNNEST(nested_not_null.nested_array[0].data) AS exploded_bd(bd_name) ON TRUE")
   }
 
+  @Test
+  def testNullMismatchLeftJoinNoAliasList(): Unit = {
+    util.verifyRelPlan(
+      "SELECT * FROM nested_not_null LEFT JOIN UNNEST(nested_not_null.business_data) AS exploded_bd ON TRUE")
+  }
+
+  @Test
+  def testNullMismatchLeftJoinOnPredicate(): Unit = {
+    util.verifyRelPlan(
+      "SELECT * FROM nested_not_null LEFT JOIN UNNEST(nested_not_null.business_data) AS exploded_bd ON exploded_bd <> 'debug'")
+  }
+
+  @Test
+  def testNullMismatchLeftJoinWithOrdinalityOnPredicate(): Unit = {
+    util.verifyRelPlan(
+      "SELECT * FROM nested_not_null LEFT JOIN " +
+        "UNNEST(nested_not_null.business_data) WITH ORDINALITY AS v(bd_name, ord) " +
+        "ON v.bd_name <> 'debug'")
+  }
+
   def verifyPlan(sql: String): Unit = {
     if (withExecPlan) {
       util.verifyExecPlan(sql)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnnestITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnnestITCase.scala
index 669c9962487..09587cc5f61 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnnestITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnnestITCase.scala
@@ -587,4 +587,42 @@ class UnnestITCase extends BatchTestBase {
       Seq(row(1, 12, "45.6", 1), row(2, 13, "41.6", 1))
     )
   }
+
+  @Test
+  def testLeftJoinUnnestNoAliasList(): Unit = {
+    val data = List(
+      row(1, Array(10, 20)),
+      row(2, Array.empty[Int]),
+      row(3, Array(30))
+    )
+    registerCollection(
+      "T",
+      data,
+      new RowTypeInfo(Types.INT, Types.PRIMITIVE_ARRAY(Types.INT)),
+      "id, vals")
+
+    checkResult(
+      "SELECT id, exploded_val FROM T LEFT JOIN UNNEST(T.vals) AS exploded_val ON TRUE",
+      Seq(row(1, 10), row(1, 20), row(2, null), row(3, 30))
+    )
+  }
+
+  @Test
+  def testLeftJoinUnnestOnPredicate(): Unit = {
+    val data = List(
+      row(1, Array(1, 50, 100)),
+      row(2, Array(1, 2)),
+      row(3, Array.empty[Int])
+    )
+    registerCollection(
+      "T",
+      data,
+      new RowTypeInfo(Types.INT, Types.PRIMITIVE_ARRAY(Types.INT)),
+      "id, vals")
+
+    checkResult(
+      "SELECT id, exploded_val FROM T LEFT JOIN UNNEST(T.vals) AS exploded_val ON exploded_val > 10",
+      Seq(row(1, 50), row(1, 100), row(2, null), row(3, null))
+    )
+  }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/UnnestITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/UnnestITCase.scala
index ec8e29e24c0..17f0093ec39 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/UnnestITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/UnnestITCase.scala
@@ -782,4 +782,41 @@ class UnnestITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mo
     val expectedFieldNames = List("x", "y", "ord")
     assertThat(fieldNames).isEqualTo(expectedFieldNames)
   }
+
+  @TestTemplate
+  def testLeftJoinUnnestNoAliasList(): Unit = {
+    val data = List(
+      (1, Array(10, 20)),
+      (2, Array.empty[Int]),
+      (3, Array(30))
+    )
+    assertUnnest(
+      testData = data,
+      typeInfo = createTypeInformation[(Int, Array[Int])],
+      sqlQuery = "SELECT id, exploded_val FROM T LEFT JOIN UNNEST(T.vals) AS exploded_val ON TRUE",
+      expectedResults = List("1,10", "1,20", "2,null", "3,30"),
+      isRetract = false,
+      fieldNames = 'id,
+      'vals
+    )
+  }
+
+  @TestTemplate
+  def testLeftJoinUnnestOnPredicate(): Unit = {
+    val data = List(
+      (1, Array(1, 50, 100)),
+      (2, Array(1, 2)),
+      (3, Array.empty[Int])
+    )
+    assertUnnest(
+      testData = data,
+      typeInfo = createTypeInformation[(Int, Array[Int])],
+      sqlQuery =
+        "SELECT id, exploded_val FROM T LEFT JOIN UNNEST(T.vals) AS exploded_val ON exploded_val > 10",
+      expectedResults = List("1,50", "1,100", "2,null", "3,null"),
+      isRetract = false,
+      fieldNames = 'id,
+      'vals
+    )
+  }
 }

Reply via email to