This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new ea53e4a  [FLINK-24318][table-planner-blink] Casting a number to boolean has different results between 'select' fields and 'where' condition
ea53e4a is described below

commit ea53e4a6cd1d39592cf3a6ae911fed332234d00e
Author: xuyang <[email protected]>
AuthorDate: Fri Sep 17 20:11:33 2021 +0800

    [FLINK-24318][table-planner-blink] Casting a number to boolean has different results between 'select' fields and 'where' condition
    
    This closes #17311
    
    (cherry picked from commit 32f7cc9e34be67eaf1b746697f2fabefcd5f46c5)
---
 .../PushFilterInCalcIntoTableSourceScanRule.java   |  5 +-
 .../logical/PushFilterIntoTableSourceScanRule.java |  5 +-
 .../plan/rules/logical/FlinkCalcMergeRule.scala    |  5 +-
 .../JoinConditionEqualityTransferRule.scala        |  5 +-
 .../logical/JoinConditionTypeCoerceRule.scala      |  5 +-
 .../JoinDependentConditionDerivationRule.scala     |  3 +-
 .../PushFilterIntoLegacyTableSourceScanRule.scala  |  6 +-
 .../logical/SimplifyFilterConditionRule.scala      |  5 +-
 .../rules/logical/SimplifyJoinConditionRule.scala  |  5 +-
 .../table/planner/plan/utils/FlinkRexUtil.scala    |  4 +-
 .../logical/SimplifyFilterConditionRuleTest.xml    | 38 +++++++++
 .../logical/SimplifyJoinConditionRuleTest.xml      | 42 ++++++++++
 .../logical/SimplifyFilterConditionRuleTest.scala  | 12 +++
 .../logical/SimplifyJoinConditionRuleTest.scala    | 12 +++
 .../planner/plan/utils/FlinkRexUtilTest.scala      | 90 ++++++++++++++++------
 .../planner/runtime/stream/sql/CalcITCase.scala    | 81 ++++++++++++++++++-
 16 files changed, 287 insertions(+), 36 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java
index 8bd61ba..33d24fa 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java
@@ -122,7 +122,10 @@ public class PushFilterInCalcIntoTableSourceScanRule 
extends PushFilterIntoSourc
                     createRemainingCondition(
                             relBuilder, result.getRemainingFilters(), 
unconvertedPredicates);
             RexNode simplifiedRemainingCondition =
-                    FlinkRexUtil.simplify(relBuilder.getRexBuilder(), 
remainingCondition);
+                    FlinkRexUtil.simplify(
+                            relBuilder.getRexBuilder(),
+                            remainingCondition,
+                            calc.getCluster().getPlanner().getExecutor());
             programBuilder.addCondition(simplifiedRemainingCondition);
         }
 
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
index 7046c9e..516f00c 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
@@ -110,7 +110,10 @@ public class PushFilterIntoTableSourceScanRule extends 
PushFilterIntoSourceScanR
                     createRemainingCondition(
                             relBuilder, result.getRemainingFilters(), 
unconvertedPredicates);
             RexNode simplifiedRemainingCondition =
