This is an automated email from the ASF dual-hosted git repository.
lincoln pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.20 by this push:
new acc348613e5 [FLINK-35804][table-planner] Fix incorrect calc merge during decorrelate phase
acc348613e5 is described below
commit acc348613e5c0f955e34e5fd456d3cbd5a29b5de
Author: lincoln lee <[email protected]>
AuthorDate: Mon Jul 15 15:17:58 2024 +0800
[FLINK-35804][table-planner] Fix incorrect calc merge during decorrelate phase
This closes #25084
Co-authored-by: zhaorongsheng <[email protected]>
---
.../apache/calcite/sql2rel/RelDecorrelator.java | 38 ++++++++++++----------
.../logical/FlinkFilterProjectTransposeRule.java | 4 +++
.../table/planner/plan/batch/sql/CalcTest.xml | 31 ++++++++++++++++++
.../table/planner/plan/stream/sql/CalcTest.xml | 31 ++++++++++++++++++
.../table/planner/plan/batch/sql/CalcTest.scala | 16 +++++++++
.../table/planner/plan/stream/sql/CalcTest.scala | 16 +++++++++
6 files changed, 119 insertions(+), 17 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
index 10c2764a807..8aed985a7b0 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
@@ -16,6 +16,8 @@
*/
package org.apache.calcite.sql2rel;
+import
org.apache.flink.table.planner.plan.rules.logical.FlinkFilterProjectTransposeRule;
+
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@@ -125,9 +127,9 @@ import static
org.apache.calcite.linq4j.Nullness.castNonNull;
* Copied to fix calcite issues. FLINK modifications are at lines
*
* <ol>
- * <li>Was changed within FLINK-29280, FLINK-28682: Line 216 ~ 223
- * <li>Should be removed after fix of FLINK-29540: Line 289 ~ 295
- * <li>Should be removed after fix of FLINK-29540: Line 307 ~ 313
+ * <li>Was changed within FLINK-29280, FLINK-28682, FLINK-35804: Line 218 ~
225, Line 273 ~ 288
+ * <li>Should be removed after fix of FLINK-29540: Line 293 ~ 299
+ * <li>Should be removed after fix of FLINK-29540: Line 311 ~ 317
* </ol>
*/
public class RelDecorrelator implements ReflectiveVisitor {
@@ -268,20 +270,22 @@ public class RelDecorrelator implements ReflectiveVisitor
{
.FilterIntoJoinRuleConfig.class)
.toRule())
.addRuleInstance(
- CoreRules.FILTER_PROJECT_TRANSPOSE
- .config
- .withRelBuilderFactory(f)
-
.as(FilterProjectTransposeRule.Config.class)
- .withOperandFor(
- Filter.class,
- filter ->
-
!RexUtil.containsCorrelation(
-
filter.getCondition()),
- Project.class,
- project -> true)
- .withCopyFilter(true)
- .withCopyProject(true)
- .toRule())
+ // ----- FLINK MODIFICATION BEGIN -----
+ FlinkFilterProjectTransposeRule.build(
+ CoreRules.FILTER_PROJECT_TRANSPOSE
+ .config
+ .withRelBuilderFactory(f)
+
.as(FilterProjectTransposeRule.Config.class)
+ .withOperandFor(
+ Filter.class,
+ filter ->
+
!RexUtil.containsCorrelation(
+
filter.getCondition()),
+ Project.class,
+ project -> true)
+ .withCopyFilter(true)
+ .withCopyProject(true)))
+ // ----- FLINK MODIFICATION END -----
.addRuleInstance(
FilterCorrelateRule.Config.DEFAULT
.withRelBuilderFactory(f)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterProjectTransposeRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterProjectTransposeRule.java
index fdca581b612..54d6f277c8c 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterProjectTransposeRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterProjectTransposeRule.java
@@ -39,6 +39,10 @@ public class FlinkFilterProjectTransposeRule extends
FilterProjectTransposeRule
public static final RelOptRule INSTANCE = new
FlinkFilterProjectTransposeRule(Config.DEFAULT);
+ public static FlinkFilterProjectTransposeRule build(Config config) {
+ return new FlinkFilterProjectTransposeRule(config);
+ }
+
protected FlinkFilterProjectTransposeRule(Config config) {
super(config);
}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml
index 0b04d0a7924..69692846ffd 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml
@@ -69,6 +69,37 @@ LogicalProject(a=[$0])
<![CDATA[
Calc(select=[a], where=[>(random_udf(b), 10)])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCalcMergeWithCorrelate">
+ <Resource name="sql">
+ <![CDATA[
+SELECT a, r FROM (
+ SELECT a, random_udf(b) r FROM (
+ select a, b, c1 FROM MyTable, LATERAL TABLE(str_split(c)) AS T(c1)
+ ) t
+)
+WHERE r > 10
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], r=[$1])
++- LogicalFilter(condition=[>($1, 10)])
+ +- LogicalProject(a=[$0], r=[random_udf($1)])
+ +- LogicalProject(a=[$0], b=[$1], c1=[$3])
+ +- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{2}])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableFunctionScan(invocation=[str_split($cor0.c)],
rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[a, r], where=[>(r, 10)])
++- Calc(select=[a, random_udf(b) AS r])
+ +- Correlate(invocation=[str_split($cor0.c)],
correlate=[table(str_split($cor0.c))], select=[a,b,c,EXPR$0],
rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c,
VARCHAR(2147483647) EXPR$0)], joinType=[INNER])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml
index 4397895cebc..215a6aa8eea 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml
@@ -69,6 +69,37 @@ LogicalProject(a=[$0])
<![CDATA[
Calc(select=[a], where=[>(random_udf(b), 10)])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCalcMergeWithCorrelate">
+ <Resource name="sql">
+ <![CDATA[
+SELECT a, r FROM (
+ SELECT a, random_udf(b) r FROM (
+ select a, b, c1 FROM MyTable, LATERAL TABLE(str_split(c)) AS T(c1)
+ ) t
+)
+WHERE r > 10
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], r=[$1])
++- LogicalFilter(condition=[>($1, 10)])
+ +- LogicalProject(a=[$0], r=[random_udf($1)])
+ +- LogicalProject(a=[$0], b=[$1], c1=[$3])
+ +- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{2}])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableFunctionScan(invocation=[str_split($cor0.c)],
rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[a, r], where=[>(r, 10)])
++- Calc(select=[a, random_udf(b) AS r])
+ +- Correlate(invocation=[str_split($cor0.c)],
correlate=[table(str_split($cor0.c))], select=[a,b,c,EXPR$0],
rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c,
VARCHAR(2147483647) EXPR$0)], joinType=[INNER])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala
index abd196d34af..49bd11af112 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala
@@ -23,6 +23,7 @@ import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.planner.plan.utils.MyPojo
import
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.NonDeterministicUdf
+import
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.{JavaTableFunc1,
StringSplit}
import org.apache.flink.table.planner.utils.TableTestBase
import org.assertj.core.api.Assertions.assertThatExceptionOfType
@@ -207,4 +208,19 @@ class CalcTest extends TableTestBase {
val sqlQuery = "SELECT a FROM (SELECT a, b FROM MyTable) t WHERE
random_udf(b) > 10"
util.verifyRelPlan(sqlQuery)
}
+
+ @Test
+ def testCalcMergeWithCorrelate(): Unit = {
+ util.addTemporarySystemFunction("str_split", new StringSplit())
+ val sqlQuery =
+ """
+ |SELECT a, r FROM (
+ | SELECT a, random_udf(b) r FROM (
+ | select a, b, c1 FROM MyTable, LATERAL TABLE(str_split(c)) AS T(c1)
+ | ) t
+ |)
+ |WHERE r > 10
+ |""".stripMargin
+ util.verifyRelPlan(sqlQuery)
+ }
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala
index a0643c21e62..1c62fc054e2 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala
@@ -23,6 +23,7 @@ import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.planner.plan.utils.MyPojo
import
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.NonDeterministicUdf
+import
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.{JavaTableFunc1,
StringSplit}
import org.apache.flink.table.planner.utils.TableTestBase
import org.assertj.core.api.Assertions.assertThatExceptionOfType
@@ -201,4 +202,19 @@ class CalcTest extends TableTestBase {
val sqlQuery = "SELECT a FROM (SELECT a, b FROM MyTable) t WHERE
random_udf(b) > 10"
util.verifyRelPlan(sqlQuery)
}
+
+ @Test
+ def testCalcMergeWithCorrelate(): Unit = {
+ util.addTemporarySystemFunction("str_split", new StringSplit())
+ val sqlQuery =
+ """
+ |SELECT a, r FROM (
+ | SELECT a, random_udf(b) r FROM (
+ | select a, b, c1 FROM MyTable, LATERAL TABLE(str_split(c)) AS T(c1)
+ | ) t
+ |)
+ |WHERE r > 10
+ |""".stripMargin
+ util.verifyRelPlan(sqlQuery)
+ }
}