This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 019a9f02d0a [FLINK-33217][table] `UNNEST` fails on `LEFT JOIN` 
with `NOT NULL` type in array
019a9f02d0a is described below

commit 019a9f02d0a19bb1e7d80845e48d104ac60d2eb3
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Mon Nov 10 18:17:37 2025 +0100

    [FLINK-33217][table] `UNNEST` fails on `LEFT JOIN` with `NOT NULL` 
type in array
---
 .../plan/rules/logical/LogicalUnnestRule.java      |  37 ++++-
 .../batch/BatchPhysicalCorrelateRule.scala         |   4 +-
 .../table/planner/plan/batch/sql/UnnestTest.xml    | 138 +++++++++++++++++++
 .../plan/rules/logical/LogicalUnnestRuleTest.xml   | 150 +++++++++++++++++++++
 .../table/planner/plan/stream/sql/UnnestTest.xml   | 138 +++++++++++++++++++
 .../table/planner/plan/common/UnnestTestBase.scala |  52 ++++++-
 6 files changed, 516 insertions(+), 3 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRule.java
index b9eb4f24f11..a0805e4b110 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRule.java
@@ -33,17 +33,20 @@ import org.apache.calcite.plan.RelRule;
 import org.apache.calcite.plan.hep.HepRelVertex;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Correlate;
+import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.core.Uncollect;
 import org.apache.calcite.rel.logical.LogicalCorrelate;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.logical.LogicalTableFunctionScan;
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
 import org.immutables.value.Value;
 
 import java.util.Collections;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import static 
org.apache.flink.table.types.logical.utils.LogicalTypeUtils.toRowType;
 
@@ -103,7 +106,10 @@ public class LogicalUnnestRule extends 
RelRule<LogicalUnnestRule.LogicalUnnestRu
             relNode = convert(getRel(hepRelVertex), correlate);
         }
         if (relNode instanceof LogicalProject) {
-            LogicalProject logicalProject = (LogicalProject) relNode;
+            final LogicalProject logicalProject =
+                    correlate.getJoinType() == JoinRelType.LEFT
+                            ? 
getLogicalProjectWithAdjustedNullability((LogicalProject) relNode)
+                            : (LogicalProject) relNode;
             return logicalProject.copy(
                     logicalProject.getTraitSet(),
                     
ImmutableList.of(convert(getRel(logicalProject.getInput()), correlate)));
@@ -161,6 +167,35 @@ public class LogicalUnnestRule extends 
RelRule<LogicalUnnestRule.LogicalUnnestRu
         return rel;
     }
 
