This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 019a9f02d0a [FLINK-33217][table] `UNNEST` fails on `LEFT JOIN`
with `NOT NULL` type in array
019a9f02d0a is described below
commit 019a9f02d0a19bb1e7d80845e48d104ac60d2eb3
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Mon Nov 10 18:17:37 2025 +0100
[FLINK-33217][table] `UNNEST` fails on `LEFT JOIN` with `NOT NULL`
type in array
---
.../plan/rules/logical/LogicalUnnestRule.java | 37 ++++-
.../batch/BatchPhysicalCorrelateRule.scala | 4 +-
.../table/planner/plan/batch/sql/UnnestTest.xml | 138 +++++++++++++++++++
.../plan/rules/logical/LogicalUnnestRuleTest.xml | 150 +++++++++++++++++++++
.../table/planner/plan/stream/sql/UnnestTest.xml | 138 +++++++++++++++++++
.../table/planner/plan/common/UnnestTestBase.scala | 52 ++++++-
6 files changed, 516 insertions(+), 3 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRule.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRule.java
index b9eb4f24f11..a0805e4b110 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRule.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRule.java
@@ -33,17 +33,20 @@ import org.apache.calcite.plan.RelRule;
import org.apache.calcite.plan.hep.HepRelVertex;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Correlate;
+import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.core.Uncollect;
import org.apache.calcite.rel.logical.LogicalCorrelate;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.logical.LogicalTableFunctionScan;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.immutables.value.Value;
import java.util.Collections;
import java.util.Map;
+import java.util.stream.Collectors;
import static
org.apache.flink.table.types.logical.utils.LogicalTypeUtils.toRowType;
@@ -103,7 +106,10 @@ public class LogicalUnnestRule extends
RelRule<LogicalUnnestRule.LogicalUnnestRu
relNode = convert(getRel(hepRelVertex), correlate);
}
if (relNode instanceof LogicalProject) {
- LogicalProject logicalProject = (LogicalProject) relNode;
+ final LogicalProject logicalProject =
+ correlate.getJoinType() == JoinRelType.LEFT
+ ?
getLogicalProjectWithAdjustedNullability((LogicalProject) relNode)
+ : (LogicalProject) relNode;
return logicalProject.copy(
logicalProject.getTraitSet(),
ImmutableList.of(convert(getRel(logicalProject.getInput()), correlate)));
@@ -161,6 +167,35 @@ public class LogicalUnnestRule extends
RelRule<LogicalUnnestRule.LogicalUnnestRu
return rel;
}
+ /**
+ * If the unnested type is {@code NOT NULL} but the {@code LEFT JOIN} makes it nullable, this
+ * method adjusts the nullability by inserting an extra {@code CAST}.
+ */
+ private LogicalProject
getLogicalProjectWithAdjustedNullability(LogicalProject logicalProject) {
+ final RelOptCluster cluster = logicalProject.getCluster();
+ FlinkTypeFactory typeFactory = (FlinkTypeFactory)
cluster.getTypeFactory();
+ RexBuilder rexBuilder = cluster.getRexBuilder();
+ final RelDataType rowType = logicalProject.getRowType();
+ return logicalProject.copy(
+ logicalProject.getTraitSet(),
+ logicalProject.getInput(),
+ logicalProject.getProjects().stream()
+ .map(
+ t -> {
+ if (t.getType().isNullable()) {
+ return t;
+ }
+ return rexBuilder.makeCast(
+ createNullableType(typeFactory,
t.getType()), t);
+ })
+ .collect(Collectors.toList()),
+ rowType.isNullable() ? rowType :
createNullableType(typeFactory, rowType));
+ }
+
+ private static RelDataType createNullableType(FlinkTypeFactory
typeFactory, RelDataType type) {
+ return typeFactory.createTypeWithNullability(type, true);
+ }
+
/** Rule configuration. */
@Value.Immutable(singleton = false)
public interface LogicalUnnestRuleConfig extends RelRule.Config {
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCorrelateRule.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCorrelateRule.scala
index 9f714f2057c..471f40cce18 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCorrelateRule.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCorrelateRule.scala
@@ -64,7 +64,9 @@ class BatchPhysicalCorrelateRule(config: Config) extends
ConverterRule(config) {
case calc: FlinkLogicalCalc =>
convertToCorrelate(
calc.getInput.asInstanceOf[RelSubset].getOriginal,
- Some(calc.getProgram.expandLocalRef(calc.getProgram.getCondition)))
+ if (calc.getProgram.getCondition == null) None
+ else
Some(calc.getProgram.expandLocalRef(calc.getProgram.getCondition))
+ )
case scan: FlinkLogicalTableFunctionScan =>
new BatchPhysicalCorrelate(
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml
index 2b8cfaaffc1..c0c753fbe87 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml
@@ -128,6 +128,144 @@ Calc(select=[a, f0 AS s])
+- Sort(orderBy=[a ASC])
+- Calc(select=[a, b], where=[(a < 5)])
+- BoundedStreamScan(table=[[default_catalog,
default_database, MyTable]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testNullMismatchCrossJoin">
+ <Resource name="sql">
+ <![CDATA[SELECT bd_name FROM nested_not_null CROSS JOIN
UNNEST(nested_not_null.business_data) AS exploded_bd(bd_name)]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0}])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
+ +- LogicalProject(bd_name=[$0])
+ +- Uncollect
+ +- LogicalProject(business_data=[$cor0.business_data])
+ +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[f0 AS bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)],
correlate=[table($UNNEST_ROWS$1($cor0.business_data))],
select=[business_data,nested,nested_array,f0],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) f0)], joinType=[INNER])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]], fields=[business_data, nested, nested_array])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testNullMismatchLeftJoin">
+ <Resource name="sql">
+ <![CDATA[SELECT bd_name FROM nested_not_null LEFT JOIN
UNNEST(nested_not_null.business_data) AS exploded_bd(bd_name) ON TRUE]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left],
requiredColumns=[{0}])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
+ +- LogicalProject(bd_name=[$0])
+ +- Uncollect
+ +- LogicalProject(business_data=[$cor0.business_data])
+ +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)],
correlate=[table($UNNEST_ROWS$1($cor0.business_data))],
select=[business_data,nested,nested_array,bd_name],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) bd_name)], joinType=[LEFT])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]], fields=[business_data, nested, nested_array])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testNullMismatchLeftJoinOnNested">
+ <Resource name="sql">
+ <![CDATA[SELECT bd_name FROM nested_not_null LEFT JOIN
UNNEST(nested_not_null.nested.data) AS exploded_bd(bd_name) ON TRUE]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left],
requiredColumns=[{1}])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
+ +- LogicalProject(bd_name=[$0])
+ +- Uncollect
+ +- LogicalProject(data=[$cor0.nested.data])
+ +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.nested.data)],
correlate=[table($UNNEST_ROWS$1($cor0.nested.data))],
select=[business_data,nested,nested_array,bd_name],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) bd_name)], joinType=[LEFT])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]], fields=[business_data, nested, nested_array])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testNullMismatchLeftJoinOnNestedArray">
+ <Resource name="sql">
+ <![CDATA[SELECT bd_name FROM nested_not_null LEFT JOIN
UNNEST(nested_not_null.nested_array[0].data) AS exploded_bd(bd_name) ON TRUE]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left],
requiredColumns=[{2}])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
+ +- LogicalProject(bd_name=[$0])
+ +- Uncollect
+ +- LogicalProject(EXPR$0=[ITEM($cor0.nested_array, 0).data])
+ +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1(ITEM($cor0.nested_array, 0).data)],
correlate=[table($UNNEST_ROWS$1(ITEM($cor0.nested_array, 0).data))],
select=[business_data,nested,nested_array,bd_name],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) bd_name)], joinType=[LEFT])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]], fields=[business_data, nested, nested_array])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testNullMismatchNaturalJoin">
+ <Resource name="sql">
+ <![CDATA[SELECT bd_name FROM nested_not_null NATURAL JOIN
UNNEST(nested_not_null.business_data) AS exploded_bd(bd_name)]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0}])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
+ +- LogicalProject(bd_name=[$0])
+ +- Uncollect
+ +- LogicalProject(business_data=[$cor0.business_data])
+ +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[f0 AS bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)],
correlate=[table($UNNEST_ROWS$1($cor0.business_data))],
select=[business_data,nested,nested_array,f0],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) f0)], joinType=[INNER])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]], fields=[business_data, nested, nested_array])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testNullMismatchNaturalJoinOnNested">
+ <Resource name="sql">
+ <![CDATA[SELECT bd_name FROM nested_not_null NATURAL JOIN
UNNEST(nested_not_null.nested.data) AS exploded_bd(bd_name)]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{1}])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
+ +- LogicalProject(bd_name=[$0])
+ +- Uncollect
+ +- LogicalProject(data=[$cor0.nested.data])
+ +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[f0 AS bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.nested.data)],
correlate=[table($UNNEST_ROWS$1($cor0.nested.data))],
select=[business_data,nested,nested_array,f0],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) f0)], joinType=[INNER])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]], fields=[business_data, nested, nested_array])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRuleTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRuleTest.xml
index 6c744e43ca0..930a6d94bf5 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRuleTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRuleTest.xml
@@ -133,6 +133,156 @@ LogicalProject(a=[$0], s=[$2])
: +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+- LogicalProject(s=[$0])
+- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.set)],
rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testNullMismatchCrossJoin">
+ <Resource name="sql">
+ <![CDATA[SELECT bd_name FROM nested_not_null CROSS JOIN
UNNEST(nested_not_null.business_data) AS exploded_bd(bd_name)]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0}])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
+ +- LogicalProject(bd_name=[$0])
+ +- Uncollect
+ +- LogicalProject(business_data=[$cor0.business_data])
+ +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0}])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
+ +- LogicalProject(bd_name=[$0])
+ +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.business_data)],
rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testNullMismatchLeftJoin">
+ <Resource name="sql">
+ <![CDATA[SELECT bd_name FROM nested_not_null LEFT JOIN
UNNEST(nested_not_null.business_data) AS exploded_bd(bd_name) ON TRUE]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left],
requiredColumns=[{0}])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
+ +- LogicalProject(bd_name=[$0])
+ +- Uncollect
+ +- LogicalProject(business_data=[$cor0.business_data])
+ +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left],
requiredColumns=[{0}])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
+ +- LogicalProject(bd_name=[CAST($0):VARCHAR(2147483647) CHARACTER SET
"UTF-16LE"])
+ +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.business_data)],
rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testNullMismatchLeftJoinOnNested">
+ <Resource name="sql">
+ <![CDATA[SELECT bd_name FROM nested_not_null LEFT JOIN
UNNEST(nested_not_null.nested.data) AS exploded_bd(bd_name) ON TRUE]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left],
requiredColumns=[{1}])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
+ +- LogicalProject(bd_name=[$0])
+ +- Uncollect
+ +- LogicalProject(data=[$cor0.nested.data])
+ +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left],
requiredColumns=[{1}])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
+ +- LogicalProject(bd_name=[CAST($0):VARCHAR(2147483647) CHARACTER SET
"UTF-16LE"])
+ +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.nested.data)],
rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testNullMismatchLeftJoinOnNestedArray">
+ <Resource name="sql">
+ <![CDATA[SELECT bd_name FROM nested_not_null LEFT JOIN
UNNEST(nested_not_null.nested_array[0].data) AS exploded_bd(bd_name) ON TRUE]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left],
requiredColumns=[{2}])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
+ +- LogicalProject(bd_name=[$0])
+ +- Uncollect
+ +- LogicalProject(EXPR$0=[ITEM($cor0.nested_array, 0).data])
+ +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left],
requiredColumns=[{2}])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
+ +- LogicalProject(bd_name=[CAST($0):VARCHAR(2147483647) CHARACTER SET
"UTF-16LE"])
+ +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1(ITEM($cor0.nested_array,
0).data)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testNullMismatchNaturalJoin">
+ <Resource name="sql">
+ <![CDATA[SELECT bd_name FROM nested_not_null NATURAL JOIN
UNNEST(nested_not_null.business_data) AS exploded_bd(bd_name)]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0}])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
+ +- LogicalProject(bd_name=[$0])
+ +- Uncollect
+ +- LogicalProject(business_data=[$cor0.business_data])
+ +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0}])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
+ +- LogicalProject(bd_name=[$0])
+ +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.business_data)],
rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testNullMismatchNaturalJoinOnNested">
+ <Resource name="sql">
+ <![CDATA[SELECT bd_name FROM nested_not_null NATURAL JOIN
UNNEST(nested_not_null.nested.data) AS exploded_bd(bd_name)]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{1}])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
+ +- LogicalProject(bd_name=[$0])
+ +- Uncollect
+ +- LogicalProject(data=[$cor0.nested.data])
+ +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{1}])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
+ +- LogicalProject(bd_name=[$0])
+ +-
LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.nested.data)],
rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml
index c33bc89966d..b925b5ce5c7 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml
@@ -123,6 +123,144 @@ Calc(select=[a, f0 AS s])
+- Exchange(distribution=[hash[a]])
+- Calc(select=[a, b], where=[(a < 5)])
+- DataStreamScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testNullMismatchCrossJoin">
+ <Resource name="sql">
+ <![CDATA[SELECT bd_name FROM nested_not_null CROSS JOIN
UNNEST(nested_not_null.business_data) AS exploded_bd(bd_name)]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0}])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
+ +- LogicalProject(bd_name=[$0])
+ +- Uncollect
+ +- LogicalProject(business_data=[$cor0.business_data])
+ +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[f0 AS bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)],
correlate=[table($UNNEST_ROWS$1($cor0.business_data))],
select=[business_data,nested,nested_array,f0],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) f0)], joinType=[INNER])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]], fields=[business_data, nested, nested_array])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testNullMismatchLeftJoin">
+ <Resource name="sql">
+ <![CDATA[SELECT bd_name FROM nested_not_null LEFT JOIN
UNNEST(nested_not_null.business_data) AS exploded_bd(bd_name) ON TRUE]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left],
requiredColumns=[{0}])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
+ +- LogicalProject(bd_name=[$0])
+ +- Uncollect
+ +- LogicalProject(business_data=[$cor0.business_data])
+ +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)],
correlate=[table($UNNEST_ROWS$1($cor0.business_data))],
select=[business_data,nested,nested_array,bd_name],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) bd_name)], joinType=[LEFT])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]], fields=[business_data, nested, nested_array])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testNullMismatchLeftJoinOnNested">
+ <Resource name="sql">
+ <![CDATA[SELECT bd_name FROM nested_not_null LEFT JOIN
UNNEST(nested_not_null.nested.data) AS exploded_bd(bd_name) ON TRUE]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left],
requiredColumns=[{1}])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
+ +- LogicalProject(bd_name=[$0])
+ +- Uncollect
+ +- LogicalProject(data=[$cor0.nested.data])
+ +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.nested.data)],
correlate=[table($UNNEST_ROWS$1($cor0.nested.data))],
select=[business_data,nested,nested_array,bd_name],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) bd_name)], joinType=[LEFT])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]], fields=[business_data, nested, nested_array])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testNullMismatchLeftJoinOnNestedArray">
+ <Resource name="sql">
+ <![CDATA[SELECT bd_name FROM nested_not_null LEFT JOIN
UNNEST(nested_not_null.nested_array[0].data) AS exploded_bd(bd_name) ON TRUE]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left],
requiredColumns=[{2}])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
+ +- LogicalProject(bd_name=[$0])
+ +- Uncollect
+ +- LogicalProject(EXPR$0=[ITEM($cor0.nested_array, 0).data])
+ +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1(ITEM($cor0.nested_array, 0).data)],
correlate=[table($UNNEST_ROWS$1(ITEM($cor0.nested_array, 0).data))],
select=[business_data,nested,nested_array,bd_name],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) bd_name)], joinType=[LEFT])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]], fields=[business_data, nested, nested_array])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testNullMismatchNaturalJoin">
+ <Resource name="sql">
+ <![CDATA[SELECT bd_name FROM nested_not_null NATURAL JOIN
UNNEST(nested_not_null.business_data) AS exploded_bd(bd_name)]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0}])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
+ +- LogicalProject(bd_name=[$0])
+ +- Uncollect
+ +- LogicalProject(business_data=[$cor0.business_data])
+ +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[f0 AS bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.business_data)],
correlate=[table($UNNEST_ROWS$1($cor0.business_data))],
select=[business_data,nested,nested_array,f0],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) f0)], joinType=[INNER])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]], fields=[business_data, nested, nested_array])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testNullMismatchNaturalJoinOnNested">
+ <Resource name="sql">
+ <![CDATA[SELECT bd_name FROM nested_not_null NATURAL JOIN
UNNEST(nested_not_null.nested.data) AS exploded_bd(bd_name)]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(bd_name=[$3])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{1}])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]])
+ +- LogicalProject(bd_name=[$0])
+ +- Uncollect
+ +- LogicalProject(data=[$cor0.nested.data])
+ +- LogicalValues(tuples=[[{ 0 }]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[f0 AS bd_name])
++- Correlate(invocation=[$UNNEST_ROWS$1($cor0.nested.data)],
correlate=[table($UNNEST_ROWS$1($cor0.nested.data))],
select=[business_data,nested,nested_array,f0],
rowType=[RecordType(VARCHAR(2147483647) ARRAY business_data,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) nested,
RecordType:peek_no_expand(VARCHAR(2147483647) ARRAY data) ARRAY nested_array,
VARCHAR(2147483647) f0)], joinType=[INNER])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
nested_not_null, source: [CollectionTableSource(business_data, nested,
nested_array)]]], fields=[business_data, nested, nested_array])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/UnnestTestBase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/UnnestTestBase.scala
index f0dabc8eeac..d1ee04db109 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/UnnestTestBase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/UnnestTestBase.scala
@@ -21,7 +21,7 @@ import org.apache.flink.table.api._
import org.apache.flink.table.planner.utils.{TableTestBase, TableTestUtil}
import org.apache.flink.table.types.AbstractDataType
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.{BeforeEach, Test}
import java.sql.Timestamp
@@ -32,6 +32,20 @@ abstract class UnnestTestBase(withExecPlan: Boolean) extends
TableTestBase {
protected def getTableTestUtil: TableTestUtil
+ @BeforeEach
+ def setupBeforeEach(): Unit = {
+ util.addTable("""
+ |CREATE TABLE nested_not_null (
+ | business_data ARRAY<STRING NOT NULL>,
+ | nested ROW<`data` ARRAY<STRING NOT NULL>>,
+ | nested_array ARRAY<ROW<`data` ARRAY<STRING NOT NULL>>
NOT NULL>
+ |) WITH (
+ | 'connector' = 'COLLECTION',
+ | 'is-bounded' = 'true'
+ |)
+ """.stripMargin)
+ }
+
@Test
def testUnnestPrimitiveArrayFromTable(): Unit = {
util.addTableSource[(Int, Array[Int], Array[Array[Int]])]("MyTable", 'a,
'b, 'c)
@@ -232,6 +246,42 @@ abstract class UnnestTestBase(withExecPlan: Boolean)
extends TableTestBase {
|""".stripMargin)
}
+ @Test
+ def testNullMismatchLeftJoin(): Unit = {
+ util.verifyRelPlan(
+ "SELECT bd_name FROM nested_not_null LEFT JOIN
UNNEST(nested_not_null.business_data) AS exploded_bd(bd_name) ON TRUE")
+ }
+
+ @Test
+ def testNullMismatchCrossJoin(): Unit = {
+ util.verifyRelPlan(
+ "SELECT bd_name FROM nested_not_null CROSS JOIN
UNNEST(nested_not_null.business_data) AS exploded_bd(bd_name)")
+ }
+
+ @Test
+ def testNullMismatchNaturalJoin(): Unit = {
+ util.verifyRelPlan(
+ "SELECT bd_name FROM nested_not_null NATURAL JOIN
UNNEST(nested_not_null.business_data) AS exploded_bd(bd_name)")
+ }
+
+ @Test
+ def testNullMismatchNaturalJoinOnNested(): Unit = {
+ util.verifyRelPlan(
+ "SELECT bd_name FROM nested_not_null NATURAL JOIN
UNNEST(nested_not_null.nested.data) AS exploded_bd(bd_name)")
+ }
+
+ @Test
+ def testNullMismatchLeftJoinOnNested(): Unit = {
+ util.verifyRelPlan(
+ "SELECT bd_name FROM nested_not_null LEFT JOIN
UNNEST(nested_not_null.nested.data) AS exploded_bd(bd_name) ON TRUE")
+ }
+
+ @Test
+ def testNullMismatchLeftJoinOnNestedArray(): Unit = {
+ util.verifyRelPlan(
+ "SELECT bd_name FROM nested_not_null LEFT JOIN
UNNEST(nested_not_null.nested_array[0].data) AS exploded_bd(bd_name) ON TRUE")
+ }
+
def verifyPlan(sql: String): Unit = {
if (withExecPlan) {
util.verifyExecPlan(sql)