This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 2e632af6a75 [FLINK-27369][table-planner] Fix the type mismatch error
in RemoveUnreachableCoalesceArgumentsRule
2e632af6a75 is described below
commit 2e632af6a75ae8db8b1dd8b2b8af9fa5a514af36
Author: godfreyhe <[email protected]>
AuthorDate: Mon Apr 25 15:45:36 2022 +0800
[FLINK-27369][table-planner] Fix the type mismatch error in
RemoveUnreachableCoalesceArgumentsRule
This closes #19566
---
.../RemoveUnreachableCoalesceArgumentsRule.java | 20 ++++++---
...RemoveUnreachableCoalesceArgumentsRuleTest.java | 23 +++++++++++
.../RemoveUnreachableCoalesceArgumentsRuleTest.xml | 48 ++++++++++++++++++----
.../planner/runtime/batch/sql/CalcITCase.scala | 25 +++++++++++
.../planner/runtime/stream/sql/CalcITCase.scala | 29 +++++++++++++
5 files changed, 133 insertions(+), 12 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/RemoveUnreachableCoalesceArgumentsRule.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/RemoveUnreachableCoalesceArgumentsRule.java
index 83817e75994..8306a34b68c 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/RemoveUnreachableCoalesceArgumentsRule.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/RemoveUnreachableCoalesceArgumentsRule.java
@@ -31,6 +31,7 @@ import org.apache.calcite.rel.core.Calc;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexShuttle;
@@ -58,9 +59,6 @@ public class RemoveUnreachableCoalesceArgumentsRule
public static final RelOptRule CALC_INSTANCE =
Config.EMPTY.as(Config.class).withCalc().toRule();
- private static final UnreachableCoalesceArgumentsRemoveRexShuttle
REX_SHUTTLE_INSTANCE =
- new UnreachableCoalesceArgumentsRemoveRexShuttle();
-
public RemoveUnreachableCoalesceArgumentsRule(Config config) {
super(config);
}
@@ -68,10 +66,17 @@ public class RemoveUnreachableCoalesceArgumentsRule
@Override
public void onMatch(RelOptRuleCall call) {
final RelNode relNode = call.rel(0);
- call.transformTo(relNode.accept(REX_SHUTTLE_INSTANCE));
+ final RexBuilder rexBuilder = relNode.getCluster().getRexBuilder();
+ call.transformTo(
+ relNode.accept(new
UnreachableCoalesceArgumentsRemoveRexShuttle(rexBuilder)));
}
private static class UnreachableCoalesceArgumentsRemoveRexShuttle extends
RexShuttle {
+ private final RexBuilder rexBuilder;
+
+ private UnreachableCoalesceArgumentsRemoveRexShuttle(RexBuilder
rexBuilder) {
+ this.rexBuilder = rexBuilder;
+ }
@Override
public RexNode visitCall(RexCall call) {
@@ -86,7 +91,12 @@ public class RemoveUnreachableCoalesceArgumentsRule
// If it's the first argument, just return the argument without
the coalesce invocation
if (firstNonNullableArgIndex == 0) {
- return call.operands.get(0);
+ RexNode operand = call.operands.get(0);
+ if (call.getType().equals(operand.getType())) {
+ return operand;
+ } else {
+ return rexBuilder.makeCast(call.getType(), operand);
+ }
}
// If it's the last argument, or no non-null argument was found,
return the original
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/RemoveUnreachableCoalesceArgumentsRuleTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/RemoveUnreachableCoalesceArgumentsRuleTest.java
index bbc427fe3e1..799ac38014c 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/RemoveUnreachableCoalesceArgumentsRuleTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/RemoveUnreachableCoalesceArgumentsRuleTest.java
@@ -83,4 +83,27 @@ public class RemoveUnreachableCoalesceArgumentsRuleTest
extends TableTestBase {
util.verifyRelPlan(
"SELECT * FROM T t1 LEFT JOIN T t2 ON COALESCE(t1.f0, '-',
t1.f2) = t2.f0");
}
+
+ @Test
+ public void testMultipleCoalesces() {
+ util.verifyRelPlan(
+ "SELECT COALESCE(1),\n"
+ + "COALESCE(1, 2),\n"
+ + "COALESCE(cast(NULL as int), 2),\n"
+ + "COALESCE(1, cast(NULL as int)),\n"
+ + "COALESCE(cast(NULL as int), cast(NULL as int),
3),\n"
+ + "COALESCE(4, cast(NULL as int), cast(NULL as int),
cast(NULL as int)),\n"
+ + "COALESCE('1'),\n"
+ + "COALESCE('1', '23'),\n"
+ + "COALESCE(cast(NULL as varchar), '2'),\n"
+ + "COALESCE('1', cast(NULL as varchar)),\n"
+ + "COALESCE(cast(NULL as varchar), cast(NULL as
varchar), '3'),\n"
+ + "COALESCE('4', cast(NULL as varchar), cast(NULL as
varchar), cast(NULL as varchar)),\n"
+ + "COALESCE(1.0),\n"
+ + "COALESCE(1.0, 2),\n"
+ + "COALESCE(cast(NULL as double), 2.0),\n"
+ + "COALESCE(cast(NULL as double), 2.0, 3.0),\n"
+ + "COALESCE(2.0, cast(NULL as double), 3.0),\n"
+ + "COALESCE(cast(NULL as double), cast(NULL as
double))");
+ }
}
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RemoveUnreachableCoalesceArgumentsRuleTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RemoveUnreachableCoalesceArgumentsRuleTest.xml
index 1f55be25576..2d766d1c679 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RemoveUnreachableCoalesceArgumentsRuleTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RemoveUnreachableCoalesceArgumentsRuleTest.xml
@@ -81,6 +81,24 @@ LogicalProject(EXPR$0=[COALESCE($0, $1, _UTF-16LE'-')])
<![CDATA[
Calc(select=[COALESCE(f0, f1) AS EXPR$0])
+- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[f0,
f1, f2])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testFilterCoalesce">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM T WHERE COALESCE(f0, f1, '-') = 'abc']]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(f0=[$0], f1=[$1], f2=[$2])
++- LogicalFilter(condition=[=(COALESCE($0, $1, _UTF-16LE'-'), _UTF-16LE'abc')])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[f0, f1, f2], where=[=(COALESCE(f0, f1), 'abc')])
++- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[f0,
f1, f2])
]]>
</Resource>
</TestCase>
@@ -109,21 +127,37 @@ Calc(select=[f0, f1, f2, f00, f10, f20])
]]>
</Resource>
</TestCase>
- <TestCase name="testFilterCoalesce">
+ <TestCase name="testMultipleCoalesces">
<Resource name="sql">
- <![CDATA[SELECT * FROM T WHERE COALESCE(f0, f1, '-') = 'abc']]>
+ <![CDATA[SELECT COALESCE(1),
+COALESCE(1, 2),
+COALESCE(cast(NULL as int), 2),
+COALESCE(1, cast(NULL as int)),
+COALESCE(cast(NULL as int), cast(NULL as int), 3),
+COALESCE(4, cast(NULL as int), cast(NULL as int), cast(NULL as int)),
+COALESCE('1'),
+COALESCE('1', '23'),
+COALESCE(cast(NULL as varchar), '2'),
+COALESCE('1', cast(NULL as varchar)),
+COALESCE(cast(NULL as varchar), cast(NULL as varchar), '3'),
+COALESCE('4', cast(NULL as varchar), cast(NULL as varchar), cast(NULL as
varchar)),
+COALESCE(1.0),
+COALESCE(1.0, 2),
+COALESCE(cast(NULL as double), 2.0),
+COALESCE(cast(NULL as double), 2.0, 3.0),
+COALESCE(2.0, cast(NULL as double), 3.0),
+COALESCE(cast(NULL as double), cast(NULL as double))]]>
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(f0=[$0], f1=[$1], f2=[$2])
-+- LogicalFilter(condition=[=(COALESCE($0, $1, _UTF-16LE'-'), _UTF-16LE'abc')])
- +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+LogicalProject(EXPR$0=[COALESCE(1)], EXPR$1=[COALESCE(1, 2)],
EXPR$2=[COALESCE(null:INTEGER, 2)], EXPR$3=[COALESCE(1, null:INTEGER)],
EXPR$4=[COALESCE(null:INTEGER, null:INTEGER, 3)], EXPR$5=[COALESCE(4,
null:INTEGER, null:INTEGER, null:INTEGER)], EXPR$6=[COALESCE(_UTF-16LE'1')],
EXPR$7=[COALESCE(_UTF-16LE'1', _UTF-16LE'23')],
EXPR$8=[COALESCE(null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE",
_UTF-16LE'2')], EXPR$9=[COALESCE(_UTF-16LE'1', null:VARCHAR(2147483647)
CHARACTER SET "UTF-16LE [...]
++- LogicalValues(tuples=[[{ 0 }]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Calc(select=[f0, f1, f2], where=[=(COALESCE(f0, f1), 'abc')])
-+- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[f0,
f1, f2])
+Calc(select=[1 AS EXPR$0, 1 AS EXPR$1, 2 AS EXPR$2, 1 AS EXPR$3, 3 AS EXPR$4,
4 AS EXPR$5, '1' AS EXPR$6, '1' AS EXPR$7, '2' AS EXPR$8, '1' AS EXPR$9, '3' AS
EXPR$10, '4' AS EXPR$11, 1.0 AS EXPR$12, 1.0 AS EXPR$13, 2E0 AS EXPR$14, 2E0 AS
EXPR$15, 2.0 AS EXPR$16, null:DOUBLE AS EXPR$17])
++- Values(tuples=[[{ 0 }]])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
index c63daa04170..2323e1662c9 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CalcITCase.scala
@@ -2063,4 +2063,29 @@ class CalcITCase extends BatchTestBase {
checkResult("SELECT TRY_CAST('invalid' AS INT)", Seq(row(null)))
checkResult("SELECT TRY_CAST(g AS DOUBLE) FROM testTable", Seq(row(null),
row(null), row(null)))
}
+
+ @Test
+ def testMultipleCoalesces(): Unit = {
+ checkResult(
+ "SELECT COALESCE(1),\n" +
+ "COALESCE(1, 2),\n" +
+ "COALESCE(cast(NULL as int), 2),\n" +
+ "COALESCE(1, cast(NULL as int)),\n" +
+ "COALESCE(cast(NULL as int), cast(NULL as int), 3),\n" +
+ "COALESCE(4, cast(NULL as int), cast(NULL as int), cast(NULL as
int)),\n" +
+ "COALESCE('1'),\n" +
+ "COALESCE('1', '23'),\n" +
+ "COALESCE(cast(NULL as varchar), '2'),\n" +
+ "COALESCE('1', cast(NULL as varchar)),\n" +
+ "COALESCE(cast(NULL as varchar), cast(NULL as varchar), '3'),\n" +
+ "COALESCE('4', cast(NULL as varchar), cast(NULL as varchar), cast(NULL
as varchar)),\n" +
+ "COALESCE(1.0),\n" +
+ "COALESCE(1.0, 2),\n" +
+ "COALESCE(cast(NULL as double), 2.0),\n" +
+ "COALESCE(cast(NULL as double), 2.0, 3.0),\n" +
+ "COALESCE(2.0, cast(NULL as double), 3.0),\n" +
+ "COALESCE(cast(NULL as double), cast(NULL as double))",
+ Seq(row(1, 1, 2, 1, 3, 4, 1, 1, 2, 1, 3, 4, 1.0, 1.0, 2.0, 2.0, 2.0,
null))
+ )
+ }
}
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
index a206b82782d..2a05b5f06c6 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
@@ -649,4 +649,33 @@ class CalcITCase extends StreamingTestBase {
List("HC809", "H389N ")
assertEquals(expected.sorted, sink.getAppendResults.sorted)
}
+
+ @Test
+ def testMultipleCoalesces(): Unit = {
+ val result = tEnv
+ .sqlQuery(
+ "SELECT COALESCE(1),\n" +
+ "COALESCE(1, 2),\n" +
+ "COALESCE(cast(NULL as int), 2),\n" +
+ "COALESCE(1, cast(NULL as int)),\n" +
+ "COALESCE(cast(NULL as int), cast(NULL as int), 3),\n" +
+ "COALESCE(4, cast(NULL as int), cast(NULL as int), cast(NULL as
int)),\n" +
+ "COALESCE('1'),\n" +
+ "COALESCE('1', '23'),\n" +
+ "COALESCE(cast(NULL as varchar), '2'),\n" +
+ "COALESCE('1', cast(NULL as varchar)),\n" +
+ "COALESCE(cast(NULL as varchar), cast(NULL as varchar), '3'),\n" +
+ "COALESCE('4', cast(NULL as varchar), cast(NULL as varchar),
cast(NULL as varchar)),\n" +
+ "COALESCE(1.0),\n" +
+ "COALESCE(1.0, 2),\n" +
+ "COALESCE(cast(NULL as double), 2.0),\n" +
+ "COALESCE(cast(NULL as double), 2.0, 3.0),\n" +
+ "COALESCE(2.0, cast(NULL as double), 3.0),\n" +
+ "COALESCE(cast(NULL as double), cast(NULL as double))")
+ .execute()
+ .collect()
+ .toList
+ TestBaseUtils.compareResultAsText(result,
"1,1,2,1,3,4,1,1,2,1,3,4,1.0,1.0,2.0,2.0,2.0,null")
+ }
+
}