+    /**
+     * If the unnested type is {@code NOT NULL} but the {@code LEFT JOIN} makes it
+     * nullable, this method adjusts the nullability by inserting an extra {@code CAST}.
+     */
+    private LogicalProject 
getLogicalProjectWithAdjustedNullability(LogicalProject logicalProject) {
+        final RelOptCluster cluster = logicalProject.getCluster();
+        FlinkTypeFactory typeFactory = (FlinkTypeFactory) 
cluster.getTypeFactory();
+        RexBuilder rexBuilder = cluster.getRexBuilder();
+        final RelDataType rowType = logicalProject.getRowType();
+        return logicalProject.copy(
+                logicalProject.getTraitSet(),
+                logicalProject.getInput(),
+                logicalProject.getProjects().stream()
+                        .map(
+                                t -> {
+                                    if (t.getType().isNullable()) {
+                                        return t;
+                                    }
+                                    return rexBuilder.makeCast(
+                                            createNullableType(typeFactory, 
t.getType()), t);
+                                })
+                        .collect(Collectors.toList()),
+                rowType.isNullable() ? rowType : 
createNullableType(typeFactory, rowType));
+    }
+
+    private static RelDataType createNullableType(FlinkTypeFactory 
typeFactory, RelDataType type) {
+        return typeFactory.createTypeWithNullability(type, true);
+    }
+
     /** Rule configuration. */
     @Value.Immutable(singleton = false)
     public interface LogicalUnnestRuleConfig extends RelRule.Config {
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCorrelateRule.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCorrelateRule.scala
index 9f714f2057c..471f40cce18 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCorrelateRule.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCorrelateRule.scala
@@ -64,7 +64,9 @@ class BatchPhysicalCorrelateRule(config: Config) extends 
ConverterRule(config) {
         case calc: FlinkLogicalCalc =>
           convertToCorrelate(
             calc.getInput.asInstanceOf[RelSubset].getOriginal,
-            Some(calc.getProgram.expandLocalRef(calc.getProgram.getCondition)))
+            if (calc.getProgram.getCondition == null) None
+            else 
Some(calc.getProgram.expandLocalRef(calc.getProgram.getCondition))
+          )
 
         case scan: FlinkLogicalTableFunctionScan =>
           new BatchPhysicalCorrelate(
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml
index 2b8cfaaffc1..c0c753fbe87 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml
@@ -128,6 +128,144 @@ Calc(select=[a, f0 AS s])
                      +- Sort(orderBy=[a ASC])
                         +- Calc(select=[a, b], where=[(a < 5)])
                            +- BoundedStreamScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testNullMismatchCrossJoin">
+    <Resource name="sql">
+      <![CDATA[SELECT bd_name FROM nested_not_null CROSS JOIN 
UNNEST(nested_not_null.business_data) AS exploded_bd(bd_name)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
+   +- LogicalProject(bd_name=[$0])
+      +- Uncollect
+         +- LogicalProject(business_data=[$cor0.business_data])
+            +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[f0 AS bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)], 
correlate=[table($UNNEST_ROWS$1($cor0.business_data))], 
select=[business_data,nested,nested_array,f0], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) f0)], joinType=[INNER])
+   +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]], fields=[business_data, nested, nested_array])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testNullMismatchLeftJoin">
+    <Resource name="sql">
+      <![CDATA[SELECT bd_name FROM nested_not_null LEFT JOIN 
UNNEST(nested_not_null.business_data) AS exploded_bd(bd_name) ON TRUE]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], 
requiredColumns=[{0}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
+   +- LogicalProject(bd_name=[$0])
+      +- Uncollect
+         +- LogicalProject(business_data=[$cor0.business_data])
+            +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)], 
correlate=[table($UNNEST_ROWS$1($cor0.business_data))], 
select=[business_data,nested,nested_array,bd_name], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) bd_name)], joinType=[LEFT])
+   +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]], fields=[business_data, nested, nested_array])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testNullMismatchLeftJoinOnNested">
+    <Resource name="sql">
+      <![CDATA[SELECT bd_name FROM nested_not_null LEFT JOIN 
UNNEST(nested_not_null.nested.data) AS exploded_bd(bd_name) ON TRUE]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], 
requiredColumns=[{1}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
+   +- LogicalProject(bd_name=[$0])
+      +- Uncollect
+         +- LogicalProject(data=[$cor0.nested.data])
+            +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.nested.data)], 
correlate=[table($UNNEST_ROWS$1($cor0.nested.data))], 
select=[business_data,nested,nested_array,bd_name], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) bd_name)], joinType=[LEFT])
+   +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]], fields=[business_data, nested, nested_array])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testNullMismatchLeftJoinOnNestedArray">
+    <Resource name="sql">
+      <![CDATA[SELECT bd_name FROM nested_not_null LEFT JOIN 
UNNEST(nested_not_null.nested_array[0].data) AS exploded_bd(bd_name) ON TRUE]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], 
requiredColumns=[{2}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
+   +- LogicalProject(bd_name=[$0])
+      +- Uncollect
+         +- LogicalProject(EXPR$0=[ITEM($cor0.nested_array, 0).data])
+            +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1(ITEM($cor0.nested_array, 0).data)], 
correlate=[table($UNNEST_ROWS$1(ITEM($cor0.nested_array, 0).data))], 
select=[business_data,nested,nested_array,bd_name], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) bd_name)], joinType=[LEFT])
+   +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]], fields=[business_data, nested, nested_array])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testNullMismatchNaturalJoin">
+    <Resource name="sql">
+      <![CDATA[SELECT bd_name FROM nested_not_null NATURAL JOIN 
UNNEST(nested_not_null.business_data) AS exploded_bd(bd_name)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
+   +- LogicalProject(bd_name=[$0])
+      +- Uncollect
+         +- LogicalProject(business_data=[$cor0.business_data])
+            +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[f0 AS bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)], 
correlate=[table($UNNEST_ROWS$1($cor0.business_data))], 
select=[business_data,nested,nested_array,f0], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) f0)], joinType=[INNER])
+   +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]], fields=[business_data, nested, nested_array])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testNullMismatchNaturalJoinOnNested">
+    <Resource name="sql">
+      <![CDATA[SELECT bd_name FROM nested_not_null NATURAL JOIN 
UNNEST(nested_not_null.nested.data) AS exploded_bd(bd_name)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{1}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
+   +- LogicalProject(bd_name=[$0])
+      +- Uncollect
+         +- LogicalProject(data=[$cor0.nested.data])
+            +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[f0 AS bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.nested.data)], 
correlate=[table($UNNEST_ROWS$1($cor0.nested.data))], 
select=[business_data,nested,nested_array,f0], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) f0)], joinType=[INNER])
+   +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]], fields=[business_data, nested, nested_array])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRuleTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRuleTest.xml
index 6c744e43ca0..930a6d94bf5 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRuleTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRuleTest.xml
@@ -133,6 +133,156 @@ LogicalProject(a=[$0], s=[$2])
       :     +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
       +- LogicalProject(s=[$0])
          +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.set)], 
rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testNullMismatchCrossJoin">
+    <Resource name="sql">
+      <![CDATA[SELECT bd_name FROM nested_not_null CROSS JOIN 
UNNEST(nested_not_null.business_data) AS exploded_bd(bd_name)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
+   +- LogicalProject(bd_name=[$0])
+      +- Uncollect
+         +- LogicalProject(business_data=[$cor0.business_data])
+            +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
+   +- LogicalProject(bd_name=[$0])
+      +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.business_data)], 
rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testNullMismatchLeftJoin">
+    <Resource name="sql">
+      <![CDATA[SELECT bd_name FROM nested_not_null LEFT JOIN 
UNNEST(nested_not_null.business_data) AS exploded_bd(bd_name) ON TRUE]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], 
requiredColumns=[{0}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
+   +- LogicalProject(bd_name=[$0])
+      +- Uncollect
+         +- LogicalProject(business_data=[$cor0.business_data])
+            +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], 
requiredColumns=[{0}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
+   +- LogicalProject(bd_name=[CAST($0):VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"])
+      +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.business_data)], 
rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testNullMismatchLeftJoinOnNested">
+    <Resource name="sql">
+      <![CDATA[SELECT bd_name FROM nested_not_null LEFT JOIN 
UNNEST(nested_not_null.nested.data) AS exploded_bd(bd_name) ON TRUE]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], 
requiredColumns=[{1}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
+   +- LogicalProject(bd_name=[$0])
+      +- Uncollect
+         +- LogicalProject(data=[$cor0.nested.data])
+            +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], 
requiredColumns=[{1}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
+   +- LogicalProject(bd_name=[CAST($0):VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"])
+      +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.nested.data)], 
rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testNullMismatchLeftJoinOnNestedArray">
+    <Resource name="sql">
+      <![CDATA[SELECT bd_name FROM nested_not_null LEFT JOIN 
UNNEST(nested_not_null.nested_array[0].data) AS exploded_bd(bd_name) ON TRUE]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], 
requiredColumns=[{2}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
+   +- LogicalProject(bd_name=[$0])
+      +- Uncollect
+         +- LogicalProject(EXPR$0=[ITEM($cor0.nested_array, 0).data])
+            +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], 
requiredColumns=[{2}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
+   +- LogicalProject(bd_name=[CAST($0):VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"])
+      +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1(ITEM($cor0.nested_array, 
0).data)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testNullMismatchNaturalJoin">
+    <Resource name="sql">
+      <![CDATA[SELECT bd_name FROM nested_not_null NATURAL JOIN 
UNNEST(nested_not_null.business_data) AS exploded_bd(bd_name)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
+   +- LogicalProject(bd_name=[$0])
+      +- Uncollect
+         +- LogicalProject(business_data=[$cor0.business_data])
+            +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
+   +- LogicalProject(bd_name=[$0])
+      +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.business_data)], 
rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testNullMismatchNaturalJoinOnNested">
+    <Resource name="sql">
+      <![CDATA[SELECT bd_name FROM nested_not_null NATURAL JOIN 
UNNEST(nested_not_null.nested.data) AS exploded_bd(bd_name)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{1}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
+   +- LogicalProject(bd_name=[$0])
+      +- Uncollect
+         +- LogicalProject(data=[$cor0.nested.data])
+            +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{1}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
+   +- LogicalProject(bd_name=[$0])
+      +- 
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.nested.data)], 
rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml
index c33bc89966d..b925b5ce5c7 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml
@@ -123,6 +123,144 @@ Calc(select=[a, f0 AS s])
       +- Exchange(distribution=[hash[a]])
          +- Calc(select=[a, b], where=[(a < 5)])
             +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testNullMismatchCrossJoin">
+    <Resource name="sql">
+      <![CDATA[SELECT bd_name FROM nested_not_null CROSS JOIN 
UNNEST(nested_not_null.business_data) AS exploded_bd(bd_name)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
+   +- LogicalProject(bd_name=[$0])
+      +- Uncollect
+         +- LogicalProject(business_data=[$cor0.business_data])
+            +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[f0 AS bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)], 
correlate=[table($UNNEST_ROWS$1($cor0.business_data))], 
select=[business_data,nested,nested_array,f0], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) f0)], joinType=[INNER])
+   +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]], fields=[business_data, nested, nested_array])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testNullMismatchLeftJoin">
+    <Resource name="sql">
+      <![CDATA[SELECT bd_name FROM nested_not_null LEFT JOIN 
UNNEST(nested_not_null.business_data) AS exploded_bd(bd_name) ON TRUE]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], 
requiredColumns=[{0}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
+   +- LogicalProject(bd_name=[$0])
+      +- Uncollect
+         +- LogicalProject(business_data=[$cor0.business_data])
+            +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)], 
correlate=[table($UNNEST_ROWS$1($cor0.business_data))], 
select=[business_data,nested,nested_array,bd_name], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) bd_name)], joinType=[LEFT])
+   +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]], fields=[business_data, nested, nested_array])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testNullMismatchLeftJoinOnNested">
+    <Resource name="sql">
+      <![CDATA[SELECT bd_name FROM nested_not_null LEFT JOIN 
UNNEST(nested_not_null.nested.data) AS exploded_bd(bd_name) ON TRUE]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], 
requiredColumns=[{1}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
+   +- LogicalProject(bd_name=[$0])
+      +- Uncollect
+         +- LogicalProject(data=[$cor0.nested.data])
+            +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.nested.data)], 
correlate=[table($UNNEST_ROWS$1($cor0.nested.data))], 
select=[business_data,nested,nested_array,bd_name], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) bd_name)], joinType=[LEFT])
+   +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]], fields=[business_data, nested, nested_array])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testNullMismatchLeftJoinOnNestedArray">
+    <Resource name="sql">
+      <![CDATA[SELECT bd_name FROM nested_not_null LEFT JOIN 
UNNEST(nested_not_null.nested_array[0].data) AS exploded_bd(bd_name) ON TRUE]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], 
requiredColumns=[{2}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
+   +- LogicalProject(bd_name=[$0])
+      +- Uncollect
+         +- LogicalProject(EXPR$0=[ITEM($cor0.nested_array, 0).data])
+            +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1(ITEM($cor0.nested_array, 0).data)], 
correlate=[table($UNNEST_ROWS$1(ITEM($cor0.nested_array, 0).data))], 
select=[business_data,nested,nested_array,bd_name], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) bd_name)], joinType=[LEFT])
+   +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]], fields=[business_data, nested, nested_array])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testNullMismatchNaturalJoin">
+    <Resource name="sql">
+      <![CDATA[SELECT bd_name FROM nested_not_null NATURAL JOIN 
UNNEST(nested_not_null.business_data) AS exploded_bd(bd_name)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
+   +- LogicalProject(bd_name=[$0])
+      +- Uncollect
+         +- LogicalProject(business_data=[$cor0.business_data])
+            +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[f0 AS bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)], 
correlate=[table($UNNEST_ROWS$1($cor0.business_data))], 
select=[business_data,nested,nested_array,f0], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) f0)], joinType=[INNER])
+   +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]], fields=[business_data, nested, nested_array])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testNullMismatchNaturalJoinOnNested">
+    <Resource name="sql">
+      <![CDATA[SELECT bd_name FROM nested_not_null NATURAL JOIN 
UNNEST(nested_not_null.nested.data) AS exploded_bd(bd_name)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{1}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]])
+   +- LogicalProject(bd_name=[$0])
+      +- Uncollect
+         +- LogicalProject(data=[$cor0.nested.data])
+            +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[f0 AS bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.nested.data)], 
correlate=[table($UNNEST_ROWS$1($cor0.nested.data))], 
select=[business_data,nested,nested_array,f0], 
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested, 
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array, 
VARCHAR(2147483647) f0)], joinType=[INNER])
+   +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
nested_not_null, source: [CollectionTableSource(business_data, nested, 
nested_array)]]], fields=[business_data, nested, nested_array])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/UnnestTestBase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/UnnestTestBase.scala
index f0dabc8eeac..d1ee04db109 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/UnnestTestBase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/UnnestTestBase.scala
@@ -21,7 +21,7 @@ import org.apache.flink.table.api._
 import org.apache.flink.table.planner.utils.{TableTestBase, TableTestUtil}
 import org.apache.flink.table.types.AbstractDataType
 
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.{BeforeEach, Test}
 
 import java.sql.Timestamp
 
