This is an automated email from the ASF dual-hosted git repository.
lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c3a376f5380 [FLINK-30841][table-planner] Fix incorrect calc merge to
avoid wrong plans
c3a376f5380 is described below
commit c3a376f5380ee85c9d34a8e72806e6ce6893be6a
Author: lincoln lee <[email protected]>
AuthorDate: Fri Feb 3 21:15:14 2023 +0800
[FLINK-30841][table-planner] Fix incorrect calc merge to avoid wrong plans
This closes #21799
---
.../rules/logical/FlinkFilterCalcMergeRule.java | 67 ++++++++
.../logical/FlinkFilterProjectTransposeRule.java | 62 ++++++++
.../planner/plan/rules/FlinkBatchRuleSets.scala | 4 +-
.../planner/plan/rules/FlinkStreamRuleSets.scala | 4 +-
.../planner/plan/batch/sql/CalcMergeTest.java | 36 +++++
.../planner/plan/common/CalcMergeTestBase.java | 110 +++++++++++++
.../planner/plan/stream/sql/CalcMergeTest.java | 36 +++++
.../table/planner/plan/batch/sql/CalcMergeTest.xml | 173 +++++++++++++++++++++
.../table/planner/plan/batch/sql/CalcTest.xml | 39 +++++
.../planner/plan/stream/sql/CalcMergeTest.xml | 173 +++++++++++++++++++++
.../table/planner/plan/stream/sql/CalcTest.xml | 39 +++++
.../table/planner/plan/batch/sql/CalcTest.scala | 22 ++-
.../table/planner/plan/stream/sql/CalcTest.scala | 22 ++-
13 files changed, 779 insertions(+), 8 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterCalcMergeRule.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterCalcMergeRule.java
new file mode 100644
index 00000000000..f57eaa600e1
--- /dev/null
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterCalcMergeRule.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.planner.plan.utils.InputRefVisitor;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.logical.LogicalCalc;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.rules.FilterCalcMergeRule;
+import org.apache.calcite.rex.RexLocalRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Extends calcite's FilterCalcMergeRule for streaming scenario, modification:
does not merge the
+ * filter references field which generated by non-deterministic function.
+ */
+public class FlinkFilterCalcMergeRule extends FilterCalcMergeRule {
+
+ public static final RelOptRule INSTANCE = new
FlinkFilterCalcMergeRule(Config.DEFAULT);
+
+ protected FlinkFilterCalcMergeRule(Config config) {
+ super(config);
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ LogicalFilter filter = call.rel(0);
+ LogicalCalc calc = call.rel(1);
+
+ List<RexNode> projectExprs = calc.getProgram().getExprList();
+ List<RexLocalRef> projects = calc.getProgram().getProjectList();
+ InputRefVisitor inputRefVisitor = new InputRefVisitor();
+ filter.getCondition().accept(inputRefVisitor);
+ boolean existNonDeterministicRef =
+ Arrays.stream(inputRefVisitor.getFields())
+ .anyMatch(
+ i ->
+ !RexUtil.isDeterministic(
+
projectExprs.get(projects.get(i).getIndex())));
+
+ if (!existNonDeterministicRef) {
+ super.onMatch(call);
+ }
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterProjectTransposeRule.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterProjectTransposeRule.java
new file mode 100644
index 00000000000..fdca581b612
--- /dev/null
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterProjectTransposeRule.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.planner.plan.utils.InputRefVisitor;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.rules.FilterProjectTransposeRule;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Extends calcite's FilterProjectTransposeRule for streaming scenario,
modification: does not
+ * transpose the top filter references field which generated by
non-deterministic function.
+ */
+public class FlinkFilterProjectTransposeRule extends
FilterProjectTransposeRule {
+
+ public static final RelOptRule INSTANCE = new
FlinkFilterProjectTransposeRule(Config.DEFAULT);
+
+ protected FlinkFilterProjectTransposeRule(Config config) {
+ super(config);
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ Filter filter = call.rel(0);
+ Project project = call.rel(1);
+
+ List<RexNode> projects = project.getProjects();
+ InputRefVisitor inputRefVisitor = new InputRefVisitor();
+ filter.getCondition().accept(inputRefVisitor);
+ boolean existNonDeterministicRef =
+ Arrays.stream(inputRefVisitor.getFields())
+ .anyMatch(i ->
!RexUtil.isDeterministic(projects.get(i)));
+
+ if (!existNonDeterministicRef) {
+ super.onMatch(call);
+ }
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
index 993ce13c615..583b1f01727 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
@@ -133,7 +133,7 @@ object FlinkBatchRuleSets {
// push filter through an aggregation
CoreRules.FILTER_AGGREGATE_TRANSPOSE,
// push a filter past a project
- CoreRules.FILTER_PROJECT_TRANSPOSE,
+ FlinkFilterProjectTransposeRule.INSTANCE,
CoreRules.FILTER_SET_OP_TRANSPOSE,
CoreRules.FILTER_MERGE
)
@@ -287,7 +287,7 @@ object FlinkBatchRuleSets {
ConstantRankNumberColumnRemoveRule.INSTANCE,
// calc rules
- CoreRules.FILTER_CALC_MERGE,
+ FlinkFilterCalcMergeRule.INSTANCE,
CoreRules.PROJECT_CALC_MERGE,
CoreRules.FILTER_TO_CALC,
CoreRules.PROJECT_TO_CALC,
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
index d82bbf87658..1df2b7bd131 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
@@ -142,7 +142,7 @@ object FlinkStreamRuleSets {
// push filter through an aggregation
CoreRules.FILTER_AGGREGATE_TRANSPOSE,
// push a filter past a project
- CoreRules.FILTER_PROJECT_TRANSPOSE,
+ FlinkFilterProjectTransposeRule.INSTANCE,
// push a filter past a setop
CoreRules.FILTER_SET_OP_TRANSPOSE,
CoreRules.FILTER_MERGE
@@ -280,7 +280,7 @@ object FlinkStreamRuleSets {
DecomposeGroupingSetsRule.INSTANCE,
// calc rules
- CoreRules.FILTER_CALC_MERGE,
+ FlinkFilterCalcMergeRule.INSTANCE,
CoreRules.PROJECT_CALC_MERGE,
CoreRules.FILTER_TO_CALC,
CoreRules.PROJECT_TO_CALC,
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/CalcMergeTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/CalcMergeTest.java
new file mode 100644
index 00000000000..4a0e196f1cb
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/CalcMergeTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.batch.sql;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.planner.plan.common.CalcMergeTestBase;
+import org.apache.flink.table.planner.utils.TableTestUtil;
+
+/** Plan test for calc merge. */
+public class CalcMergeTest extends CalcMergeTestBase {
+ @Override
+ protected boolean isBatchMode() {
+ return true;
+ }
+
+ @Override
+ protected TableTestUtil getTableTestUtil() {
+ return batchTestUtil(TableConfig.getDefault());
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/CalcMergeTestBase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/CalcMergeTestBase.java
new file mode 100644
index 00000000000..39c2c1b7626
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/CalcMergeTestBase.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.common;
+
+import
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
+import org.apache.flink.table.planner.utils.TableTestBase;
+import org.apache.flink.table.planner.utils.TableTestUtil;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Base plan test for calc merge, the difference between
FlinkCalcMergeRuleTest is this test
+ * includes all rules.
+ */
+public abstract class CalcMergeTestBase extends TableTestBase {
+
+ private TableTestUtil util;
+
+ protected abstract boolean isBatchMode();
+
+ protected abstract TableTestUtil getTableTestUtil();
+
+ @Before
+ public void setup() {
+ util = getTableTestUtil();
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE MyTable (\n"
+ + " a INTEGER,\n"
+ + " b INTEGER,\n"
+ + " c VARCHAR\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values'\n"
+ + " ,'bounded' = '"
+ + isBatchMode()
+ + "'\n"
+ + ")");
+ util.addFunction("random_udf", new
JavaUserDefinedScalarFunctions.NonDeterministicUdf());
+ }
+
+ @Test
+ public void testCalcMergeWithSameDigest() {
+ util.verifyExecPlan("SELECT a, b FROM (SELECT * FROM MyTable WHERE a =
b) t WHERE b = a");
+ }
+
+ @Test
+ public void testCalcMergeWithNonDeterministicExpr1() {
+ util.verifyExecPlan(
+ "SELECT a, a1 FROM (SELECT a, random_udf(a) AS a1 FROM
MyTable) t WHERE a1 > 10");
+ }
+
+ @Test
+ public void testCalcMergeWithNonDeterministicExpr2() {
+ util.verifyExecPlan(
+ "SELECT random_udf(a1) as a2 FROM (SELECT random_udf(a) as"
+ + " a1, b FROM MyTable) t WHERE b > 10");
+ }
+
+ @Test
+ public void testCalcMergeWithTopMultiNonDeterministicExpr() {
+ util.verifyExecPlan(
+ "SELECT random_udf(a1) as a2, random_udf(a1) as a3 FROM"
+ + " (SELECT random_udf(a) as a1, b FROM MyTable) t
WHERE b > 10");
+ }
+
+ @Test
+ public void testCalcMergeTopFilterHasNonDeterministicExpr() {
+ util.verifyExecPlan(
+ "SELECT a, c FROM"
+ + " (SELECT a, random_udf(b) as b1, c FROM MyTable) t
WHERE b1 > 10");
+ }
+
+ @Test
+ public void testCalcMergeWithBottomMultiNonDeterministicExpr() {
+ util.verifyExecPlan(
+ "SELECT a1, b2 FROM"
+ + " (SELECT random_udf(a) as a1, random_udf(b) as b2,
c FROM MyTable) t WHERE c > 10");
+ }
+
+ @Test
+ public void testCalcMergeWithBottomMultiNonDeterministicInConditionExpr() {
+ util.verifyExecPlan(
+ "SELECT c FROM"
+ + " (SELECT random_udf(a) as a1, random_udf(b) as b2,
c FROM MyTable) t WHERE a1 > b2");
+ }
+
+ @Test
+ public void testCalcMergeWithoutInnerNonDeterministicExpr() {
+ util.verifyExecPlan(
+ "SELECT a, c FROM (SELECT a, random_udf(a) as a1, c FROM
MyTable) t WHERE c > 10");
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/CalcMergeTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/CalcMergeTest.java
new file mode 100644
index 00000000000..9da719f03b1
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/CalcMergeTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.stream.sql;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.planner.plan.common.CalcMergeTestBase;
+import org.apache.flink.table.planner.utils.TableTestUtil;
+
+/** Plan test for calc merge. */
+public class CalcMergeTest extends CalcMergeTestBase {
+ @Override
+ protected boolean isBatchMode() {
+ return false;
+ }
+
+ @Override
+ protected TableTestUtil getTableTestUtil() {
+ return streamTestUtil(TableConfig.getDefault());
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcMergeTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcMergeTest.xml
new file mode 100644
index 00000000000..b98694fb803
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcMergeTest.xml
@@ -0,0 +1,173 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testCalcMergeWithTopMultiNonDeterministicExpr">
+ <Resource name="sql">
+ <![CDATA[SELECT random_udf(a1) as a2, random_udf(a1) as a3 FROM (SELECT
random_udf(a) as a1, b FROM MyTable) t WHERE b > 10]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a2=[random_udf($0)], a3=[random_udf($0)])
++- LogicalFilter(condition=[>($1, 10)])
+ +- LogicalProject(a1=[random_udf($0)], b=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[random_udf(random_udf(a)) AS a2, random_udf(random_udf(a)) AS
a3], where=[(b > 10)])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable,
filter=[], project=[a, b], metadata=[]]], fields=[a, b])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCalcMergeTopFilterHasNonDeterministicExpr">
+ <Resource name="sql">
+ <![CDATA[SELECT a, c FROM (SELECT a, random_udf(b) as b1, c FROM
MyTable) t WHERE b1 > 10]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- LogicalFilter(condition=[>($1, 10)])
+ +- LogicalProject(a=[$0], b1=[random_udf($1)], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[a, c], where=[(random_udf(b) > 10)])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCalcMergeWithBottomMultiNonDeterministicExpr">
+ <Resource name="sql">
+ <![CDATA[SELECT a1, b2 FROM (SELECT random_udf(a) as a1, random_udf(b)
as b2, c FROM MyTable) t WHERE c > 10]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a1=[$0], b2=[$1])
++- LogicalFilter(condition=[>(CAST($2):BIGINT, 10)])
+ +- LogicalProject(a1=[random_udf($0)], b2=[random_udf($1)], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[random_udf(a) AS a1, random_udf(b) AS b2], where=[(CAST(c AS
BIGINT) > 10)])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable,
filter=[]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCalcMergeWithBottomMultiNonDeterministicInConditionExpr">
+ <Resource name="sql">
+ <![CDATA[SELECT c FROM (SELECT random_udf(a) as a1, random_udf(b) as b2,
c FROM MyTable) t WHERE a1 > b2]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(c=[$2])
++- LogicalFilter(condition=[>($0, $1)])
+ +- LogicalProject(a1=[random_udf($0)], b2=[random_udf($1)], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[c], where=[(random_udf(a) > random_udf(b))])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCalcMergeWithNonDeterministicExpr2">
+ <Resource name="sql">
+ <![CDATA[SELECT random_udf(a1) as a2 FROM (SELECT random_udf(a) as a1, b
FROM MyTable) t WHERE b > 10]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a2=[random_udf($0)])
++- LogicalFilter(condition=[>($1, 10)])
+ +- LogicalProject(a1=[random_udf($0)], b=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[random_udf(random_udf(a)) AS a2], where=[(b > 10)])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable,
filter=[], project=[a, b], metadata=[]]], fields=[a, b])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCalcMergeWithoutInnerNonDeterministicExpr">
+ <Resource name="sql">
+ <![CDATA[SELECT a, c FROM (SELECT a, random_udf(a) as a1, c FROM
MyTable) t WHERE c > 10]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- LogicalFilter(condition=[>(CAST($2):BIGINT, 10)])
+ +- LogicalProject(a=[$0], a1=[random_udf($0)], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[a, c], where=[(CAST(c AS BIGINT) > 10)])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable,
filter=[], project=[a, c], metadata=[]]], fields=[a, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCalcMergeWithSameDigest">
+ <Resource name="sql">
+ <![CDATA[SELECT a, b FROM (SELECT * FROM MyTable WHERE a = b) t WHERE b
= a]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[=($1, $0)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2])
+ +- LogicalFilter(condition=[=($0, $1)])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[a, b], where=[(a = b)])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable,
filter=[], project=[a, b], metadata=[]]], fields=[a, b])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCalcMergeWithNonDeterministicExpr1">
+ <Resource name="sql">
+ <![CDATA[SELECT a, a1 FROM (SELECT a, random_udf(a) AS a1 FROM MyTable)
t WHERE a1 > 10]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], a1=[$1])
++- LogicalFilter(condition=[>($1, 10)])
+ +- LogicalProject(a=[$0], a1=[random_udf($0)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[a, a1], where=[(a1 > 10)])
++- Calc(select=[a, random_udf(a) AS a1])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable,
project=[a], metadata=[]]], fields=[a])
+]]>
+ </Resource>
+ </TestCase>
+</Root>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml
index d16146703c9..5dab40a7300 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml
@@ -30,6 +30,45 @@
LogicalProject(EXPR$0=[ARRAY(_UTF-16LE'Hi':VARCHAR(2147483647) CHARACTER SET "UT
<![CDATA[
Calc(select=[ARRAY('Hi', 'Hello', c) AS EXPR$0])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCalcMergeWithNonDeterministicExpr">
+ <Resource name="sql">
+ <![CDATA[SELECT a, a1 FROM (SELECT a, random_udf(b) AS a1 FROM MyTable)
t WHERE a1 > 10]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], a1=[$1])
++- LogicalFilter(condition=[>($1, 10)])
+ +- LogicalProject(a=[$0], a1=[random_udf($1)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[a, a1], where=[(a1 > 10)])
++- Calc(select=[a, random_udf(b) AS a1])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCalcMergeWithNonDeterministicExpr2">
+ <Resource name="sql">
+ <![CDATA[SELECT a FROM (SELECT a, b FROM MyTable) t WHERE random_udf(b)
> 10]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0])
++- LogicalFilter(condition=[>(random_udf($1), 10)])
+ +- LogicalProject(a=[$0], b=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[a], where=[>(random_udf(b), 10)])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcMergeTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcMergeTest.xml
new file mode 100644
index 00000000000..b98694fb803
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcMergeTest.xml
@@ -0,0 +1,173 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testCalcMergeWithTopMultiNonDeterministicExpr">
+ <Resource name="sql">
+ <![CDATA[SELECT random_udf(a1) as a2, random_udf(a1) as a3 FROM (SELECT
random_udf(a) as a1, b FROM MyTable) t WHERE b > 10]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a2=[random_udf($0)], a3=[random_udf($0)])
++- LogicalFilter(condition=[>($1, 10)])
+ +- LogicalProject(a1=[random_udf($0)], b=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[random_udf(random_udf(a)) AS a2, random_udf(random_udf(a)) AS
a3], where=[(b > 10)])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable,
filter=[], project=[a, b], metadata=[]]], fields=[a, b])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCalcMergeTopFilterHasNonDeterministicExpr">
+ <Resource name="sql">
+ <![CDATA[SELECT a, c FROM (SELECT a, random_udf(b) as b1, c FROM
MyTable) t WHERE b1 > 10]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- LogicalFilter(condition=[>($1, 10)])
+ +- LogicalProject(a=[$0], b1=[random_udf($1)], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[a, c], where=[(random_udf(b) > 10)])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCalcMergeWithBottomMultiNonDeterministicExpr">
+ <Resource name="sql">
+ <![CDATA[SELECT a1, b2 FROM (SELECT random_udf(a) as a1, random_udf(b)
as b2, c FROM MyTable) t WHERE c > 10]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a1=[$0], b2=[$1])
++- LogicalFilter(condition=[>(CAST($2):BIGINT, 10)])
+ +- LogicalProject(a1=[random_udf($0)], b2=[random_udf($1)], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[random_udf(a) AS a1, random_udf(b) AS b2], where=[(CAST(c AS
BIGINT) > 10)])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable,
filter=[]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCalcMergeWithBottomMultiNonDeterministicInConditionExpr">
+ <Resource name="sql">
+ <![CDATA[SELECT c FROM (SELECT random_udf(a) as a1, random_udf(b) as b2,
c FROM MyTable) t WHERE a1 > b2]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(c=[$2])
++- LogicalFilter(condition=[>($0, $1)])
+ +- LogicalProject(a1=[random_udf($0)], b2=[random_udf($1)], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[c], where=[(random_udf(a) > random_udf(b))])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCalcMergeWithNonDeterministicExpr2">
+ <Resource name="sql">
+ <![CDATA[SELECT random_udf(a1) as a2 FROM (SELECT random_udf(a) as a1, b
FROM MyTable) t WHERE b > 10]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a2=[random_udf($0)])
++- LogicalFilter(condition=[>($1, 10)])
+ +- LogicalProject(a1=[random_udf($0)], b=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[random_udf(random_udf(a)) AS a2], where=[(b > 10)])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable,
filter=[], project=[a, b], metadata=[]]], fields=[a, b])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCalcMergeWithoutInnerNonDeterministicExpr">
+ <Resource name="sql">
+ <![CDATA[SELECT a, c FROM (SELECT a, random_udf(a) as a1, c FROM
MyTable) t WHERE c > 10]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- LogicalFilter(condition=[>(CAST($2):BIGINT, 10)])
+ +- LogicalProject(a=[$0], a1=[random_udf($0)], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[a, c], where=[(CAST(c AS BIGINT) > 10)])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable,
filter=[], project=[a, c], metadata=[]]], fields=[a, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCalcMergeWithSameDigest">
+ <Resource name="sql">
+ <![CDATA[SELECT a, b FROM (SELECT * FROM MyTable WHERE a = b) t WHERE b
= a]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[=($1, $0)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2])
+ +- LogicalFilter(condition=[=($0, $1)])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[a, b], where=[(a = b)])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable,
filter=[], project=[a, b], metadata=[]]], fields=[a, b])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCalcMergeWithNonDeterministicExpr1">
+ <Resource name="sql">
+ <![CDATA[SELECT a, a1 FROM (SELECT a, random_udf(a) AS a1 FROM MyTable)
t WHERE a1 > 10]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], a1=[$1])
++- LogicalFilter(condition=[>($1, 10)])
+ +- LogicalProject(a=[$0], a1=[random_udf($0)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[a, a1], where=[(a1 > 10)])
++- Calc(select=[a, random_udf(a) AS a1])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable,
project=[a], metadata=[]]], fields=[a])
+]]>
+ </Resource>
+ </TestCase>
+</Root>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml
index b40056000f4..51450c5fde4 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml
@@ -30,6 +30,45 @@
LogicalProject(EXPR$0=[ARRAY(_UTF-16LE'Hi':VARCHAR(2147483647) CHARACTER SET "UT
<![CDATA[
Calc(select=[ARRAY('Hi', 'Hello', c) AS EXPR$0])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCalcMergeWithNonDeterministicExpr">
+ <Resource name="sql">
+ <![CDATA[SELECT a, a1 FROM (SELECT a, random_udf(b) AS a1 FROM MyTable) t WHERE a1 > 10]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], a1=[$1])
++- LogicalFilter(condition=[>($1, 10)])
+ +- LogicalProject(a=[$0], a1=[random_udf($1)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[a, a1], where=[(a1 > 10)])
++- Calc(select=[a, random_udf(b) AS a1])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCalcMergeWithNonDeterministicExpr2">
+ <Resource name="sql">
+ <![CDATA[SELECT a FROM (SELECT a, b FROM MyTable) t WHERE random_udf(b) > 10]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0])
++- LogicalFilter(condition=[>(random_udf($1), 10)])
+ +- LogicalProject(a=[$0], b=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[a], where=[>(random_udf(b), 10)])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala
index d0fdce135f1..09a13014aff 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala
@@ -22,16 +22,22 @@ import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.planner.plan.utils.MyPojo
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.NonDeterministicUdf
import org.apache.flink.table.planner.utils.TableTestBase
-import org.junit.Test
+import org.junit.{Before, Test}
import java.sql.{Date, Time, Timestamp}
class CalcTest extends TableTestBase {
private val util = batchTestUtil()
- util.addTableSource[(Long, Int, String)]("MyTable", 'a, 'b, 'c)
+
+ @Before
+ def setup(): Unit = {
+ util.addTableSource[(Long, Int, String)]("MyTable", 'a, 'b, 'c)
+ util.addFunction("random_udf", new NonDeterministicUdf)
+ }
@Test
def testOnlyProject(): Unit = {
@@ -187,4 +193,16 @@ class CalcTest extends TableTestBase {
def testDecimalMapWithDifferentPrecision(): Unit = {
util.verifyExecPlan("SELECT MAP['a', 0.12, 'b', 0.5]")
}
+
+ @Test
+ def testCalcMergeWithNonDeterministicExpr(): Unit = {
+ val sqlQuery = "SELECT a, a1 FROM (SELECT a, random_udf(b) AS a1 FROM MyTable) t WHERE a1 > 10"
+ util.verifyExecPlan(sqlQuery)
+ }
+
+ @Test
+ def testCalcMergeWithNonDeterministicExpr2(): Unit = {
+ val sqlQuery = "SELECT a FROM (SELECT a, b FROM MyTable) t WHERE random_udf(b) > 10"
+ util.verifyRelPlan(sqlQuery)
+ }
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala
index 617a7bdc3f9..3e74c00480f 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala
@@ -22,15 +22,21 @@ import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.planner.plan.utils.MyPojo
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.NonDeterministicUdf
import org.apache.flink.table.planner.utils.TableTestBase
-import org.junit.Test
+import org.junit.{Before, Test}
import java.sql.{Date, Time, Timestamp}
class CalcTest extends TableTestBase {
private val util = streamTestUtil()
- util.addTableSource[(Long, Int, String)]("MyTable", 'a, 'b, 'c)
+
+ @Before
+ def setup(): Unit = {
+ util.addTableSource[(Long, Int, String)]("MyTable", 'a, 'b, 'c)
+ util.addFunction("random_udf", new NonDeterministicUdf)
+ }
@Test
def testOnlyProject(): Unit = {
@@ -181,4 +187,16 @@ class CalcTest extends TableTestBase {
def testDecimalMapWithDifferentPrecision(): Unit = {
util.verifyExecPlan("SELECT MAP['a', 0.12, 'b', 0.5]")
}
+
+ @Test
+ def testCalcMergeWithNonDeterministicExpr(): Unit = {
+ val sqlQuery = "SELECT a, a1 FROM (SELECT a, random_udf(b) AS a1 FROM MyTable) t WHERE a1 > 10"
+ util.verifyExecPlan(sqlQuery)
+ }
+
+ @Test
+ def testCalcMergeWithNonDeterministicExpr2(): Unit = {
+ val sqlQuery = "SELECT a FROM (SELECT a, b FROM MyTable) t WHERE random_udf(b) > 10"
+ util.verifyRelPlan(sqlQuery)
+ }
}