This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new ea53e4a [FLINK-24318][table-planner-blink] Casting a number to
boolean has different results between 'select' fields and 'where' condition
ea53e4a is described below
commit ea53e4a6cd1d39592cf3a6ae911fed332234d00e
Author: xuyang <[email protected]>
AuthorDate: Fri Sep 17 20:11:33 2021 +0800
[FLINK-24318][table-planner-blink] Casting a number to boolean has
different results between 'select' fields and 'where' condition
This closes #17311
(cherry picked from commit 32f7cc9e34be67eaf1b746697f2fabefcd5f46c5)
---
.../PushFilterInCalcIntoTableSourceScanRule.java | 5 +-
.../logical/PushFilterIntoTableSourceScanRule.java | 5 +-
.../plan/rules/logical/FlinkCalcMergeRule.scala | 5 +-
.../JoinConditionEqualityTransferRule.scala | 5 +-
.../logical/JoinConditionTypeCoerceRule.scala | 5 +-
.../JoinDependentConditionDerivationRule.scala | 3 +-
.../PushFilterIntoLegacyTableSourceScanRule.scala | 6 +-
.../logical/SimplifyFilterConditionRule.scala | 5 +-
.../rules/logical/SimplifyJoinConditionRule.scala | 5 +-
.../table/planner/plan/utils/FlinkRexUtil.scala | 4 +-
.../logical/SimplifyFilterConditionRuleTest.xml | 38 +++++++++
.../logical/SimplifyJoinConditionRuleTest.xml | 42 ++++++++++
.../logical/SimplifyFilterConditionRuleTest.scala | 12 +++
.../logical/SimplifyJoinConditionRuleTest.scala | 12 +++
.../planner/plan/utils/FlinkRexUtilTest.scala | 90 ++++++++++++++++------
.../planner/runtime/stream/sql/CalcITCase.scala | 81 ++++++++++++++++++-
16 files changed, 287 insertions(+), 36 deletions(-)
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java
index 8bd61ba..33d24fa 100644
---
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceScanRule.java
@@ -122,7 +122,10 @@ public class PushFilterInCalcIntoTableSourceScanRule
extends PushFilterIntoSourc
createRemainingCondition(
relBuilder, result.getRemainingFilters(),
unconvertedPredicates);
RexNode simplifiedRemainingCondition =
- FlinkRexUtil.simplify(relBuilder.getRexBuilder(),
remainingCondition);
+ FlinkRexUtil.simplify(
+ relBuilder.getRexBuilder(),
+ remainingCondition,
+ calc.getCluster().getPlanner().getExecutor());
programBuilder.addCondition(simplifiedRemainingCondition);
}
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
index 7046c9e..516f00c 100644
---
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.java
@@ -110,7 +110,10 @@ public class PushFilterIntoTableSourceScanRule extends
PushFilterIntoSourceScanR
createRemainingCondition(
relBuilder, result.getRemainingFilters(),
unconvertedPredicates);
RexNode simplifiedRemainingCondition =
- FlinkRexUtil.simplify(relBuilder.getRexBuilder(),
remainingCondition);
+ FlinkRexUtil.simplify(
+ relBuilder.getRexBuilder(),
+ remainingCondition,
+ filter.getCluster().getPlanner().getExecutor());
Filter newFilter =
filter.copy(filter.getTraitSet(), newScan,
simplifiedRemainingCondition);
call.transformTo(newFilter);
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkCalcMergeRule.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkCalcMergeRule.scala
index fd068dc..3a3cd89 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkCalcMergeRule.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkCalcMergeRule.scala
@@ -120,7 +120,10 @@ class FlinkCalcMergeRule[C <: Calc](calcClass: Class[C])
extends RelOptRule(
val newMergedProgram = if (mergedProgram.getCondition != null) {
val condition = mergedProgram.expandLocalRef(mergedProgram.getCondition)
- val simplifiedCondition = FlinkRexUtil.simplify(rexBuilder, condition)
+ val simplifiedCondition = FlinkRexUtil.simplify(
+ rexBuilder,
+ condition,
+ topCalc.getCluster.getPlanner.getExecutor)
if (simplifiedCondition.equals(condition)) {
mergedProgram
} else {
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/JoinConditionEqualityTransferRule.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/JoinConditionEqualityTransferRule.scala
index 178dd8d..e72e177 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/JoinConditionEqualityTransferRule.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/JoinConditionEqualityTransferRule.scala
@@ -92,7 +92,10 @@ class JoinConditionEqualityTransferRule extends RelOptRule(
}
val newJoinFilter = builder.and(remainFilters :+
- FlinkRexUtil.simplify(rexBuilder, builder.and(newEquiJoinFilters)))
+ FlinkRexUtil.simplify(
+ rexBuilder,
+ builder.and(newEquiJoinFilters),
+ join.getCluster.getPlanner.getExecutor))
val newJoin = join.copy(
join.getTraitSet,
newJoinFilter,
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/JoinConditionTypeCoerceRule.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/JoinConditionTypeCoerceRule.scala
index 489091f..ff00dd2 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/JoinConditionTypeCoerceRule.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/JoinConditionTypeCoerceRule.scala
@@ -87,7 +87,10 @@ class JoinConditionTypeCoerceRule extends RelOptRule(
}
val newCondExp = builder.and(
- FlinkRexUtil.simplify(rexBuilder, builder.and(newJoinFilters)))
+ FlinkRexUtil.simplify(
+ rexBuilder,
+ builder.and(newJoinFilters),
+ join.getCluster.getPlanner.getExecutor))
val newJoin = join.copy(
join.getTraitSet,
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/JoinDependentConditionDerivationRule.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/JoinDependentConditionDerivationRule.scala
index 1106599..efe9445 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/JoinDependentConditionDerivationRule.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/JoinDependentConditionDerivationRule.scala
@@ -114,7 +114,8 @@ class JoinDependentConditionDerivationRule
if (additionalConditions.nonEmpty) {
val newCondExp = FlinkRexUtil.simplify(
builder.getRexBuilder,
- builder.and(conjunctions ++ additionalConditions))
+ builder.and(conjunctions ++ additionalConditions),
+ join.getCluster.getPlanner.getExecutor)
if (!newCondExp.equals(join.getCondition)) {
val newJoin = join.copy(
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoLegacyTableSourceScanRule.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoLegacyTableSourceScanRule.scala
index 4a1a453..b1b5270 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoLegacyTableSourceScanRule.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoLegacyTableSourceScanRule.scala
@@ -127,8 +127,10 @@ class PushFilterIntoLegacyTableSourceScanRule extends
RelOptRule(
val converter = new ExpressionConverter(relBuilder)
val remainingConditions = remainingPredicates.map(_.accept(converter))
++ unconvertedRexNodes
val remainingCondition = remainingConditions.reduce((l, r) =>
relBuilder.and(l, r))
- val simplifiedRemainingCondition =
- FlinkRexUtil.simplify(relBuilder.getRexBuilder, remainingCondition)
+ val simplifiedRemainingCondition = FlinkRexUtil.simplify(
+ relBuilder.getRexBuilder,
+ remainingCondition,
+ filter.getCluster.getPlanner.getExecutor)
val newFilter = filter.copy(filter.getTraitSet, newScan,
simplifiedRemainingCondition)
call.transformTo(newFilter)
}
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRule.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRule.scala
index 1259c2c..67dd8c7 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRule.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRule.scala
@@ -60,7 +60,10 @@ class SimplifyFilterConditionRule(
}
val rexBuilder = filter.getCluster.getRexBuilder
- val simplifiedCondition = FlinkRexUtil.simplify(rexBuilder, condition)
+ val simplifiedCondition = FlinkRexUtil.simplify(
+ rexBuilder,
+ condition,
+ filter.getCluster.getPlanner.getExecutor)
val newCondition = RexUtil.pullFactors(rexBuilder, simplifiedCondition)
if (!changed.head && !condition.equals(newCondition)) {
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyJoinConditionRule.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyJoinConditionRule.scala
index 005edf6..46f9156 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyJoinConditionRule.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyJoinConditionRule.scala
@@ -45,7 +45,10 @@ class SimplifyJoinConditionRule
return
}
- val simpleCondExp = FlinkRexUtil.simplify(join.getCluster.getRexBuilder,
condition)
+ val simpleCondExp = FlinkRexUtil.simplify(
+ join.getCluster.getRexBuilder,
+ condition,
+ join.getCluster.getPlanner.getExecutor)
val newCondExp = RexUtil.pullFactors(join.getCluster.getRexBuilder,
simpleCondExp)
if (newCondExp.equals(condition)) {
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala
index 8c8dba2..2d95459 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala
@@ -207,7 +207,7 @@ object FlinkRexUtil {
* 5. a = a, a >= a, a <= a -> true
* 6. a <> a, a > a, a < a -> false
*/
- def simplify(rexBuilder: RexBuilder, expr: RexNode): RexNode = {
+ def simplify(rexBuilder: RexBuilder, expr: RexNode, executor: RexExecutor):
RexNode = {
if (expr.isAlwaysTrue || expr.isAlwaysFalse) {
return expr
}
@@ -219,7 +219,7 @@ object FlinkRexUtil {
val binaryComparisonExprReduced = sameExprMerged.accept(
new BinaryComparisonExprReducer(rexBuilder))
- val rexSimplify = new RexSimplify(rexBuilder, RelOptPredicateList.EMPTY,
true, RexUtil.EXECUTOR)
+ val rexSimplify = new RexSimplify(rexBuilder, RelOptPredicateList.EMPTY,
true, executor)
rexSimplify.simplify(binaryComparisonExprReduced)
}
diff --git
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRuleTest.xml
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRuleTest.xml
index f670794..e5bf98c 100644
---
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRuleTest.xml
+++
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRuleTest.xml
@@ -140,6 +140,44 @@ LogicalProject(a=[$0], b=[$1], c=[$2])
]]>
</Resource>
</TestCase>
+ <TestCase name="testSimpleConditionWithCastToFalse">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM x WHERE CAST(0 AS BOOLEAN)]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[CAST(0):BOOLEAN NOT NULL])
+ +- LogicalTableScan(table=[[default_catalog, default_database, x, source:
[TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[false])
+ +- LogicalTableScan(table=[[default_catalog, default_database, x, source:
[TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSimpleConditionWithCastToTrue">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM x WHERE CAST(200 AS BOOLEAN)]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[CAST(200):BOOLEAN NOT NULL])
+ +- LogicalTableScan(table=[[default_catalog, default_database, x, source:
[TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[true])
+ +- LogicalTableScan(table=[[default_catalog, default_database, x, source:
[TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testSimplifyConditionInSubQuery1">
<Resource name="sql">
<![CDATA[SELECT * FROM x WHERE EXISTS (SELECT * FROM y WHERE (d = 1 AND
e = 2) OR (NOT (d <> 1) AND e = 3)) AND true]]>
diff --git
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SimplifyJoinConditionRuleTest.xml
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SimplifyJoinConditionRuleTest.xml
index fa2283c..5b5251e 100644
---
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SimplifyJoinConditionRuleTest.xml
+++
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/SimplifyJoinConditionRuleTest.xml
@@ -72,4 +72,46 @@ LogicalProject(d=[$3])
]]>
</Resource>
</TestCase>
+ <TestCase name="testSimplifyJoinConditionWithCastToTrue">
+ <Resource name="sql">
+ <![CDATA[SELECT d FROM MyTable1 JOIN MyTable2 ON CAST(1.1 AS BOOLEAN)]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(d=[$3])
++- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1,
source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2,
source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(d=[$3])
++- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1,
source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2,
source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSimplifyJoinConditionWithCastToFalse">
+ <Resource name="sql">
+ <![CDATA[SELECT d FROM MyTable1 JOIN MyTable2 ON CAST(0.0 AS BOOLEAN)]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(d=[$3])
++- LogicalJoin(condition=[false], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1,
source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2,
source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(d=[$3])
++- LogicalJoin(condition=[false], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable1,
source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2,
source: [TestTableSource(d, e, f)]]])
+]]>
+ </Resource>
+ </TestCase>
</Root>
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRuleTest.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRuleTest.scala
index 9e52d46..9e8a1a4 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRuleTest.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyFilterConditionRuleTest.scala
@@ -58,6 +58,18 @@ class SimplifyFilterConditionRuleTest extends TableTestBase {
}
@Test
+ def testSimpleConditionWithCastToTrue(): Unit = {
+ util.verifyRelPlan(
+ "SELECT * FROM x WHERE CAST(200 AS BOOLEAN)")
+ }
+
+ @Test
+ def testSimpleConditionWithCastToFalse(): Unit = {
+ util.verifyRelPlan(
+ "SELECT * FROM x WHERE CAST(0 AS BOOLEAN)")
+ }
+
+ @Test
def testSimplifyConditionInSubQuery1(): Unit = {
val sqlQuery = "SELECT * FROM x WHERE EXISTS " +
"(SELECT * FROM y WHERE (d = 1 AND e = 2) OR (NOT (d <> 1) AND e = 3))
AND true"
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyJoinConditionRuleTest.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyJoinConditionRuleTest.scala
index 552c399..6e5d5ae 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyJoinConditionRuleTest.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/SimplifyJoinConditionRuleTest.scala
@@ -65,4 +65,16 @@ class SimplifyJoinConditionRuleTest extends TableTestBase {
""".stripMargin
util.verifyRelPlan(sqlQuery)
}
+
+ @Test
+ def testSimplifyJoinConditionWithCastToTrue(): Unit = {
+ val sqlQuery = "SELECT d FROM MyTable1 JOIN MyTable2 ON CAST(1.1 AS
BOOLEAN)"
+ util.verifyRelPlan(sqlQuery)
+ }
+
+ @Test
+ def testSimplifyJoinConditionWithCastToFalse(): Unit = {
+ val sqlQuery = "SELECT d FROM MyTable1 JOIN MyTable2 ON CAST(0.0 AS
BOOLEAN)"
+ util.verifyRelPlan(sqlQuery)
+ }
}
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtilTest.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtilTest.scala
index 9ca7ecb..ee3e44e 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtilTest.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtilTest.scala
@@ -17,10 +17,13 @@
*/
package org.apache.flink.table.planner.plan.utils
+import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.planner.calcite.{FlinkRexBuilder,
FlinkTypeFactory, FlinkTypeSystem}
+import org.apache.flink.table.planner.codegen.ExpressionReducer
-import org.apache.calcite.rex.{RexLiteral, RexNode, RexUtil}
+import org.apache.calcite.rex.{RexBuilder, RexLiteral, RexNode, RexUtil}
import org.apache.calcite.sql.`type`.SqlTypeName._
+import org.apache.calcite.sql.`type`.{BasicSqlType, SqlTypeName}
import org.apache.calcite.sql.fun.SqlStdOperatorTable._
import org.junit.Assert.{assertEquals, assertFalse}
import org.junit.Test
@@ -240,7 +243,7 @@ class FlinkRexUtilTest {
rexBuilder.makeCall(EQUALS, a, b),
rexBuilder.makeCall(EQUALS, a, b)
)
- val newPredicate0 = FlinkRexUtil.simplify(rexBuilder, predicate0)
+ val newPredicate0 = simplify(rexBuilder, predicate0)
assertEquals(rexBuilder.makeCall(EQUALS, a, b).toString,
newPredicate0.toString)
// a = b AND b = a
@@ -248,7 +251,7 @@ class FlinkRexUtilTest {
rexBuilder.makeCall(EQUALS, a, b),
rexBuilder.makeCall(EQUALS, b, a)
)
- val newPredicate1 = FlinkRexUtil.simplify(rexBuilder, predicate1)
+ val newPredicate1 = simplify(rexBuilder, predicate1)
assertEquals(rexBuilder.makeCall(EQUALS, a, b).toString,
newPredicate1.toString)
// a = b OR b = a
@@ -256,7 +259,7 @@ class FlinkRexUtilTest {
rexBuilder.makeCall(EQUALS, a, b),
rexBuilder.makeCall(EQUALS, b, a)
)
- val newPredicate2 = FlinkRexUtil.simplify(rexBuilder, predicate2)
+ val newPredicate2 = simplify(rexBuilder, predicate2)
assertEquals(rexBuilder.makeCall(EQUALS, a, b).toString,
newPredicate2.toString)
// a = b AND c < d AND b = a AND d > c
@@ -266,7 +269,7 @@ class FlinkRexUtilTest {
rexBuilder.makeCall(EQUALS, b, a),
rexBuilder.makeCall(GREATER_THAN, d, c)
)
- val newPredicate3 = FlinkRexUtil.simplify(rexBuilder, predicate3)
+ val newPredicate3 = simplify(rexBuilder, predicate3)
assertEquals(rexBuilder.makeCall(AND,
rexBuilder.makeCall(EQUALS, a, b),
rexBuilder.makeCall(LESS_THAN, c, d)).toString,
@@ -279,7 +282,7 @@ class FlinkRexUtilTest {
rexBuilder.makeCall(LESS_THAN_OR_EQUAL,
c, rexBuilder.makeCast(typeFactory.createSqlType(INTEGER), a))
)
- val newPredicate4 = FlinkRexUtil.simplify(rexBuilder, predicate4)
+ val newPredicate4 = simplify(rexBuilder, predicate4)
assertEquals(rexBuilder.makeCall(GREATER_THAN_OR_EQUAL,
rexBuilder.makeCast(typeFactory.createSqlType(INTEGER), a), c).toString,
newPredicate4.toString)
@@ -301,7 +304,7 @@ class FlinkRexUtilTest {
rexBuilder.makeCall(GREATER_THAN_OR_EQUAL, dPlus1, c),
rexBuilder.makeCall(LESS_THAN_OR_EQUAL, c, dPlus1))
)
- val newPredicate5 = FlinkRexUtil.simplify(rexBuilder, predicate5)
+ val newPredicate5 = simplify(rexBuilder, predicate5)
assertEquals(rexBuilder.makeCall(OR,
rexBuilder.makeCall(EQUALS, aSubstring13, b),
rexBuilder.makeCall(LESS_THAN_OR_EQUAL, c, dPlus1)).toString,
@@ -320,7 +323,7 @@ class FlinkRexUtilTest {
rexBuilder.makeCall(LESS_THAN, b, rexBuilder.makeLiteral("k"))
)
)
- val newPredicate6 = FlinkRexUtil.simplify(rexBuilder, predicate6)
+ val newPredicate6 = simplify(rexBuilder, predicate6)
assertEquals(rexBuilder.makeCall(AND,
rexBuilder.makeCall(OR,
rexBuilder.makeCall(EQUALS, a, b),
@@ -342,7 +345,7 @@ class FlinkRexUtilTest {
),
rexBuilder.makeCall(EQUALS, b, a)
)
- val newPredicate7 = FlinkRexUtil.simplify(rexBuilder, predicate7)
+ val newPredicate7 = simplify(rexBuilder, predicate7)
assertEquals(rexBuilder.makeCall(AND,
rexBuilder.makeCall(EQUALS, a, b),
rexBuilder.makeCall(LESS_THAN, c, d)).toString,
@@ -356,7 +359,7 @@ class FlinkRexUtilTest {
rexBuilder.makeCall(EQUALS, c, d)
)
)
- val newPredicate8 = FlinkRexUtil.simplify(rexBuilder, predicate8)
+ val newPredicate8 = simplify(rexBuilder, predicate8)
assertEquals(rexBuilder.makeCall(OR,
rexBuilder.makeCall(GREATER_THAN_OR_EQUAL, b, a),
rexBuilder.makeCall(EQUALS, c, d)).toString,
@@ -365,43 +368,43 @@ class FlinkRexUtilTest {
// true AND true
val predicate9 = rexBuilder.makeCall(AND,
rexBuilder.makeLiteral(true), rexBuilder.makeLiteral(true))
- val newPredicate9 = FlinkRexUtil.simplify(rexBuilder, predicate9)
+ val newPredicate9 = simplify(rexBuilder, predicate9)
assertEquals(rexBuilder.makeLiteral(true).toString, newPredicate9.toString)
// false OR false
val predicate10 = rexBuilder.makeCall(OR,
rexBuilder.makeLiteral(false), rexBuilder.makeLiteral(false))
- val newPredicate10 = FlinkRexUtil.simplify(rexBuilder, predicate10)
+ val newPredicate10 = simplify(rexBuilder, predicate10)
assertEquals(rexBuilder.makeLiteral(false).toString,
newPredicate10.toString)
// a = a
val predicate11 = rexBuilder.makeCall(EQUALS, a, a)
- val newPredicate11 = FlinkRexUtil.simplify(rexBuilder, predicate11)
+ val newPredicate11 = simplify(rexBuilder, predicate11)
assertEquals(rexBuilder.makeLiteral(true).toString,
newPredicate11.toString)
// a >= a
val predicate12 = rexBuilder.makeCall(GREATER_THAN_OR_EQUAL, a, a)
- val newPredicate12 = FlinkRexUtil.simplify(rexBuilder, predicate12)
+ val newPredicate12 = simplify(rexBuilder, predicate12)
assertEquals(rexBuilder.makeLiteral(true).toString,
newPredicate12.toString)
// a <= a
val predicate13 = rexBuilder.makeCall(LESS_THAN_OR_EQUAL, a, a)
- val newPredicate13 = FlinkRexUtil.simplify(rexBuilder, predicate13)
+ val newPredicate13 = simplify(rexBuilder, predicate13)
assertEquals(rexBuilder.makeLiteral(true).toString,
newPredicate13.toString)
// a <> a
val predicate14 = rexBuilder.makeCall(NOT_EQUALS, a, a)
- val newPredicate14 = FlinkRexUtil.simplify(rexBuilder, predicate14)
+ val newPredicate14 = simplify(rexBuilder, predicate14)
assertEquals(rexBuilder.makeLiteral(false).toString,
newPredicate14.toString)
// a > a
val predicate15 = rexBuilder.makeCall(GREATER_THAN, a, a)
- val newPredicate15 = FlinkRexUtil.simplify(rexBuilder, predicate15)
+ val newPredicate15 = simplify(rexBuilder, predicate15)
assertEquals(rexBuilder.makeLiteral(false).toString,
newPredicate15.toString)
// a < a
val predicate16 = rexBuilder.makeCall(LESS_THAN, a, a)
- val newPredicate16 = FlinkRexUtil.simplify(rexBuilder, predicate16)
+ val newPredicate16 = simplify(rexBuilder, predicate16)
assertEquals(rexBuilder.makeLiteral(false).toString,
newPredicate16.toString)
// c = 0 AND SEARCH(c, [0, 1])
@@ -412,7 +415,7 @@ class FlinkRexUtilTest {
rexBuilder.makeIn(c, java.util.Arrays.asList(
intLiteral(0),
intLiteral(1))))
- val newPredicate17 = FlinkRexUtil.simplify(rexBuilder, predicate17)
+ val newPredicate17 = simplify(rexBuilder, predicate17)
assertEquals(
rexBuilder.makeIn(c,
Collections.singletonList[RexNode](intLiteral(0))).toString,
newPredicate17.toString)
@@ -425,7 +428,7 @@ class FlinkRexUtilTest {
OR,
rexBuilder.makeCall(EQUALS, c, intLiteral(0)),
predicate18Search)
- val newPredicate18 = FlinkRexUtil.simplify(rexBuilder, predicate18)
+ val newPredicate18 = simplify(rexBuilder, predicate18)
assertEquals(predicate18Search.toString, newPredicate18.toString)
// c > 0 AND (
@@ -447,7 +450,7 @@ class FlinkRexUtilTest {
AND,
rexBuilder.makeCall(GREATER_THAN, c, intLiteral(0)),
predicate19Layer1)
- val newPredicate19 = FlinkRexUtil.simplify(rexBuilder, predicate19)
+ val newPredicate19 = simplify(rexBuilder, predicate19)
assertEquals(
rexBuilder.makeIn(c,
Collections.singletonList[RexNode](intLiteral(1))).toString,
newPredicate19.toString)
@@ -458,9 +461,52 @@ class FlinkRexUtilTest {
OR,
rexBuilder.makeCall(GREATER_THAN_OR_EQUAL, c, intLiteral(0)),
predicate18Search)
- val newPredicate20 = FlinkRexUtil.simplify(rexBuilder, predicate20)
+ val newPredicate20 = simplify(rexBuilder, predicate20)
assertEquals(predicate20.toString, newPredicate20.toString)
+
+ //CAST(1 AS BOOLEAN)
+ val predicate21CastFromData = intLiteral(1)
+ val predicate21Cast = makeToBooleanCast(predicate21CastFromData)
+ val newPredicate21 = simplify(rexBuilder, predicate21Cast)
+ assertEquals(rexBuilder.makeLiteral(true).toString,
newPredicate21.toString)
+
+ //CAST(0 AS BOOLEAN)
+ val predicate22CastFromData = intLiteral(0)
+ val predicate22Cast = makeToBooleanCast(predicate22CastFromData)
+ val newPredicate22 = simplify(rexBuilder, predicate22Cast)
+ assertEquals(rexBuilder.makeLiteral(false).toString,
newPredicate22.toString)
+
+ //CAST(-1 AS BOOLEAN)
+ val predicate23CastFromData = intLiteral(-1)
+ val predicate23Cast = makeToBooleanCast(predicate23CastFromData)
+ val newPredicate23 = simplify(rexBuilder, predicate23Cast)
+ assertEquals(rexBuilder.makeLiteral(true).toString,
newPredicate23.toString)
+
+ //CAST(1.1 AS BOOLEAN)
+ val predicate24CastFromData =
rexBuilder.makeExactLiteral(BigDecimal.valueOf(1.1))
+ val predicate24Cast = makeToBooleanCast(predicate24CastFromData)
+ val newPredicate24 = simplify(rexBuilder, predicate24Cast)
+ assertEquals(rexBuilder.makeLiteral(true).toString,
newPredicate24.toString)
+
+ //CAST(0.000 AS BOOLEAN)
+ val predicate25CastFromData =
rexBuilder.makeExactLiteral(BigDecimal.valueOf(0.000))
+ val predicate25Cast = makeToBooleanCast(predicate25CastFromData)
+ val newPredicate25 = simplify(rexBuilder, predicate25Cast)
+ assertEquals(rexBuilder.makeLiteral(false).toString,
newPredicate25.toString)
}
def intLiteral(x: Int): RexLiteral =
rexBuilder.makeExactLiteral(BigDecimal.valueOf(x))
+
+ def simplify(rexBuilder: RexBuilder, expr: RexNode): RexNode ={
+ val expressionReducer = new ExpressionReducer(TableConfig.getDefault,
false)
+ FlinkRexUtil.simplify(rexBuilder, expr, expressionReducer)
+ }
+
+ private def makeToBooleanCast(fromData: RexNode): RexNode ={
+ val booleanType = new BasicSqlType(typeFactory.getTypeSystem,
SqlTypeName.BOOLEAN)
+ rexBuilder.makeCall(
+ booleanType,
+ CAST,
+ Collections.singletonList(fromData.asInstanceOf[RexNode]))
+ }
}
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
index 723800f..794a135 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
@@ -31,14 +31,17 @@ import
org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
import org.apache.flink.table.planner.runtime.utils._
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
import
org.apache.flink.table.runtime.typeutils.MapDataSerializerTest.CustomMapData
-import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType}
+import org.apache.flink.table.types.logical.{BigIntType, BooleanType, IntType,
VarCharType}
import org.apache.flink.table.utils.LegacyRowResource
import org.apache.flink.types.Row
import org.apache.flink.util.CollectionUtil
-import java.util
import org.junit.Assert._
import org.junit._
+
+import java.time.Instant
+import java.util
+
import scala.collection.JavaConversions._
import scala.collection.Seq
@@ -48,6 +51,80 @@ class CalcITCase extends StreamingTestBase {
def usesLegacyRows: LegacyRowResource = LegacyRowResource.INSTANCE
@Test
+ def testCastNumericToBooleanInProjection(): Unit ={
+ val sqlQuery =
+ "SELECT CAST(1 AS BOOLEAN), CAST(0 AS BOOLEAN), CAST(1.1 AS BOOLEAN),
CAST(0.00 AS BOOLEAN)"
+
+ val outputType = InternalTypeInfo.ofFields(
+ new BooleanType(),
+ new BooleanType(),
+ new BooleanType(),
+ new BooleanType()
+ )
+
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[RowData]
+ val sink = new TestingAppendRowDataSink(outputType)
+ result.addSink(sink)
+ env.execute()
+
+ val expected = List(
+ "+I(true,false,true,false)"
+ )
+ assertEquals(expected.sorted, sink.getAppendResults.sorted)
+ }
+
+ @Test
+ def testCastNumericToBooleanInCondition(): Unit ={
+ val sqlQuery =
+ s"""
+ | SELECT * FROM MyTableRow WHERE b = CAST(1 AS BOOLEAN)
+ | UNION ALL
+ | SELECT * FROM MyTableRow WHERE b = CAST(0 AS BOOLEAN)
+ | UNION ALL
+ | SELECT * FROM MyTableRow WHERE b = CAST(1.1 AS BOOLEAN)
+ | UNION ALL
+ | SELECT * FROM MyTableRow WHERE b = CAST(0.0 AS BOOLEAN)
+ |""".stripMargin
+
+ val rowData1: GenericRowData = new GenericRowData(2)
+ rowData1.setField(0, 1)
+ rowData1.setField(1, true)
+
+ val rowData2: GenericRowData = new GenericRowData(2)
+ rowData2.setField(0, 2)
+ rowData2.setField(1, false)
+
+ val data = List(rowData1,rowData2)
+
+ implicit val dataType: TypeInformation[GenericRowData] =
+ InternalTypeInfo.ofFields(
+ new IntType(),
+ new BooleanType()).asInstanceOf[TypeInformation[GenericRowData]]
+
+ val ds = env.fromCollection(data)
+
+ val t = ds.toTable(tEnv, 'a, 'b)
+ tEnv.registerTable("MyTableRow", t)
+
+ val outputType = InternalTypeInfo.ofFields(
+ new IntType(),
+ new BooleanType())
+
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[RowData]
+ val sink = new TestingAppendRowDataSink(outputType)
+ result.addSink(sink)
+ env.execute()
+
+ val expected = List(
+ "+I(1,true)",
+ "+I(2,false)",
+ "+I(1,true)",
+ "+I(2,false)"
+ )
+ assertEquals(expected.sorted, sink.getAppendResults.sorted)
+ }
+
+ @Test
def testGenericRowAndRowData(): Unit = {
val sqlQuery = "SELECT * FROM MyTableRow"