This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 01743000c10720598dfb6f27d849da2283772e50
Author: lincoln.lil <[email protected]>
AuthorDate: Fri Feb 3 22:33:32 2023 +0800

    [FLINK-30006][table-planner] Fix FlinkFilterCalcMergeRule not expanding local refs,
    which caused it to miss non-deterministic calls nested inside project expressions
---
 .../rules/logical/FlinkFilterCalcMergeRule.java    | 13 +++---
 .../planner/plan/common/CalcMergeTestBase.java     |  6 +++
 .../table/planner/plan/batch/sql/CalcMergeTest.xml | 50 +++++++++++++++-------
 .../planner/plan/stream/sql/CalcMergeTest.xml      | 50 +++++++++++++++-------
 4 files changed, 82 insertions(+), 37 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterCalcMergeRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterCalcMergeRule.java
index f57eaa600e1..9e17de2f0ea 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterCalcMergeRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterCalcMergeRule.java
@@ -25,12 +25,12 @@ import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.rel.logical.LogicalCalc;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.rules.FilterCalcMergeRule;
-import org.apache.calcite.rex.RexLocalRef;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexUtil;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * Extends calcite's FilterCalcMergeRule for streaming scenario, modification: 
does not merge the
@@ -49,16 +49,15 @@ public class FlinkFilterCalcMergeRule extends FilterCalcMergeRule {
         LogicalFilter filter = call.rel(0);
         LogicalCalc calc = call.rel(1);
 
-        List<RexNode> projectExprs = calc.getProgram().getExprList();
-        List<RexLocalRef> projects = calc.getProgram().getProjectList();
+        List<RexNode> expandProjects =
+                calc.getProgram().getProjectList().stream()
+                        .map(p -> calc.getProgram().expandLocalRef(p))
+                        .collect(Collectors.toList());
         InputRefVisitor inputRefVisitor = new InputRefVisitor();
         filter.getCondition().accept(inputRefVisitor);
         boolean existNonDeterministicRef =
                 Arrays.stream(inputRefVisitor.getFields())
-                        .anyMatch(
-                                i ->
-                                        !RexUtil.isDeterministic(
-                                                
projectExprs.get(projects.get(i).getIndex())));
+                        .anyMatch(i -> 
!RexUtil.isDeterministic(expandProjects.get(i)));
 
         if (!existNonDeterministicRef) {
             super.onMatch(call);
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/CalcMergeTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/CalcMergeTestBase.java
index 39c2c1b7626..1f8cf1caad2 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/CalcMergeTestBase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/CalcMergeTestBase.java
@@ -107,4 +107,10 @@ public abstract class CalcMergeTestBase extends TableTestBase {
         util.verifyExecPlan(
                 "SELECT a, c FROM (SELECT a, random_udf(a) as a1, c FROM 
MyTable) t WHERE c > 10");
     }
+
+    @Test
+    public void testCalcMergeWithNonDeterministicNestedExpr() {
+        util.verifyExecPlan(
+                "SELECT a, a1 FROM (SELECT a, substr(cast(random_udf(a) as 
varchar), 1, 2) AS a1 FROM MyTable) t WHERE a1 > '10'");
+    }
 }
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcMergeTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcMergeTest.xml
index b98694fb803..6adb95720ad 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcMergeTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcMergeTest.xml
@@ -89,6 +89,26 @@ LogicalProject(c=[$2])
       <![CDATA[
 Calc(select=[c], where=[(random_udf(a) > random_udf(b))])
 +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCalcMergeWithNonDeterministicExpr1">
+    <Resource name="sql">
+      <![CDATA[SELECT a, a1 FROM (SELECT a, random_udf(a) AS a1 FROM MyTable) 
t WHERE a1 > 10]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], a1=[$1])
++- LogicalFilter(condition=[>($1, 10)])
+   +- LogicalProject(a=[$0], a1=[random_udf($0)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, a1], where=[(a1 > 10)])
++- Calc(select=[a, random_udf(a) AS a1])
+   +- TableSourceScan(table=[[default_catalog, default_database, MyTable, 
project=[a], metadata=[]]], fields=[a])
 ]]>
     </Resource>
   </TestCase>
@@ -111,22 +131,23 @@ Calc(select=[random_udf(random_udf(a)) AS a2], where=[(b > 10)])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testCalcMergeWithoutInnerNonDeterministicExpr">
+  <TestCase name="testCalcMergeWithNonDeterministicNestedExpr">
     <Resource name="sql">
-      <![CDATA[SELECT a, c FROM (SELECT a, random_udf(a) as a1, c FROM 
MyTable) t WHERE c > 10]]>
+      <![CDATA[SELECT a, a1 FROM (SELECT a, substr(cast(random_udf(a) as 
varchar), 1, 2) AS a1 FROM MyTable) t WHERE a1 > '10']]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(a=[$0], c=[$2])
-+- LogicalFilter(condition=[>(CAST($2):BIGINT, 10)])
-   +- LogicalProject(a=[$0], a1=[random_udf($0)], c=[$2])
+LogicalProject(a=[$0], a1=[$1])
++- LogicalFilter(condition=[>($1, _UTF-16LE'10')])
+   +- LogicalProject(a=[$0], 
a1=[SUBSTR(CAST(random_udf($0)):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
NOT NULL, 1, 2)])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[a, c], where=[(CAST(c AS BIGINT) > 10)])
-+- TableSourceScan(table=[[default_catalog, default_database, MyTable, 
filter=[], project=[a, c], metadata=[]]], fields=[a, c])
+Calc(select=[a, a1], where=[(a1 > '10')])
++- Calc(select=[a, SUBSTR(CAST(random_udf(a) AS VARCHAR(2147483647)), 1, 2) AS 
a1])
+   +- TableSourceScan(table=[[default_catalog, default_database, MyTable, 
project=[a], metadata=[]]], fields=[a])
 ]]>
     </Resource>
   </TestCase>
