This is an automated email from the ASF dual-hosted git repository. lincoln pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 01743000c10720598dfb6f27d849da2283772e50 Author: lincoln.lil <[email protected]> AuthorDate: Fri Feb 3 22:33:32 2023 +0800 [FLINK-30006][table-planner] Fix FlinkFilterCalcMergeRule that didn't expand local ref to handle nested expressions that contain a non-deterministic call --- .../rules/logical/FlinkFilterCalcMergeRule.java | 13 +++--- .../planner/plan/common/CalcMergeTestBase.java | 6 +++ .../table/planner/plan/batch/sql/CalcMergeTest.xml | 50 +++++++++++++++------- .../planner/plan/stream/sql/CalcMergeTest.xml | 50 +++++++++++++++------- 4 files changed, 82 insertions(+), 37 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterCalcMergeRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterCalcMergeRule.java index f57eaa600e1..9e17de2f0ea 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterCalcMergeRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterCalcMergeRule.java @@ -25,12 +25,12 @@ import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.rel.logical.LogicalCalc; import org.apache.calcite.rel.logical.LogicalFilter; import org.apache.calcite.rel.rules.FilterCalcMergeRule; -import org.apache.calcite.rex.RexLocalRef; import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexUtil; import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; /** * Extends calcite's FilterCalcMergeRule for streaming scenario, modification: does not merge the @@ -49,16 +49,15 @@ public class FlinkFilterCalcMergeRule extends FilterCalcMergeRule { LogicalFilter filter = call.rel(0); LogicalCalc calc = call.rel(1); - List<RexNode> projectExprs = calc.getProgram().getExprList(); - List<RexLocalRef> projects = calc.getProgram().getProjectList(); + 
List<RexNode> expandProjects = + calc.getProgram().getProjectList().stream() + .map(p -> calc.getProgram().expandLocalRef(p)) + .collect(Collectors.toList()); InputRefVisitor inputRefVisitor = new InputRefVisitor(); filter.getCondition().accept(inputRefVisitor); boolean existNonDeterministicRef = Arrays.stream(inputRefVisitor.getFields()) - .anyMatch( - i -> - !RexUtil.isDeterministic( - projectExprs.get(projects.get(i).getIndex()))); + .anyMatch(i -> !RexUtil.isDeterministic(expandProjects.get(i))); if (!existNonDeterministicRef) { super.onMatch(call); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/CalcMergeTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/CalcMergeTestBase.java index 39c2c1b7626..1f8cf1caad2 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/CalcMergeTestBase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/CalcMergeTestBase.java @@ -107,4 +107,10 @@ public abstract class CalcMergeTestBase extends TableTestBase { util.verifyExecPlan( "SELECT a, c FROM (SELECT a, random_udf(a) as a1, c FROM MyTable) t WHERE c > 10"); } + + @Test + public void testCalcMergeWithNonDeterministicNestedExpr() { + util.verifyExecPlan( + "SELECT a, a1 FROM (SELECT a, substr(cast(random_udf(a) as varchar), 1, 2) AS a1 FROM MyTable) t WHERE a1 > '10'"); + } } diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcMergeTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcMergeTest.xml index b98694fb803..6adb95720ad 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcMergeTest.xml +++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcMergeTest.xml @@ -89,6 +89,26 @@ LogicalProject(c=[$2]) <![CDATA[ Calc(select=[c], where=[(random_udf(a) > random_udf(b))]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c]) +]]> + </Resource> + </TestCase> + <TestCase name="testCalcMergeWithNonDeterministicExpr1"> + <Resource name="sql"> + <![CDATA[SELECT a, a1 FROM (SELECT a, random_udf(a) AS a1 FROM MyTable) t WHERE a1 > 10]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a=[$0], a1=[$1]) ++- LogicalFilter(condition=[>($1, 10)]) + +- LogicalProject(a=[$0], a1=[random_udf($0)]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +Calc(select=[a, a1], where=[(a1 > 10)]) ++- Calc(select=[a, random_udf(a) AS a1]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a], metadata=[]]], fields=[a]) ]]> </Resource> </TestCase> @@ -111,22 +131,23 @@ Calc(select=[random_udf(random_udf(a)) AS a2], where=[(b > 10)]) ]]> </Resource> </TestCase> - <TestCase name="testCalcMergeWithoutInnerNonDeterministicExpr"> + <TestCase name="testCalcMergeWithNonDeterministicNestedExpr"> <Resource name="sql"> - <![CDATA[SELECT a, c FROM (SELECT a, random_udf(a) as a1, c FROM MyTable) t WHERE c > 10]]> + <![CDATA[SELECT a, a1 FROM (SELECT a, substr(cast(random_udf(a) as varchar), 1, 2) AS a1 FROM MyTable) t WHERE a1 > '10']]> </Resource> <Resource name="ast"> <![CDATA[ -LogicalProject(a=[$0], c=[$2]) -+- LogicalFilter(condition=[>(CAST($2):BIGINT, 10)]) - +- LogicalProject(a=[$0], a1=[random_udf($0)], c=[$2]) +LogicalProject(a=[$0], a1=[$1]) ++- LogicalFilter(condition=[>($1, _UTF-16LE'10')]) + +- LogicalProject(a=[$0], a1=[SUBSTR(CAST(random_udf($0)):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL, 1, 2)]) +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]]) ]]> </Resource> <Resource name="optimized exec plan"> <![CDATA[ -Calc(select=[a, c], where=[(CAST(c AS BIGINT) > 10)]) -+- TableSourceScan(table=[[default_catalog, default_database, MyTable, filter=[], project=[a, c], metadata=[]]], fields=[a, c]) +Calc(select=[a, a1], where=[(a1 > '10')]) ++- Calc(select=[a, SUBSTR(CAST(random_udf(a) AS VARCHAR(2147483647)), 1, 2) AS a1]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a], metadata=[]]], fields=[a]) ]]> </Resource> </TestCase> @@ -150,23 +171,22 @@ Calc(select=[a, b], where=[(a = b)]) ]]> </Resource> </TestCase> - <TestCase name="testCalcMergeWithNonDeterministicExpr1"> + <TestCase name="testCalcMergeWithoutInnerNonDeterministicExpr"> <Resource name="sql"> - <![CDATA[SELECT a, a1 FROM (SELECT a, random_udf(a) AS a1 FROM MyTable) t WHERE a1 > 10]]> + <![CDATA[SELECT a, c FROM (SELECT a, random_udf(a) as a1, c FROM MyTable) t WHERE c > 10]]> </Resource> <Resource name="ast"> <![CDATA[ -LogicalProject(a=[$0], a1=[$1]) -+- LogicalFilter(condition=[>($1, 10)]) - +- LogicalProject(a=[$0], a1=[random_udf($0)]) +LogicalProject(a=[$0], c=[$2]) ++- LogicalFilter(condition=[>(CAST($2):BIGINT, 10)]) + +- LogicalProject(a=[$0], a1=[random_udf($0)], c=[$2]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) ]]> </Resource> <Resource name="optimized exec plan"> <![CDATA[ -Calc(select=[a, a1], where=[(a1 > 10)]) -+- Calc(select=[a, random_udf(a) AS a1]) - +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a], metadata=[]]], fields=[a]) +Calc(select=[a, c], where=[(CAST(c AS BIGINT) > 10)]) ++- TableSourceScan(table=[[default_catalog, default_database, MyTable, filter=[], project=[a, c], metadata=[]]], fields=[a, c]) ]]> </Resource> </TestCase> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcMergeTest.xml 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcMergeTest.xml index b98694fb803..6adb95720ad 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcMergeTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcMergeTest.xml @@ -89,6 +89,26 @@ LogicalProject(c=[$2]) <![CDATA[ Calc(select=[c], where=[(random_udf(a) > random_udf(b))]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c]) +]]> + </Resource> + </TestCase> + <TestCase name="testCalcMergeWithNonDeterministicExpr1"> + <Resource name="sql"> + <![CDATA[SELECT a, a1 FROM (SELECT a, random_udf(a) AS a1 FROM MyTable) t WHERE a1 > 10]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a=[$0], a1=[$1]) ++- LogicalFilter(condition=[>($1, 10)]) + +- LogicalProject(a=[$0], a1=[random_udf($0)]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +Calc(select=[a, a1], where=[(a1 > 10)]) ++- Calc(select=[a, random_udf(a) AS a1]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a], metadata=[]]], fields=[a]) ]]> </Resource> </TestCase> @@ -111,22 +131,23 @@ Calc(select=[random_udf(random_udf(a)) AS a2], where=[(b > 10)]) ]]> </Resource> </TestCase> - <TestCase name="testCalcMergeWithoutInnerNonDeterministicExpr"> + <TestCase name="testCalcMergeWithNonDeterministicNestedExpr"> <Resource name="sql"> - <![CDATA[SELECT a, c FROM (SELECT a, random_udf(a) as a1, c FROM MyTable) t WHERE c > 10]]> + <![CDATA[SELECT a, a1 FROM (SELECT a, substr(cast(random_udf(a) as varchar), 1, 2) AS a1 FROM MyTable) t WHERE a1 > '10']]> </Resource> <Resource name="ast"> <![CDATA[ -LogicalProject(a=[$0], c=[$2]) -+- LogicalFilter(condition=[>(CAST($2):BIGINT, 10)]) - +- LogicalProject(a=[$0], 
a1=[random_udf($0)], c=[$2]) +LogicalProject(a=[$0], a1=[$1]) ++- LogicalFilter(condition=[>($1, _UTF-16LE'10')]) + +- LogicalProject(a=[$0], a1=[SUBSTR(CAST(random_udf($0)):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL, 1, 2)]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) ]]> </Resource> <Resource name="optimized exec plan"> <![CDATA[ -Calc(select=[a, c], where=[(CAST(c AS BIGINT) > 10)]) -+- TableSourceScan(table=[[default_catalog, default_database, MyTable, filter=[], project=[a, c], metadata=[]]], fields=[a, c]) +Calc(select=[a, a1], where=[(a1 > '10')]) ++- Calc(select=[a, SUBSTR(CAST(random_udf(a) AS VARCHAR(2147483647)), 1, 2) AS a1]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a], metadata=[]]], fields=[a]) ]]> </Resource> </TestCase> @@ -150,23 +171,22 @@ Calc(select=[a, b], where=[(a = b)]) ]]> </Resource> </TestCase> - <TestCase name="testCalcMergeWithNonDeterministicExpr1"> + <TestCase name="testCalcMergeWithoutInnerNonDeterministicExpr"> <Resource name="sql"> - <![CDATA[SELECT a, a1 FROM (SELECT a, random_udf(a) AS a1 FROM MyTable) t WHERE a1 > 10]]> + <![CDATA[SELECT a, c FROM (SELECT a, random_udf(a) as a1, c FROM MyTable) t WHERE c > 10]]> </Resource> <Resource name="ast"> <![CDATA[ -LogicalProject(a=[$0], a1=[$1]) -+- LogicalFilter(condition=[>($1, 10)]) - +- LogicalProject(a=[$0], a1=[random_udf($0)]) +LogicalProject(a=[$0], c=[$2]) ++- LogicalFilter(condition=[>(CAST($2):BIGINT, 10)]) + +- LogicalProject(a=[$0], a1=[random_udf($0)], c=[$2]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) ]]> </Resource> <Resource name="optimized exec plan"> <![CDATA[ -Calc(select=[a, a1], where=[(a1 > 10)]) -+- Calc(select=[a, random_udf(a) AS a1]) - +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a], metadata=[]]], fields=[a]) +Calc(select=[a, c], where=[(CAST(c AS BIGINT) > 10)]) ++- TableSourceScan(table=[[default_catalog, 
default_database, MyTable, filter=[], project=[a, c], metadata=[]]], fields=[a, c]) ]]> </Resource> </TestCase>
