This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c3a376f5380 [FLINK-30841][table-planner] Fix incorrect calc merge to 
avoid wrong plans
c3a376f5380 is described below

commit c3a376f5380ee85c9d34a8e72806e6ce6893be6a
Author: lincoln lee <[email protected]>
AuthorDate: Fri Feb 3 21:15:14 2023 +0800

    [FLINK-30841][table-planner] Fix incorrect calc merge to avoid wrong plans
    
    This closes #21799
---
 .../rules/logical/FlinkFilterCalcMergeRule.java    |  67 ++++++++
 .../logical/FlinkFilterProjectTransposeRule.java   |  62 ++++++++
 .../planner/plan/rules/FlinkBatchRuleSets.scala    |   4 +-
 .../planner/plan/rules/FlinkStreamRuleSets.scala   |   4 +-
 .../planner/plan/batch/sql/CalcMergeTest.java      |  36 +++++
 .../planner/plan/common/CalcMergeTestBase.java     | 110 +++++++++++++
 .../planner/plan/stream/sql/CalcMergeTest.java     |  36 +++++
 .../table/planner/plan/batch/sql/CalcMergeTest.xml | 173 +++++++++++++++++++++
 .../table/planner/plan/batch/sql/CalcTest.xml      |  39 +++++
 .../planner/plan/stream/sql/CalcMergeTest.xml      | 173 +++++++++++++++++++++
 .../table/planner/plan/stream/sql/CalcTest.xml     |  39 +++++
 .../table/planner/plan/batch/sql/CalcTest.scala    |  22 ++-
 .../table/planner/plan/stream/sql/CalcTest.scala   |  22 ++-
 13 files changed, 779 insertions(+), 8 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterCalcMergeRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterCalcMergeRule.java