@@ -150,23 +171,22 @@ Calc(select=[a, b], where=[(a = b)])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testCalcMergeWithNonDeterministicExpr1">
+  <TestCase name="testCalcMergeWithoutInnerNonDeterministicExpr">
     <Resource name="sql">
-      <![CDATA[SELECT a, a1 FROM (SELECT a, random_udf(a) AS a1 FROM MyTable) 
t WHERE a1 > 10]]>
+      <![CDATA[SELECT a, c FROM (SELECT a, random_udf(a) as a1, c FROM 
MyTable) t WHERE c > 10]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(a=[$0], a1=[$1])
-+- LogicalFilter(condition=[>($1, 10)])
-   +- LogicalProject(a=[$0], a1=[random_udf($0)])
+LogicalProject(a=[$0], c=[$2])
++- LogicalFilter(condition=[>(CAST($2):BIGINT, 10)])
+   +- LogicalProject(a=[$0], a1=[random_udf($0)], c=[$2])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[a, a1], where=[(a1 > 10)])
-+- Calc(select=[a, random_udf(a) AS a1])
-   +- TableSourceScan(table=[[default_catalog, default_database, MyTable, 
project=[a], metadata=[]]], fields=[a])
+Calc(select=[a, c], where=[(CAST(c AS BIGINT) > 10)])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable, 
filter=[], project=[a, c], metadata=[]]], fields=[a, c])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcMergeTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcMergeTest.xml
index b98694fb803..6adb95720ad 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcMergeTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcMergeTest.xml
@@ -89,6 +89,26 @@ LogicalProject(c=[$2])
       <![CDATA[
 Calc(select=[c], where=[(random_udf(a) > random_udf(b))])
 +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCalcMergeWithNonDeterministicExpr1">
+    <Resource name="sql">
+      <![CDATA[SELECT a, a1 FROM (SELECT a, random_udf(a) AS a1 FROM MyTable) 
t WHERE a1 > 10]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], a1=[$1])
++- LogicalFilter(condition=[>($1, 10)])
+   +- LogicalProject(a=[$0], a1=[random_udf($0)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, a1], where=[(a1 > 10)])
++- Calc(select=[a, random_udf(a) AS a1])
+   +- TableSourceScan(table=[[default_catalog, default_database, MyTable, 
project=[a], metadata=[]]], fields=[a])
 ]]>
     </Resource>
   </TestCase>
@@ -111,22 +131,23 @@ Calc(select=[random_udf(random_udf(a)) AS a2], where=[(b > 10)])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testCalcMergeWithoutInnerNonDeterministicExpr">
+  <TestCase name="testCalcMergeWithNonDeterministicNestedExpr">
     <Resource name="sql">
-      <![CDATA[SELECT a, c FROM (SELECT a, random_udf(a) as a1, c FROM 
MyTable) t WHERE c > 10]]>
+      <![CDATA[SELECT a, a1 FROM (SELECT a, substr(cast(random_udf(a) as 
varchar), 1, 2) AS a1 FROM MyTable) t WHERE a1 > '10']]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(a=[$0], c=[$2])
-+- LogicalFilter(condition=[>(CAST($2):BIGINT, 10)])
-   +- LogicalProject(a=[$0], a1=[random_udf($0)], c=[$2])
+LogicalProject(a=[$0], a1=[$1])
++- LogicalFilter(condition=[>($1, _UTF-16LE'10')])
+   +- LogicalProject(a=[$0], 
a1=[SUBSTR(CAST(random_udf($0)):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
NOT NULL, 1, 2)])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[a, c], where=[(CAST(c AS BIGINT) > 10)])
-+- TableSourceScan(table=[[default_catalog, default_database, MyTable, 
filter=[], project=[a, c], metadata=[]]], fields=[a, c])
+Calc(select=[a, a1], where=[(a1 > '10')])
++- Calc(select=[a, SUBSTR(CAST(random_udf(a) AS VARCHAR(2147483647)), 1, 2) AS 
a1])
+   +- TableSourceScan(table=[[default_catalog, default_database, MyTable, 
project=[a], metadata=[]]], fields=[a])
 ]]>
     </Resource>
   </TestCase>
@@ -150,23 +171,22 @@ Calc(select=[a, b], where=[(a = b)])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testCalcMergeWithNonDeterministicExpr1">
+  <TestCase name="testCalcMergeWithoutInnerNonDeterministicExpr">
     <Resource name="sql">
-      <![CDATA[SELECT a, a1 FROM (SELECT a, random_udf(a) AS a1 FROM MyTable) 
t WHERE a1 > 10]]>
+      <![CDATA[SELECT a, c FROM (SELECT a, random_udf(a) as a1, c FROM 
MyTable) t WHERE c > 10]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(a=[$0], a1=[$1])
-+- LogicalFilter(condition=[>($1, 10)])
-   +- LogicalProject(a=[$0], a1=[random_udf($0)])
+LogicalProject(a=[$0], c=[$2])
++- LogicalFilter(condition=[>(CAST($2):BIGINT, 10)])
+   +- LogicalProject(a=[$0], a1=[random_udf($0)], c=[$2])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[a, a1], where=[(a1 > 10)])
-+- Calc(select=[a, random_udf(a) AS a1])
-   +- TableSourceScan(table=[[default_catalog, default_database, MyTable, 
project=[a], metadata=[]]], fields=[a])
+Calc(select=[a, c], where=[(CAST(c AS BIGINT) > 10)])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable, 
filter=[], project=[a, c], metadata=[]]], fields=[a, c])
 ]]>
     </Resource>
   </TestCase>

Reply via email to