@@ -32,6 +32,20 @@ abstract class UnnestTestBase(withExecPlan: Boolean) extends TableTestBase {
 
   protected def getTableTestUtil: TableTestUtil
 
+  @BeforeEach // registers the nested_not_null fixture table before every test
+  def setupBeforeEach(): Unit = { // every array element type in the DDL below is NOT NULL
+    util.addTable("""
+                    |CREATE TABLE nested_not_null (
+                    | business_data ARRAY<STRING NOT NULL>,
+                    | nested ROW<`data` ARRAY<STRING NOT NULL>>,
+                    | nested_array ARRAY<ROW<`data` ARRAY<STRING NOT NULL>> NOT NULL>
+                    |) WITH (
+                    | 'connector' = 'COLLECTION',
+                    | 'is-bounded' = 'true'
+                    |)
+      """.stripMargin)
+  }
+
   @Test
   def testUnnestPrimitiveArrayFromTable(): Unit = {
     util.addTableSource[(Int, Array[Int], Array[Array[Int]])]("MyTable", 'a, 'b, 'c)
@@ -232,6 +246,42 @@ abstract class UnnestTestBase(withExecPlan: Boolean) extends TableTestBase {
                  |""".stripMargin)
   }
 
+  @Test // plan check: LEFT JOIN UNNEST over the top-level ARRAY<STRING NOT NULL> column
+  def testNullMismatchLeftJoin(): Unit = {
+    util.verifyRelPlan(
+      "SELECT bd_name FROM nested_not_null LEFT JOIN UNNEST(nested_not_null.business_data) AS exploded_bd(bd_name) ON TRUE")
+  }
+
+  @Test // plan check: CROSS JOIN UNNEST over the top-level ARRAY<STRING NOT NULL> column
+  def testNullMismatchCrossJoin(): Unit = {
+    util.verifyRelPlan(
+      "SELECT bd_name FROM nested_not_null CROSS JOIN UNNEST(nested_not_null.business_data) AS exploded_bd(bd_name)")
+  }
+
+  @Test // plan check: NATURAL JOIN UNNEST over the top-level ARRAY<STRING NOT NULL> column
+  def testNullMismatchNaturalJoin(): Unit = {
+    util.verifyRelPlan(
+      "SELECT bd_name FROM nested_not_null NATURAL JOIN UNNEST(nested_not_null.business_data) AS exploded_bd(bd_name)")
+  }
+
+  @Test // plan check: NATURAL JOIN UNNEST over the array field `data` of the ROW column `nested`
+  def testNullMismatchNaturalJoinOnNested(): Unit = {
+    util.verifyRelPlan(
+      "SELECT bd_name FROM nested_not_null NATURAL JOIN UNNEST(nested_not_null.nested.data) AS exploded_bd(bd_name)")
+  }
+
+  @Test // plan check: LEFT JOIN UNNEST over the array field `data` of the ROW column `nested`
+  def testNullMismatchLeftJoinOnNested(): Unit = {
+    util.verifyRelPlan(
+      "SELECT bd_name FROM nested_not_null LEFT JOIN UNNEST(nested_not_null.nested.data) AS exploded_bd(bd_name) ON TRUE")
+  }
+
+  @Test // plan check: LEFT JOIN UNNEST over `data` of an element of the nested_array column
+  def testNullMismatchLeftJoinOnNestedArray(): Unit = {
+    util.verifyRelPlan( // NOTE(review): index [0] below; Flink SQL arrays look 1-based - confirm
+      "SELECT bd_name FROM nested_not_null LEFT JOIN UNNEST(nested_not_null.nested_array[0].data) AS exploded_bd(bd_name) ON TRUE")
+  }
+
   def verifyPlan(sql: String): Unit = {
     if (withExecPlan) {
       util.verifyExecPlan(sql)


Reply via email to