This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new b22bc62 [FLINK-21592][table-planner-blink] RemoveSingleAggregateRule fails due to nullability mismatch (#15082) b22bc62 is described below commit b22bc62ae59d3ccaef95507897c7725970e4e5c3 Author: Rui Li <li...@apache.org> AuthorDate: Wed Apr 14 17:51:43 2021 +0800 [FLINK-21592][table-planner-blink] RemoveSingleAggregateRule fails due to nullability mismatch (#15082) --- .../apache/calcite/sql2rel/RelDecorrelator.java | 22 ++++++---- .../logical/RemoveSingleAggregateRuleTest.xml | 50 ++++++++++++++++++++++ .../logical/RemoveSingleAggregateRuleTest.scala | 45 +++++++++++++++++++ 3 files changed, 109 insertions(+), 8 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java index 4181625..07893cc 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java @@ -118,11 +118,7 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.stream.Collectors; -/** - * Copied to fix CALCITE-4333, should be removed for the next Calcite upgrade. - * - * <p>Changes: Line 671 ~ Line 681, Line 430 ~ Line 441. - */ +/** Copied to fix calcite issues. */ public class RelDecorrelator implements ReflectiveVisitor { // ~ Static fields/initializers --------------------------------------------- @@ -439,6 +435,9 @@ public class RelDecorrelator implements ReflectiveVisitor { return null; } + // BEGIN FLINK MODIFICATION + // Reason: to de-correlate sort rel when its parent is not a correlate + // Should be removed after CALCITE-4333 is fixed final RelNode newInput = frame.r; Mappings.TargetMapping mapping = @@ -452,6 +451,7 @@ public class RelDecorrelator implements ReflectiveVisitor { final int offset = rel.offset == null ? -1 : RexLiteral.intValue(rel.offset); final int fetch = rel.fetch == null ? -1 : RexLiteral.intValue(rel.fetch); + // END FLINK MODIFICATION final RelNode newSort = relBuilder @@ -685,6 +685,9 @@ public class RelDecorrelator implements ReflectiveVisitor { public Frame getInvoke(RelNode r, RelNode parent) { final Frame frame = dispatcher.invoke(r); + // BEGIN FLINK MODIFICATION + // Reason: to de-correlate sort rel when its parent is not a correlate + // Should be removed after CALCITE-4333 is fixed if (frame != null && parent instanceof Correlate && r instanceof Sort) { Sort sort = (Sort) r; // Can not decorrelate if the sort has per-correlate-key attributes like @@ -696,6 +699,7 @@ public class RelDecorrelator implements ReflectiveVisitor { return null; } } + // END FLINK MODIFICATION if (frame != null) { map.put(r, frame); } @@ -1869,13 +1873,15 @@ public class RelDecorrelator implements ReflectiveVisitor { return; } - // singleAggRel produces a nullable type, so create the new - // projection that casts proj expr to a nullable type. + // BEGIN FLINK MODIFICATION + // Reason: fix the nullability mismatch issue final RelBuilder relBuilder = call.builder(); + final boolean nullable = singleAggregate.getAggCallList().get(0).getType().isNullable(); final RelDataType type = relBuilder .getTypeFactory() - .createTypeWithNullability(projExprs.get(0).getType(), true); + .createTypeWithNullability(projExprs.get(0).getType(), nullable); + // END FLINK MODIFICATION final RexNode cast = relBuilder.getRexBuilder().makeCast(type, projExprs.get(0)); relBuilder.push(aggregate).project(cast); call.transformTo(relBuilder.build()); diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RemoveSingleAggregateRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RemoveSingleAggregateRuleTest.xml new file mode 100644 index 0000000..05ccc23 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RemoveSingleAggregateRuleTest.xml @@ -0,0 +1,50 @@ +<?xml version="1.0" ?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to you under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +<Root> + <TestCase name="testRemoveSingleAggregateRule"> + <Resource name="sql"> + <![CDATA[select (select count(x)-1 from foo where foo.y=bar.i) from bar]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(EXPR$0=[$SCALAR_QUERY({ +LogicalProject(EXPR$0=[-($0, 1)]) + LogicalAggregate(group=[{}], agg#0=[COUNT($0)]) + LogicalProject(x=[$0]) + LogicalFilter(condition=[=($1, $cor0.i)]) + LogicalTableScan(table=[[default_catalog, default_database, foo, source: [TestTableSource(x, y)]]]) +})]) ++- LogicalTableScan(table=[[default_catalog, default_database, bar, source: [TestTableSource(i, s)]]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[-(CASE(IS NULL($f1), 0:BIGINT, $f1), 1) AS EXPR$0]) ++- HashJoin(joinType=[LeftOuterJoin], where=[=(i, y)], select=[i, y, $f1], build=[right]) + :- Exchange(distribution=[hash[i]]) + : +- Calc(select=[i]) + : +- LegacyTableSourceScan(table=[[default_catalog, default_database, bar, source: [TestTableSource(i, s)]]], fields=[i, s]) + +- HashAggregate(isMerge=[true], groupBy=[y], select=[y, Final_COUNT(count$0) AS $f1]) + +- Exchange(distribution=[hash[y]]) + +- LocalHashAggregate(groupBy=[y], select=[y, Partial_COUNT(x) AS count$0]) + +- Calc(select=[x, y], where=[IS NOT NULL(y)]) + +- LegacyTableSourceScan(table=[[default_catalog, default_database, foo, source: [TestTableSource(x, y)]]], fields=[x, y]) +]]> + </Resource> + </TestCase> +</Root> diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/RemoveSingleAggregateRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/RemoveSingleAggregateRuleTest.scala new file mode 100644 index 0000000..cdde94f --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/RemoveSingleAggregateRuleTest.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.logical + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api._ +import org.apache.flink.table.planner.utils.TableTestBase + +import org.junit.{Before, Test} + +/** + * Test for RemoveSingleAggregateRule. + */ +class RemoveSingleAggregateRuleTest extends TableTestBase { + + private val util = batchTestUtil() + + @Before + def setup(): Unit = { + util.addTableSource[(Int, Int)]("foo", 'x, 'y) + util.addTableSource[(Int, String)]("bar", 'i, 's) + } + + @Test + def testRemoveSingleAggregateRule(): Unit = { + util.verifyRelPlan("select (select count(x)-1 from foo where foo.y=bar.i) from bar") + } + +}