-                    FlinkRexUtil.simplify(relBuilder.getRexBuilder(), 
remainingCondition);
+                    FlinkRexUtil.simplify(
+                            relBuilder.getRexBuilder(),
+                            remainingCondition,
+                            filter.getCluster().getPlanner().getExecutor());
             Filter newFilter =
                     filter.copy(filter.getTraitSet(), newScan, 
simplifiedRemainingCondition);
             call.transformTo(newFilter);
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkCalcMergeRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkCalcMergeRule.scala
index fd068dc..3a3cd89 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkCalcMergeRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkCalcMergeRule.scala
@@ -120,7 +120,10 @@ class FlinkCalcMergeRule[C <: Calc](calcClass: Class[C]) 
extends RelOptRule(
 
     val newMergedProgram = if (mergedProgram.getCondition != null) {
       val condition = mergedProgram.expandLocalRef(mergedProgram.getCondition)
-      val simplifiedCondition = FlinkRexUtil.simplify(rexBuilder, condition)
+      val simplifiedCondition = FlinkRexUtil.simplify(
+        rexBuilder,
+        condition,
+        topCalc.getCluster.getPlanner.getExecutor)
       if (simplifiedCondition.equals(condition)) {
         mergedProgram
       } else {
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/JoinConditionEqualityTransferRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/JoinConditionEqualityTransferRule.scala
index 178dd8d..e72e177 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/JoinConditionEqualityTransferRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/JoinConditionEqualityTransferRule.scala
@@ -92,7 +92,10 @@ class JoinConditionEqualityTransferRule extends RelOptRule(
     }
 
     val newJoinFilter = builder.and(remainFilters :+
-      FlinkRexUtil.simplify(rexBuilder, builder.and(newEquiJoinFilters)))
+      FlinkRexUtil.simplify(
+        rexBuilder,
+        builder.and(newEquiJoinFilters),
+        join.getCluster.getPlanner.getExecutor))
     val newJoin = join.copy(
       join.getTraitSet,
       newJoinFilter,
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/JoinConditionTypeCoerceRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/JoinConditionTypeCoerceRule.scala
index 489091f..ff00dd2 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/JoinConditionTypeCoerceRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/JoinConditionTypeCoerceRule.scala
@@ -87,7 +87,10 @@ class JoinConditionTypeCoerceRule extends RelOptRule(
     }
 
     val newCondExp = builder.and(
-      FlinkRexUtil.simplify(rexBuilder, builder.and(newJoinFilters)))
+      FlinkRexUtil.simplify(
+        rexBuilder,
+        builder.and(newJoinFilters),
+        join.getCluster.getPlanner.getExecutor))
 
     val newJoin = join.copy(
       join.getTraitSet,
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/JoinDependentConditionDerivationRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/JoinDependentConditionDerivationRule.scala
index 1106599..efe9445 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/JoinDependentConditionDerivationRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/JoinDependentConditionDerivationRule.scala
@@ -114,7 +114,8 @@ class JoinDependentConditionDerivationRule
     if (additionalConditions.nonEmpty) {
       val newCondExp = FlinkRexUtil.simplify(
         builder.getRexBuilder,
-        builder.and(conjunctions ++ additionalConditions))
+        builder.and(conjunctions ++ additionalConditions),
+        join.getCluster.getPlanner.getExecutor)
 
       if (!newCondExp.equals(join.getCondition)) {
         val newJoin = join.copy(
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoLegacyTableSourceScanRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoLegacyTableSourceScanRule.scala
index 4a1a453..b1b5270 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoLegacyTableSourceScanRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoLegacyTableSourceScanRule.scala
@@ -127,8 +127,10 @@ class PushFilterIntoLegacyTableSourceScanRule extends 
RelOptRule(
       val converter = new ExpressionConverter(relBuilder)
       val remainingConditions = remainingPredicates.map(_.accept(converter)) 
++ unconvertedRexNodes
       val remainingCondition = remainingConditions.reduce((l, r) => 
relBuilder.and(l, r))
-      val simplifiedRemainingCondition =
-        FlinkRexUtil.simplify(relBuilder.getRexBuilder, remainingCondition)
+      val simplifiedRemainingCondition = FlinkRexUtil.simplify(
+        relBuilder.getRexBuilder,
+        remainingCondition,
+        filter.getCluster.getPlanner.getExecutor)
       val newFilter = filter.copy(filter.getTraitSet, newScan, 
simplifiedRemainingCondition)
       call.transformTo(newFilter)
     }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRule.scala
index 1259c2c..67dd8c7 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRule.scala
@@ -60,7 +60,10 @@ class SimplifyFilterConditionRule(
     }
 
     val rexBuilder = filter.getCluster.getRexBuilder
-    val simplifiedCondition = FlinkRexUtil.simplify(rexBuilder, condition)
+    val simplifiedCondition = FlinkRexUtil.simplify(
+      rexBuilder,
+      condition,
+      filter.getCluster.getPlanner.getExecutor)
     val newCondition = RexUtil.pullFactors(rexBuilder, simplifiedCondition)
 
     if (!changed.head && !condition.equals(newCondition)) {
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyJoinConditionRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyJoinConditionRule.scala
index 005edf6..46f9156 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyJoinConditionRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyJoinConditionRule.scala
@@ -45,7 +45,10 @@ class SimplifyJoinConditionRule
       return
     }
 
-    val simpleCondExp = FlinkRexUtil.simplify(join.getCluster.getRexBuilder, 
condition)
+    val simpleCondExp = FlinkRexUtil.simplify(
+      join.getCluster.getRexBuilder,
+      condition,
+      join.getCluster.getPlanner.getExecutor)
     val newCondExp = RexUtil.pullFactors(join.getCluster.getRexBuilder, 
simpleCondExp)
 
     if (newCondExp.equals(condition)) {
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala
index 8c8dba2..2d95459 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala
@@ -207,7 +207,7 @@ object FlinkRexUtil {
     * 5. a = a, a >= a, a <= a -> true
     * 6. a <> a, a > a, a < a -> false
     */
-  def simplify(rexBuilder: RexBuilder, expr: RexNode): RexNode = {
+  def simplify(rexBuilder: RexBuilder, expr: RexNode, executor: RexExecutor): 
RexNode = {
     if (expr.isAlwaysTrue || expr.isAlwaysFalse) {
       return expr
     }
@@ -219,7 +219,7 @@ object FlinkRexUtil {
     val binaryComparisonExprReduced = sameExprMerged.accept(
       new BinaryComparisonExprReducer(rexBuilder))
 
-    val rexSimplify = new RexSimplify(rexBuilder, RelOptPredicateList.EMPTY, 
true, RexUtil.EXECUTOR)
+    val rexSimplify = new RexSimplify(rexBuilder, RelOptPredicateList.EMPTY, 
true, executor)
     rexSimplify.simplify(binaryComparisonExprReduced)
   }
 
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRuleTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRuleTest.xml
index f670794..e5bf98c 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRuleTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRuleTest.xml
@@ -140,6 +140,44 @@ LogicalProject(a=[$0], b=[$1], c=[$2])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testSimpleConditionWithCastToFalse">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM x WHERE CAST(0 AS BOOLEAN)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[CAST(0):BOOLEAN NOT NULL])
+   +- LogicalTableScan(table=[[default_catalog, default_database, x, source: 
[TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[false])
+   +- LogicalTableScan(table=[[default_catalog, default_database, x, source: 
[TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSimpleConditionWithCastToTrue">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM x WHERE CAST(200 AS BOOLEAN)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[CAST(200):BOOLEAN NOT NULL])
+   +- LogicalTableScan(table=[[default_catalog, default_database, x, source: 
[TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[true])
+   +- LogicalTableScan(table=[[default_catalog, default_database, x, source: 
[TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testSimplifyConditionInSubQuery1">
     <Resource name="sql">
       <![CDATA[SELECT * FROM x WHERE EXISTS (SELECT * FROM y WHERE (d = 1 AND 
e = 2) OR (NOT (d <> 1) AND e = 3)) AND true]]>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SimplifyJoinConditionRuleTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SimplifyJoinConditionRuleTest.xml
index fa2283c..5b5251e 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SimplifyJoinConditionRuleTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SimplifyJoinConditionRuleTest.xml
@@ -72,4 +72,46 @@ LogicalProject(d=[$3])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testSimplifyJoinConditionWithCastToTrue">
+    <Resource name="sql">
+      <![CDATA[SELECT d FROM MyTable1 JOIN MyTable2 ON CAST(1.1 AS BOOLEAN)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(d=[$3])
++- LogicalJoin(condition=[true], joinType=[inner])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(a, b, c)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(d=[$3])
++- LogicalJoin(condition=[true], joinType=[inner])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(a, b, c)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSimplifyJoinConditionWithCastToFalse">
+    <Resource name="sql">
+      <![CDATA[SELECT d FROM MyTable1 JOIN MyTable2 ON CAST(0.0 AS BOOLEAN)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(d=[$3])
++- LogicalJoin(condition=[false], joinType=[inner])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(a, b, c)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(d=[$3])
++- LogicalJoin(condition=[false], joinType=[inner])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(a, b, c)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(d, e, f)]]])
+]]>
+    </Resource>
+  </TestCase>
 </Root>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRuleTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRuleTest.scala
index 9e52d46..9e8a1a4 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRuleTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRuleTest.scala
@@ -58,6 +58,18 @@ class SimplifyFilterConditionRuleTest extends TableTestBase {
   }
 
   @Test
+  def testSimpleConditionWithCastToTrue(): Unit = {
+    util.verifyRelPlan(
+      "SELECT * FROM x WHERE CAST(200 AS BOOLEAN)")
+  }
+
+  @Test
+  def testSimpleConditionWithCastToFalse(): Unit = {
+    util.verifyRelPlan(
+      "SELECT * FROM x WHERE CAST(0 AS BOOLEAN)")
+  }
+
+  @Test
   def testSimplifyConditionInSubQuery1(): Unit = {
     val sqlQuery = "SELECT * FROM x WHERE EXISTS " +
       "(SELECT * FROM y WHERE (d = 1 AND e = 2) OR (NOT (d <> 1) AND e = 3)) 
AND true"
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyJoinConditionRuleTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyJoinConditionRuleTest.scala
index 552c399..6e5d5ae 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyJoinConditionRuleTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyJoinConditionRuleTest.scala
@@ -65,4 +65,16 @@ class SimplifyJoinConditionRuleTest extends TableTestBase {
       """.stripMargin
     util.verifyRelPlan(sqlQuery)
   }
+
+  @Test
+  def testSimplifyJoinConditionWithCastToTrue(): Unit = {
+    val sqlQuery = "SELECT d FROM MyTable1 JOIN MyTable2 ON CAST(1.1 AS 
BOOLEAN)"
+    util.verifyRelPlan(sqlQuery)
+  }
+
+  @Test
+  def testSimplifyJoinConditionWithCastToFalse(): Unit = {
+    val sqlQuery = "SELECT d FROM MyTable1 JOIN MyTable2 ON CAST(0.0 AS 
BOOLEAN)"
+    util.verifyRelPlan(sqlQuery)
+  }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtilTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtilTest.scala
index 9ca7ecb..ee3e44e 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtilTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtilTest.scala
@@ -17,10 +17,13 @@
  */
 package org.apache.flink.table.planner.plan.utils
 
+import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.planner.calcite.{FlinkRexBuilder, 
FlinkTypeFactory, FlinkTypeSystem}
+import org.apache.flink.table.planner.codegen.ExpressionReducer
 
-import org.apache.calcite.rex.{RexLiteral, RexNode, RexUtil}
+import org.apache.calcite.rex.{RexBuilder, RexLiteral, RexNode, RexUtil}
 import org.apache.calcite.sql.`type`.SqlTypeName._
+import org.apache.calcite.sql.`type`.{BasicSqlType, SqlTypeName}
 import org.apache.calcite.sql.fun.SqlStdOperatorTable._
 import org.junit.Assert.{assertEquals, assertFalse}
 import org.junit.Test
@@ -240,7 +243,7 @@ class FlinkRexUtilTest {
       rexBuilder.makeCall(EQUALS, a, b),
       rexBuilder.makeCall(EQUALS, a, b)
     )
-    val newPredicate0 = FlinkRexUtil.simplify(rexBuilder, predicate0)
+    val newPredicate0 = simplify(rexBuilder, predicate0)
     assertEquals(rexBuilder.makeCall(EQUALS, a, b).toString, 
newPredicate0.toString)
 
     // a = b AND b = a
@@ -248,7 +251,7 @@ class FlinkRexUtilTest {
       rexBuilder.makeCall(EQUALS, a, b),
       rexBuilder.makeCall(EQUALS, b, a)
     )
-    val newPredicate1 = FlinkRexUtil.simplify(rexBuilder, predicate1)
+    val newPredicate1 = simplify(rexBuilder, predicate1)
     assertEquals(rexBuilder.makeCall(EQUALS, a, b).toString, 
newPredicate1.toString)
 
     // a = b OR b = a
@@ -256,7 +259,7 @@ class FlinkRexUtilTest {
       rexBuilder.makeCall(EQUALS, a, b),
       rexBuilder.makeCall(EQUALS, b, a)
     )
-    val newPredicate2 = FlinkRexUtil.simplify(rexBuilder, predicate2)
+    val newPredicate2 = simplify(rexBuilder, predicate2)
     assertEquals(rexBuilder.makeCall(EQUALS, a, b).toString, 
newPredicate2.toString)
 
     // a = b AND c < d AND b = a AND d > c
@@ -266,7 +269,7 @@ class FlinkRexUtilTest {
       rexBuilder.makeCall(EQUALS, b, a),
       rexBuilder.makeCall(GREATER_THAN, d, c)
     )
-    val newPredicate3 = FlinkRexUtil.simplify(rexBuilder, predicate3)
+    val newPredicate3 = simplify(rexBuilder, predicate3)
     assertEquals(rexBuilder.makeCall(AND,
       rexBuilder.makeCall(EQUALS, a, b),
       rexBuilder.makeCall(LESS_THAN, c, d)).toString,
@@ -279,7 +282,7 @@ class FlinkRexUtilTest {
       rexBuilder.makeCall(LESS_THAN_OR_EQUAL,
         c, rexBuilder.makeCast(typeFactory.createSqlType(INTEGER), a))
     )
-    val newPredicate4 = FlinkRexUtil.simplify(rexBuilder, predicate4)
+    val newPredicate4 = simplify(rexBuilder, predicate4)
     assertEquals(rexBuilder.makeCall(GREATER_THAN_OR_EQUAL,
       rexBuilder.makeCast(typeFactory.createSqlType(INTEGER), a), c).toString,
       newPredicate4.toString)
@@ -301,7 +304,7 @@ class FlinkRexUtilTest {
         rexBuilder.makeCall(GREATER_THAN_OR_EQUAL, dPlus1, c),
         rexBuilder.makeCall(LESS_THAN_OR_EQUAL, c, dPlus1))
     )
-    val newPredicate5 = FlinkRexUtil.simplify(rexBuilder, predicate5)
+    val newPredicate5 = simplify(rexBuilder, predicate5)
     assertEquals(rexBuilder.makeCall(OR,
       rexBuilder.makeCall(EQUALS, aSubstring13, b),
       rexBuilder.makeCall(LESS_THAN_OR_EQUAL, c, dPlus1)).toString,
@@ -320,7 +323,7 @@ class FlinkRexUtilTest {
         rexBuilder.makeCall(LESS_THAN, b, rexBuilder.makeLiteral("k"))
       )
     )
-    val newPredicate6 = FlinkRexUtil.simplify(rexBuilder, predicate6)
+    val newPredicate6 = simplify(rexBuilder, predicate6)
     assertEquals(rexBuilder.makeCall(AND,
       rexBuilder.makeCall(OR,
         rexBuilder.makeCall(EQUALS, a, b),
@@ -342,7 +345,7 @@ class FlinkRexUtilTest {
       ),
       rexBuilder.makeCall(EQUALS, b, a)
     )
-    val newPredicate7 = FlinkRexUtil.simplify(rexBuilder, predicate7)
+    val newPredicate7 = simplify(rexBuilder, predicate7)
     assertEquals(rexBuilder.makeCall(AND,
       rexBuilder.makeCall(EQUALS, a, b),
       rexBuilder.makeCall(LESS_THAN, c, d)).toString,
@@ -356,7 +359,7 @@ class FlinkRexUtilTest {
         rexBuilder.makeCall(EQUALS, c, d)
       )
     )
-    val newPredicate8 = FlinkRexUtil.simplify(rexBuilder, predicate8)
+    val newPredicate8 = simplify(rexBuilder, predicate8)
     assertEquals(rexBuilder.makeCall(OR,
       rexBuilder.makeCall(GREATER_THAN_OR_EQUAL, b, a),
       rexBuilder.makeCall(EQUALS, c, d)).toString,
@@ -365,43 +368,43 @@ class FlinkRexUtilTest {
     // true AND true
     val predicate9 = rexBuilder.makeCall(AND,
       rexBuilder.makeLiteral(true), rexBuilder.makeLiteral(true))
-    val newPredicate9 = FlinkRexUtil.simplify(rexBuilder, predicate9)
+    val newPredicate9 = simplify(rexBuilder, predicate9)
     assertEquals(rexBuilder.makeLiteral(true).toString, newPredicate9.toString)
 
     // false OR false
     val predicate10 = rexBuilder.makeCall(OR,
       rexBuilder.makeLiteral(false), rexBuilder.makeLiteral(false))
-    val newPredicate10 = FlinkRexUtil.simplify(rexBuilder, predicate10)
+    val newPredicate10 = simplify(rexBuilder, predicate10)
     assertEquals(rexBuilder.makeLiteral(false).toString, 
newPredicate10.toString)
 
     // a = a
     val predicate11 = rexBuilder.makeCall(EQUALS, a, a)
-    val newPredicate11 = FlinkRexUtil.simplify(rexBuilder, predicate11)
+    val newPredicate11 = simplify(rexBuilder, predicate11)
     assertEquals(rexBuilder.makeLiteral(true).toString, 
newPredicate11.toString)
 
     // a >= a
     val predicate12 = rexBuilder.makeCall(GREATER_THAN_OR_EQUAL, a, a)
-    val newPredicate12 = FlinkRexUtil.simplify(rexBuilder, predicate12)
+    val newPredicate12 = simplify(rexBuilder, predicate12)
     assertEquals(rexBuilder.makeLiteral(true).toString, 
newPredicate12.toString)
 
     // a <= a
     val predicate13 = rexBuilder.makeCall(LESS_THAN_OR_EQUAL, a, a)
-    val newPredicate13 = FlinkRexUtil.simplify(rexBuilder, predicate13)
+    val newPredicate13 = simplify(rexBuilder, predicate13)
     assertEquals(rexBuilder.makeLiteral(true).toString, 
newPredicate13.toString)
 
     // a <> a
     val predicate14 = rexBuilder.makeCall(NOT_EQUALS, a, a)
-    val newPredicate14 = FlinkRexUtil.simplify(rexBuilder, predicate14)
+    val newPredicate14 = simplify(rexBuilder, predicate14)
     assertEquals(rexBuilder.makeLiteral(false).toString, 
newPredicate14.toString)
 
     // a > a
     val predicate15 = rexBuilder.makeCall(GREATER_THAN, a, a)
-    val newPredicate15 = FlinkRexUtil.simplify(rexBuilder, predicate15)
+    val newPredicate15 = simplify(rexBuilder, predicate15)
     assertEquals(rexBuilder.makeLiteral(false).toString, 
newPredicate15.toString)
 
     // a < a
     val predicate16 = rexBuilder.makeCall(LESS_THAN, a, a)
-    val newPredicate16 = FlinkRexUtil.simplify(rexBuilder, predicate16)
+    val newPredicate16 = simplify(rexBuilder, predicate16)
     assertEquals(rexBuilder.makeLiteral(false).toString, 
newPredicate16.toString)
 
     // c = 0 AND SEARCH(c, [0, 1])
@@ -412,7 +415,7 @@ class FlinkRexUtilTest {
       rexBuilder.makeIn(c, java.util.Arrays.asList(
         intLiteral(0),
         intLiteral(1))))
-    val newPredicate17 = FlinkRexUtil.simplify(rexBuilder, predicate17)
+    val newPredicate17 = simplify(rexBuilder, predicate17)
     assertEquals(
       rexBuilder.makeIn(c, 
Collections.singletonList[RexNode](intLiteral(0))).toString,
       newPredicate17.toString)
@@ -425,7 +428,7 @@ class FlinkRexUtilTest {
       OR,
       rexBuilder.makeCall(EQUALS, c, intLiteral(0)),
       predicate18Search)
-    val newPredicate18 = FlinkRexUtil.simplify(rexBuilder, predicate18)
+    val newPredicate18 = simplify(rexBuilder, predicate18)
     assertEquals(predicate18Search.toString, newPredicate18.toString)
 
     // c > 0 AND (
@@ -447,7 +450,7 @@ class FlinkRexUtilTest {
       AND,
       rexBuilder.makeCall(GREATER_THAN, c, intLiteral(0)),
       predicate19Layer1)
-    val newPredicate19 = FlinkRexUtil.simplify(rexBuilder, predicate19)
+    val newPredicate19 = simplify(rexBuilder, predicate19)
     assertEquals(
       rexBuilder.makeIn(c, 
Collections.singletonList[RexNode](intLiteral(1))).toString,
       newPredicate19.toString)
@@ -458,9 +461,52 @@ class FlinkRexUtilTest {
       OR,
       rexBuilder.makeCall(GREATER_THAN_OR_EQUAL, c, intLiteral(0)),
       predicate18Search)
-    val newPredicate20 = FlinkRexUtil.simplify(rexBuilder, predicate20)
+    val newPredicate20 = simplify(rexBuilder, predicate20)
     assertEquals(predicate20.toString, newPredicate20.toString)
+
+    //CAST(1 AS BOOLEAN)
+    val predicate21CastFromData = intLiteral(1)
+    val predicate21Cast = makeToBooleanCast(predicate21CastFromData)
+    val newPredicate21 = simplify(rexBuilder, predicate21Cast)
+    assertEquals(rexBuilder.makeLiteral(true).toString, 
newPredicate21.toString)
+
+    //CAST(0 AS BOOLEAN)
+    val predicate22CastFromData = intLiteral(0)
+    val predicate22Cast = makeToBooleanCast(predicate22CastFromData)
+    val newPredicate22 = simplify(rexBuilder, predicate22Cast)
+    assertEquals(rexBuilder.makeLiteral(false).toString, 
newPredicate22.toString)
+
+    //CAST(-1 AS BOOLEAN)
+    val predicate23CastFromData = intLiteral(-1)
+    val predicate23Cast = makeToBooleanCast(predicate23CastFromData)
+    val newPredicate23 = simplify(rexBuilder, predicate23Cast)
+    assertEquals(rexBuilder.makeLiteral(true).toString, 
newPredicate23.toString)
+
+    //CAST(1.1 AS BOOLEAN)
+    val predicate24CastFromData = 
rexBuilder.makeExactLiteral(BigDecimal.valueOf(1.1))
+    val predicate24Cast = makeToBooleanCast(predicate24CastFromData)
+    val newPredicate24 = simplify(rexBuilder, predicate24Cast)
+    assertEquals(rexBuilder.makeLiteral(true).toString, 
newPredicate24.toString)
+
+    //CAST(0.000 AS BOOLEAN)
+    val predicate25CastFromData = 
rexBuilder.makeExactLiteral(BigDecimal.valueOf(0.000))
+    val predicate25Cast = makeToBooleanCast(predicate25CastFromData)
+    val newPredicate25 = simplify(rexBuilder, predicate25Cast)
+    assertEquals(rexBuilder.makeLiteral(false).toString, 
newPredicate25.toString)
   }
 
   def intLiteral(x: Int): RexLiteral = 
rexBuilder.makeExactLiteral(BigDecimal.valueOf(x))
+
+  def simplify(rexBuilder: RexBuilder, expr: RexNode): RexNode ={
+    val expressionReducer = new ExpressionReducer(TableConfig.getDefault, 
false)
+    FlinkRexUtil.simplify(rexBuilder, expr, expressionReducer)
+  }
+
+  private def makeToBooleanCast(fromData: RexNode): RexNode ={
+    val booleanType = new BasicSqlType(typeFactory.getTypeSystem, 
SqlTypeName.BOOLEAN)
+    rexBuilder.makeCall(
+      booleanType,
+      CAST,
+      Collections.singletonList(fromData.asInstanceOf[RexNode]))
+  }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
index 723800f..794a135 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
@@ -31,14 +31,17 @@ import 
org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils._
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
 import 
org.apache.flink.table.runtime.typeutils.MapDataSerializerTest.CustomMapData
-import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType}
+import org.apache.flink.table.types.logical.{BigIntType, BooleanType, IntType, 
VarCharType}
 import org.apache.flink.table.utils.LegacyRowResource
 import org.apache.flink.types.Row
 import org.apache.flink.util.CollectionUtil
 
-import java.util
 import org.junit.Assert._
 import org.junit._
+
+import java.time.Instant
+import java.util
+
 import scala.collection.JavaConversions._
 import scala.collection.Seq
 
@@ -48,6 +51,80 @@ class CalcITCase extends StreamingTestBase {
   def usesLegacyRows: LegacyRowResource = LegacyRowResource.INSTANCE
 
   @Test
+  def testCastNumericToBooleanInProjection(): Unit = {
+    // Casts two integer and two decimal literals to BOOLEAN directly in the SELECT list
+    // and compares the single emitted row with the expected string form.
+    val query = "SELECT CAST(1 AS BOOLEAN), CAST(0 AS BOOLEAN), " +
+      "CAST(1.1 AS BOOLEAN), CAST(0.00 AS BOOLEAN)"
+
+    // One BOOLEAN column per cast expression in the projection.
+    val booleanRowType = InternalTypeInfo.ofFields(
+      new BooleanType(), new BooleanType(), new BooleanType(), new BooleanType())
+
+    val rows = tEnv.sqlQuery(query).toAppendStream[RowData]
+    val rowSink = new TestingAppendRowDataSink(booleanRowType)
+    rows.addSink(rowSink)
+    env.execute()
+
+    val expectedRows = List("+I(true,false,true,false)")
+    assertEquals(expectedRows.sorted, rowSink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testCastNumericToBooleanInCondition(): Unit = {
+    // Every UNION ALL branch filters MyTableRow by comparing column b with a numeric
+    // literal cast to BOOLEAN inside the WHERE clause.
+    val sqlQuery =
+      s"""
+         | SELECT * FROM MyTableRow WHERE b = CAST(1 AS BOOLEAN)
+         | UNION ALL
+         | SELECT * FROM MyTableRow WHERE b = CAST(0 AS BOOLEAN)
+         | UNION ALL
+         | SELECT * FROM MyTableRow WHERE b = CAST(1.1 AS BOOLEAN)
+         | UNION ALL
+         | SELECT * FROM MyTableRow WHERE b = CAST(0.0 AS BOOLEAN)
+         |""".stripMargin
+
+    // Builds a two-field row holding an int and a boolean.
+    def intBoolRow(a: Int, b: Boolean): GenericRowData = {
+      val row = new GenericRowData(2)
+      row.setField(0, a)
+      row.setField(1, b)
+      row
+    }
+
+    val inputRows = List(intBoolRow(1, true), intBoolRow(2, false))
+
+    // Implicit type information picked up by env.fromCollection below.
+    implicit val dataType: TypeInformation[GenericRowData] =
+      InternalTypeInfo
+        .ofFields(new IntType(), new BooleanType())
+        .asInstanceOf[TypeInformation[GenericRowData]]
+
+    val sourceTable = env.fromCollection(inputRows).toTable(tEnv, 'a, 'b)
+    tEnv.registerTable("MyTableRow", sourceTable)
+
+    val sinkRowType = InternalTypeInfo.ofFields(new IntType(), new BooleanType())
+
+    val rows = tEnv.sqlQuery(sqlQuery).toAppendStream[RowData]
+    val rowSink = new TestingAppendRowDataSink(sinkRowType)
+    rows.addSink(rowSink)
+    env.execute()
+
+    // The expected output lists each of the two input rows twice.
+    val expectedRows = List("+I(1,true)", "+I(2,false)", "+I(1,true)", "+I(2,false)")
+    assertEquals(expectedRows.sorted, rowSink.getAppendResults.sorted)
+  }
+
+  @Test
   def testGenericRowAndRowData(): Unit = {
     val sqlQuery = "SELECT * FROM MyTableRow"
 

Reply via email to