new file mode 100644
index 00000000000..f57eaa600e1
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterCalcMergeRule.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.planner.plan.utils.InputRefVisitor;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.logical.LogicalCalc;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.rules.FilterCalcMergeRule;
+import org.apache.calcite.rex.RexLocalRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Extends Calcite's FilterCalcMergeRule for the streaming scenario. Modification: 
does not merge a
+ * filter that references a field generated by a non-deterministic function.
+ */
+public class FlinkFilterCalcMergeRule extends FilterCalcMergeRule {
+
+    public static final RelOptRule INSTANCE = new 
FlinkFilterCalcMergeRule(Config.DEFAULT);
+
+    protected FlinkFilterCalcMergeRule(Config config) {
+        super(config);
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        LogicalFilter filter = call.rel(0);
+        LogicalCalc calc = call.rel(1);
+
+        List<RexNode> projectExprs = calc.getProgram().getExprList();
+        List<RexLocalRef> projects = calc.getProgram().getProjectList();
+        InputRefVisitor inputRefVisitor = new InputRefVisitor();
+        filter.getCondition().accept(inputRefVisitor);
+        boolean existNonDeterministicRef =
+                Arrays.stream(inputRefVisitor.getFields())
+                        .anyMatch(
+                                i ->
+                                        !RexUtil.isDeterministic(
+                                                
projectExprs.get(projects.get(i).getIndex())));
+
+        if (!existNonDeterministicRef) {
+            super.onMatch(call);
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterProjectTransposeRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterProjectTransposeRule.java
new file mode 100644
index 00000000000..fdca581b612
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkFilterProjectTransposeRule.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.planner.plan.utils.InputRefVisitor;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.rules.FilterProjectTransposeRule;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Extends Calcite's FilterProjectTransposeRule for the streaming scenario. 
Modification: does not
+ * transpose a top filter that references a field generated by a 
non-deterministic function.
+ */
+public class FlinkFilterProjectTransposeRule extends 
FilterProjectTransposeRule {
+
+    public static final RelOptRule INSTANCE = new 
FlinkFilterProjectTransposeRule(Config.DEFAULT);
+
+    protected FlinkFilterProjectTransposeRule(Config config) {
+        super(config);
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        Filter filter = call.rel(0);
+        Project project = call.rel(1);
+
+        List<RexNode> projects = project.getProjects();
+        InputRefVisitor inputRefVisitor = new InputRefVisitor();
+        filter.getCondition().accept(inputRefVisitor);
+        boolean existNonDeterministicRef =
+                Arrays.stream(inputRefVisitor.getFields())
+                        .anyMatch(i -> 
!RexUtil.isDeterministic(projects.get(i)));
+
+        if (!existNonDeterministicRef) {
+            super.onMatch(call);
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
index 993ce13c615..583b1f01727 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
@@ -133,7 +133,7 @@ object FlinkBatchRuleSets {
     // push filter through an aggregation
     CoreRules.FILTER_AGGREGATE_TRANSPOSE,
     // push a filter past a project
-    CoreRules.FILTER_PROJECT_TRANSPOSE,
+    FlinkFilterProjectTransposeRule.INSTANCE,
     CoreRules.FILTER_SET_OP_TRANSPOSE,
     CoreRules.FILTER_MERGE
   )
@@ -287,7 +287,7 @@ object FlinkBatchRuleSets {
     ConstantRankNumberColumnRemoveRule.INSTANCE,
 
     // calc rules
-    CoreRules.FILTER_CALC_MERGE,
+    FlinkFilterCalcMergeRule.INSTANCE,
     CoreRules.PROJECT_CALC_MERGE,
     CoreRules.FILTER_TO_CALC,
     CoreRules.PROJECT_TO_CALC,
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
index d82bbf87658..1df2b7bd131 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
@@ -142,7 +142,7 @@ object FlinkStreamRuleSets {
     // push filter through an aggregation
     CoreRules.FILTER_AGGREGATE_TRANSPOSE,
     // push a filter past a project
-    CoreRules.FILTER_PROJECT_TRANSPOSE,
+    FlinkFilterProjectTransposeRule.INSTANCE,
     // push a filter past a setop
     CoreRules.FILTER_SET_OP_TRANSPOSE,
     CoreRules.FILTER_MERGE
@@ -280,7 +280,7 @@ object FlinkStreamRuleSets {
     DecomposeGroupingSetsRule.INSTANCE,
 
     // calc rules
-    CoreRules.FILTER_CALC_MERGE,
+    FlinkFilterCalcMergeRule.INSTANCE,
     CoreRules.PROJECT_CALC_MERGE,
     CoreRules.FILTER_TO_CALC,
     CoreRules.PROJECT_TO_CALC,
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/CalcMergeTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/CalcMergeTest.java
new file mode 100644
index 00000000000..4a0e196f1cb
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/CalcMergeTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.batch.sql;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.planner.plan.common.CalcMergeTestBase;
+import org.apache.flink.table.planner.utils.TableTestUtil;
+
+/** Plan test for calc merge. */
+public class CalcMergeTest extends CalcMergeTestBase {
+    @Override
+    protected boolean isBatchMode() {
+        return true;
+    }
+
+    @Override
+    protected TableTestUtil getTableTestUtil() {
+        return batchTestUtil(TableConfig.getDefault());
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/CalcMergeTestBase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/CalcMergeTestBase.java
new file mode 100644
index 00000000000..39c2c1b7626
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/CalcMergeTestBase.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.common;
+
+import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
+import org.apache.flink.table.planner.utils.TableTestBase;
+import org.apache.flink.table.planner.utils.TableTestUtil;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Base plan test for calc merge. The difference from 
FlinkCalcMergeRuleTest is that this test
+ * includes all rules.
+ */
+public abstract class CalcMergeTestBase extends TableTestBase {
+
+    private TableTestUtil util;
+
+    protected abstract boolean isBatchMode();
+
+    protected abstract TableTestUtil getTableTestUtil();
+
+    @Before
+    public void setup() {
+        util = getTableTestUtil();
+
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE MyTable (\n"
+                                + " a INTEGER,\n"
+                                + " b INTEGER,\n"
+                                + " c VARCHAR\n"
+                                + ") WITH (\n"
+                                + " 'connector' = 'values'\n"
+                                + " ,'bounded' = '"
+                                + isBatchMode()
+                                + "'\n"
+                                + ")");
+        util.addFunction("random_udf", new 
JavaUserDefinedScalarFunctions.NonDeterministicUdf());
+    }
+
+    @Test
+    public void testCalcMergeWithSameDigest() {
+        util.verifyExecPlan("SELECT a, b FROM (SELECT * FROM MyTable WHERE a = 
b) t WHERE b = a");
+    }
+
+    @Test
+    public void testCalcMergeWithNonDeterministicExpr1() {
+        util.verifyExecPlan(
+                "SELECT a, a1 FROM (SELECT a, random_udf(a) AS a1 FROM 
MyTable) t WHERE a1 > 10");
+    }
+
+    @Test
+    public void testCalcMergeWithNonDeterministicExpr2() {
+        util.verifyExecPlan(
+                "SELECT random_udf(a1) as a2 FROM (SELECT random_udf(a) as"
+                        + " a1, b FROM MyTable) t WHERE b > 10");
+    }
+
+    @Test
+    public void testCalcMergeWithTopMultiNonDeterministicExpr() {
+        util.verifyExecPlan(
+                "SELECT random_udf(a1) as a2, random_udf(a1) as a3 FROM"
+                        + " (SELECT random_udf(a) as a1, b FROM MyTable) t 
WHERE b > 10");
+    }
+
+    @Test
+    public void testCalcMergeTopFilterHasNonDeterministicExpr() {
+        util.verifyExecPlan(
+                "SELECT a, c FROM"
+                        + " (SELECT a, random_udf(b) as b1, c FROM MyTable) t 
WHERE b1 > 10");
+    }
+
+    @Test
+    public void testCalcMergeWithBottomMultiNonDeterministicExpr() {
+        util.verifyExecPlan(
+                "SELECT a1, b2 FROM"
+                        + " (SELECT random_udf(a) as a1, random_udf(b) as b2, 
c FROM MyTable) t WHERE c > 10");
+    }
+
+    @Test
+    public void testCalcMergeWithBottomMultiNonDeterministicInConditionExpr() {
+        util.verifyExecPlan(
+                "SELECT c FROM"
+                        + " (SELECT random_udf(a) as a1, random_udf(b) as b2, 
c FROM MyTable) t WHERE a1 > b2");
+    }
+
+    @Test
+    public void testCalcMergeWithoutInnerNonDeterministicExpr() {
+        util.verifyExecPlan(
+                "SELECT a, c FROM (SELECT a, random_udf(a) as a1, c FROM 
MyTable) t WHERE c > 10");
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/CalcMergeTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/CalcMergeTest.java
new file mode 100644
index 00000000000..9da719f03b1
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/CalcMergeTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.stream.sql;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.planner.plan.common.CalcMergeTestBase;
+import org.apache.flink.table.planner.utils.TableTestUtil;
+
+/** Plan test for calc merge. */
+public class CalcMergeTest extends CalcMergeTestBase {
+    @Override
+    protected boolean isBatchMode() {
+        return false;
+    }
+
+    @Override
+    protected TableTestUtil getTableTestUtil() {
+        return streamTestUtil(TableConfig.getDefault());
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcMergeTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcMergeTest.xml
new file mode 100644
index 00000000000..b98694fb803
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcMergeTest.xml
@@ -0,0 +1,173 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testCalcMergeWithTopMultiNonDeterministicExpr">
+    <Resource name="sql">
+      <![CDATA[SELECT random_udf(a1) as a2, random_udf(a1) as a3 FROM (SELECT 
random_udf(a) as a1, b FROM MyTable) t WHERE b > 10]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a2=[random_udf($0)], a3=[random_udf($0)])
++- LogicalFilter(condition=[>($1, 10)])
+   +- LogicalProject(a1=[random_udf($0)], b=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[random_udf(random_udf(a)) AS a2, random_udf(random_udf(a)) AS 
a3], where=[(b > 10)])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable, 
filter=[], project=[a, b], metadata=[]]], fields=[a, b])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCalcMergeTopFilterHasNonDeterministicExpr">
+    <Resource name="sql">
+      <![CDATA[SELECT a, c FROM (SELECT a, random_udf(b) as b1, c FROM 
MyTable) t WHERE b1 > 10]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- LogicalFilter(condition=[>($1, 10)])
+   +- LogicalProject(a=[$0], b1=[random_udf($1)], c=[$2])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, c], where=[(random_udf(b) > 10)])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCalcMergeWithBottomMultiNonDeterministicExpr">
+    <Resource name="sql">
+      <![CDATA[SELECT a1, b2 FROM (SELECT random_udf(a) as a1, random_udf(b) 
as b2, c FROM MyTable) t WHERE c > 10]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b2=[$1])
++- LogicalFilter(condition=[>(CAST($2):BIGINT, 10)])
+   +- LogicalProject(a1=[random_udf($0)], b2=[random_udf($1)], c=[$2])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[random_udf(a) AS a1, random_udf(b) AS b2], where=[(CAST(c AS 
BIGINT) > 10)])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable, 
filter=[]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCalcMergeWithBottomMultiNonDeterministicInConditionExpr">
+    <Resource name="sql">
+      <![CDATA[SELECT c FROM (SELECT random_udf(a) as a1, random_udf(b) as b2, 
c FROM MyTable) t WHERE a1 > b2]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(c=[$2])
++- LogicalFilter(condition=[>($0, $1)])
+   +- LogicalProject(a1=[random_udf($0)], b2=[random_udf($1)], c=[$2])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[c], where=[(random_udf(a) > random_udf(b))])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCalcMergeWithNonDeterministicExpr2">
+    <Resource name="sql">
+      <![CDATA[SELECT random_udf(a1) as a2 FROM (SELECT random_udf(a) as a1, b 
FROM MyTable) t WHERE b > 10]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a2=[random_udf($0)])
++- LogicalFilter(condition=[>($1, 10)])
+   +- LogicalProject(a1=[random_udf($0)], b=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[random_udf(random_udf(a)) AS a2], where=[(b > 10)])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable, 
filter=[], project=[a, b], metadata=[]]], fields=[a, b])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCalcMergeWithoutInnerNonDeterministicExpr">
+    <Resource name="sql">
+      <![CDATA[SELECT a, c FROM (SELECT a, random_udf(a) as a1, c FROM 
MyTable) t WHERE c > 10]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- LogicalFilter(condition=[>(CAST($2):BIGINT, 10)])
+   +- LogicalProject(a=[$0], a1=[random_udf($0)], c=[$2])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, c], where=[(CAST(c AS BIGINT) > 10)])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable, 
filter=[], project=[a, c], metadata=[]]], fields=[a, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCalcMergeWithSameDigest">
+    <Resource name="sql">
+      <![CDATA[SELECT a, b FROM (SELECT * FROM MyTable WHERE a = b) t WHERE b 
= a]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[=($1, $0)])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2])
+      +- LogicalFilter(condition=[=($0, $1)])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, b], where=[(a = b)])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable, 
filter=[], project=[a, b], metadata=[]]], fields=[a, b])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCalcMergeWithNonDeterministicExpr1">
+    <Resource name="sql">
+      <![CDATA[SELECT a, a1 FROM (SELECT a, random_udf(a) AS a1 FROM MyTable) 
t WHERE a1 > 10]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], a1=[$1])
++- LogicalFilter(condition=[>($1, 10)])
+   +- LogicalProject(a=[$0], a1=[random_udf($0)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, a1], where=[(a1 > 10)])
++- Calc(select=[a, random_udf(a) AS a1])
+   +- TableSourceScan(table=[[default_catalog, default_database, MyTable, 
project=[a], metadata=[]]], fields=[a])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml
index d16146703c9..5dab40a7300 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/CalcTest.xml
@@ -30,6 +30,45 @@ 
LogicalProject(EXPR$0=[ARRAY(_UTF-16LE'Hi':VARCHAR(2147483647) CHARACTER SET "UT
       <![CDATA[
 Calc(select=[ARRAY('Hi', 'Hello', c) AS EXPR$0])
 +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCalcMergeWithNonDeterministicExpr">
+    <Resource name="sql">
+      <![CDATA[SELECT a, a1 FROM (SELECT a, random_udf(b) AS a1 FROM MyTable) 
t WHERE a1 > 10]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], a1=[$1])
++- LogicalFilter(condition=[>($1, 10)])
+   +- LogicalProject(a=[$0], a1=[random_udf($1)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, a1], where=[(a1 > 10)])
++- Calc(select=[a, random_udf(b) AS a1])
+   +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCalcMergeWithNonDeterministicExpr2">
+    <Resource name="sql">
+      <![CDATA[SELECT a FROM (SELECT a, b FROM MyTable) t WHERE random_udf(b) 
> 10]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0])
++- LogicalFilter(condition=[>(random_udf($1), 10)])
+   +- LogicalProject(a=[$0], b=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a], where=[>(random_udf(b), 10)])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcMergeTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcMergeTest.xml
new file mode 100644
index 00000000000..b98694fb803
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcMergeTest.xml
@@ -0,0 +1,173 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testCalcMergeWithTopMultiNonDeterministicExpr">
+    <Resource name="sql">
+      <![CDATA[SELECT random_udf(a1) as a2, random_udf(a1) as a3 FROM (SELECT random_udf(a) as a1, b FROM MyTable) t WHERE b > 10]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a2=[random_udf($0)], a3=[random_udf($0)])
++- LogicalFilter(condition=[>($1, 10)])
+   +- LogicalProject(a1=[random_udf($0)], b=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[random_udf(random_udf(a)) AS a2, random_udf(random_udf(a)) AS a3], where=[(b > 10)])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable, filter=[], project=[a, b], metadata=[]]], fields=[a, b])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCalcMergeTopFilterHasNonDeterministicExpr">
+    <Resource name="sql">
+      <![CDATA[SELECT a, c FROM (SELECT a, random_udf(b) as b1, c FROM MyTable) t WHERE b1 > 10]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- LogicalFilter(condition=[>($1, 10)])
+   +- LogicalProject(a=[$0], b1=[random_udf($1)], c=[$2])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, c], where=[(random_udf(b) > 10)])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCalcMergeWithBottomMultiNonDeterministicExpr">
+    <Resource name="sql">
+      <![CDATA[SELECT a1, b2 FROM (SELECT random_udf(a) as a1, random_udf(b) as b2, c FROM MyTable) t WHERE c > 10]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b2=[$1])
++- LogicalFilter(condition=[>(CAST($2):BIGINT, 10)])
+   +- LogicalProject(a1=[random_udf($0)], b2=[random_udf($1)], c=[$2])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[random_udf(a) AS a1, random_udf(b) AS b2], where=[(CAST(c AS BIGINT) > 10)])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable, filter=[]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCalcMergeWithBottomMultiNonDeterministicInConditionExpr">
+    <Resource name="sql">
+      <![CDATA[SELECT c FROM (SELECT random_udf(a) as a1, random_udf(b) as b2, c FROM MyTable) t WHERE a1 > b2]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(c=[$2])
++- LogicalFilter(condition=[>($0, $1)])
+   +- LogicalProject(a1=[random_udf($0)], b2=[random_udf($1)], c=[$2])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[c], where=[(random_udf(a) > random_udf(b))])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCalcMergeWithNonDeterministicExpr2">
+    <Resource name="sql">
+      <![CDATA[SELECT random_udf(a1) as a2 FROM (SELECT random_udf(a) as a1, b FROM MyTable) t WHERE b > 10]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a2=[random_udf($0)])
++- LogicalFilter(condition=[>($1, 10)])
+   +- LogicalProject(a1=[random_udf($0)], b=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[random_udf(random_udf(a)) AS a2], where=[(b > 10)])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable, filter=[], project=[a, b], metadata=[]]], fields=[a, b])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCalcMergeWithoutInnerNonDeterministicExpr">
+    <Resource name="sql">
+      <![CDATA[SELECT a, c FROM (SELECT a, random_udf(a) as a1, c FROM MyTable) t WHERE c > 10]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- LogicalFilter(condition=[>(CAST($2):BIGINT, 10)])
+   +- LogicalProject(a=[$0], a1=[random_udf($0)], c=[$2])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, c], where=[(CAST(c AS BIGINT) > 10)])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable, filter=[], project=[a, c], metadata=[]]], fields=[a, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCalcMergeWithSameDigest">
+    <Resource name="sql">
+      <![CDATA[SELECT a, b FROM (SELECT * FROM MyTable WHERE a = b) t WHERE b = a]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[=($1, $0)])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2])
+      +- LogicalFilter(condition=[=($0, $1)])
+         +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, b], where=[(a = b)])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable, filter=[], project=[a, b], metadata=[]]], fields=[a, b])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCalcMergeWithNonDeterministicExpr1">
+    <Resource name="sql">
+      <![CDATA[SELECT a, a1 FROM (SELECT a, random_udf(a) AS a1 FROM MyTable) t WHERE a1 > 10]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], a1=[$1])
++- LogicalFilter(condition=[>($1, 10)])
+   +- LogicalProject(a=[$0], a1=[random_udf($0)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, a1], where=[(a1 > 10)])
++- Calc(select=[a, random_udf(a) AS a1])
+   +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a], metadata=[]]], fields=[a])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml
index b40056000f4..51450c5fde4 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml
@@ -30,6 +30,45 @@ LogicalProject(EXPR$0=[ARRAY(_UTF-16LE'Hi':VARCHAR(2147483647) CHARACTER SET "UT
       <![CDATA[
 Calc(select=[ARRAY('Hi', 'Hello', c) AS EXPR$0])
 +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCalcMergeWithNonDeterministicExpr">
+    <Resource name="sql">
+      <![CDATA[SELECT a, a1 FROM (SELECT a, random_udf(b) AS a1 FROM MyTable) t WHERE a1 > 10]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], a1=[$1])
++- LogicalFilter(condition=[>($1, 10)])
+   +- LogicalProject(a=[$0], a1=[random_udf($1)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, a1], where=[(a1 > 10)])
++- Calc(select=[a, random_udf(b) AS a1])
+   +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCalcMergeWithNonDeterministicExpr2">
+    <Resource name="sql">
+      <![CDATA[SELECT a FROM (SELECT a, b FROM MyTable) t WHERE random_udf(b) > 10]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0])
++- LogicalFilter(condition=[>(random_udf($1), 10)])
+   +- LogicalProject(a=[$0], b=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a], where=[>(random_udf(b), 10)])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala
index d0fdce135f1..09a13014aff 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/CalcTest.scala
@@ -22,16 +22,22 @@ import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api._
 import org.apache.flink.table.planner.plan.utils.MyPojo
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.NonDeterministicUdf
 import org.apache.flink.table.planner.utils.TableTestBase
 
-import org.junit.Test
+import org.junit.{Before, Test}
 
 import java.sql.{Date, Time, Timestamp}
 
 class CalcTest extends TableTestBase {
 
   private val util = batchTestUtil()
-  util.addTableSource[(Long, Int, String)]("MyTable", 'a, 'b, 'c)
+
+  @Before
+  def setup(): Unit = {
+    util.addTableSource[(Long, Int, String)]("MyTable", 'a, 'b, 'c)
+    util.addFunction("random_udf", new NonDeterministicUdf)
+  }
 
   @Test
   def testOnlyProject(): Unit = {
@@ -187,4 +193,16 @@ class CalcTest extends TableTestBase {
   def testDecimalMapWithDifferentPrecision(): Unit = {
     util.verifyExecPlan("SELECT MAP['a', 0.12, 'b', 0.5]")
   }
+
+  @Test
+  def testCalcMergeWithNonDeterministicExpr(): Unit = {
+    val sqlQuery = "SELECT a, a1 FROM (SELECT a, random_udf(b) AS a1 FROM MyTable) t WHERE a1 > 10"
+    util.verifyExecPlan(sqlQuery)
+  }
+
+  @Test
+  def testCalcMergeWithNonDeterministicExpr2(): Unit = {
+    val sqlQuery = "SELECT a FROM (SELECT a, b FROM MyTable) t WHERE random_udf(b) > 10"
+    util.verifyRelPlan(sqlQuery)
+  }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala
index 617a7bdc3f9..3e74c00480f 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala
@@ -22,15 +22,21 @@ import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api._
 import org.apache.flink.table.planner.plan.utils.MyPojo
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.NonDeterministicUdf
 import org.apache.flink.table.planner.utils.TableTestBase
 
-import org.junit.Test
+import org.junit.{Before, Test}
 
 import java.sql.{Date, Time, Timestamp}
 
 class CalcTest extends TableTestBase {
   private val util = streamTestUtil()
-  util.addTableSource[(Long, Int, String)]("MyTable", 'a, 'b, 'c)
+
+  @Before
+  def setup(): Unit = {
+    util.addTableSource[(Long, Int, String)]("MyTable", 'a, 'b, 'c)
+    util.addFunction("random_udf", new NonDeterministicUdf)
+  }
 
   @Test
   def testOnlyProject(): Unit = {
@@ -181,4 +187,16 @@ class CalcTest extends TableTestBase {
   def testDecimalMapWithDifferentPrecision(): Unit = {
     util.verifyExecPlan("SELECT MAP['a', 0.12, 'b', 0.5]")
   }
+
+  @Test
+  def testCalcMergeWithNonDeterministicExpr(): Unit = {
+    val sqlQuery = "SELECT a, a1 FROM (SELECT a, random_udf(b) AS a1 FROM MyTable) t WHERE a1 > 10"
+    util.verifyExecPlan(sqlQuery)
+  }
+
+  @Test
+  def testCalcMergeWithNonDeterministicExpr2(): Unit = {
+    val sqlQuery = "SELECT a FROM (SELECT a, b FROM MyTable) t WHERE random_udf(b) > 10"
+    util.verifyRelPlan(sqlQuery)
+  }
 }


Reply via email to