This is an automated email from the ASF dual-hosted git repository.
xuyangzhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 6cfeb2e8bb2 [FLINK-38832][table-planner] Push down single column when
using count star (#27681)
6cfeb2e8bb2 is described below
commit 6cfeb2e8bb2c66e0b293c538dcf28bf181fedd38
Author: Xuyang <[email protected]>
AuthorDate: Thu Mar 5 15:40:12 2026 +0800
[FLINK-38832][table-planner] Push down single column when using count star
(#27681)
---
.../table/planner/calcite/FlinkRelBuilder.java | 20 --
.../rules/logical/PruneCountStarInputRule.java | 110 +++++++++
.../planner/plan/rules/FlinkBatchRuleSets.scala | 4 +-
.../planner/plan/rules/FlinkStreamRuleSets.scala | 4 +-
.../planner/plan/batch/sql/DeadlockBreakupTest.xml | 3 +-
.../plan/batch/sql/MultipleInputCreationTest.xml | 9 +-
.../planner/plan/batch/sql/RowLevelDeleteTest.xml | 34 ++-
.../planner/plan/batch/sql/RowLevelUpdateTest.xml | 34 ++-
.../planner/plan/batch/sql/TableSourceTest.xml | 2 +-
.../plan/batch/sql/agg/HashAggregateTest.xml | 87 ++++++-
.../plan/batch/sql/agg/SortAggregateTest.xml | 87 ++++++-
.../batch/sql/join/NestedLoopSemiAntiJoinTest.xml | 6 +-
.../plan/batch/sql/join/SemiAntiJoinTest.xml | 6 +-
.../plan/hints/batch/BroadcastJoinHintTest.xml | 17 +-
.../plan/hints/batch/NestLoopJoinHintTest.xml | 17 +-
.../plan/hints/batch/ShuffleHashJoinHintTest.xml | 17 +-
.../plan/hints/batch/ShuffleMergeJoinHintTest.xml | 17 +-
.../plan/optimize/ShuffleModePlanOptimizeTest.xml | 275 +++++++++++----------
.../PushLocalAggIntoTableSourceScanRuleTest.xml | 6 +-
.../planner/plan/stream/sql/TableScanTest.xml | 4 +-
.../planner/plan/stream/sql/TableSourceTest.xml | 2 +-
.../planner/plan/stream/sql/agg/AggregateTest.xml | 33 ++-
.../plan/stream/sql/join/SemiAntiJoinTest.xml | 6 +-
.../plan/batch/sql/agg/AggregateTestBase.scala | 30 ++-
.../plan/stream/sql/agg/AggregateTest.scala | 13 +
25 files changed, 608 insertions(+), 235 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java
index 53107b32daa..9045b7d9e38 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java
@@ -35,8 +35,6 @@ import org.apache.flink.table.runtime.operators.rank.RankType;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Preconditions;
-import org.apache.flink.shaded.guava33.com.google.common.collect.ImmutableList;
-
import org.apache.calcite.plan.Context;
import org.apache.calcite.plan.Contexts;
import org.apache.calcite.plan.RelOptCluster;
@@ -44,7 +42,6 @@ import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptSchema;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalTableFunctionScan;
import org.apache.calcite.rel.type.RelDataType;
@@ -52,7 +49,6 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.tools.RelBuilderFactory;
@@ -197,12 +193,6 @@ public final class FlinkRelBuilder extends RelBuilder {
final LogicalAggregate logicalAggregate = (LogicalAggregate)
relNode;
if (isTableAggregate(logicalAggregate.getAggCallList())) {
relNode = LogicalTableAggregate.create(logicalAggregate);
- } else if (isCountStarAgg(logicalAggregate)) {
- final RelNode newAggInput =
-
push(logicalAggregate.getInput(0)).project(literal(0)).build();
- relNode =
- logicalAggregate.copy(
- logicalAggregate.getTraitSet(),
ImmutableList.of(newAggInput));
}
}
@@ -270,14 +260,4 @@ public final class FlinkRelBuilder extends RelBuilder {
cluster.getPlanner().getContext());
return FlinkRelBuilder.of(mergedContext, cluster, relOptSchema);
}
-
- private static boolean isCountStarAgg(LogicalAggregate agg) {
- if (agg.getGroupCount() != 0 || agg.getAggCallList().size() != 1) {
- return false;
- }
- final AggregateCall call = agg.getAggCallList().get(0);
- return call.getAggregation().getKind() == SqlKind.COUNT
- && call.filterArg == -1
- && call.getArgList().isEmpty();
- }
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PruneCountStarInputRule.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PruneCountStarInputRule.java
new file mode 100644
index 00000000000..5676301f6c4
--- /dev/null
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PruneCountStarInputRule.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.tools.RelBuilder;
+import org.immutables.value.Value;
+
+import java.util.Collections;
+
+/**
+ * Planner rule that prunes the input columns of a {@link LogicalAggregate}
representing {@code
+ * COUNT(*)} (i.e. no group keys, a single COUNT aggregate call with no
arguments and no filter) by
+ * inserting a project of a constant literal {@code 0} between the aggregate
and its input.
+ *
+ * <p>This avoids reading all columns from the source when only counting rows.
+ *
+ * <p>Before:
+ *
+ * <pre>
+ * LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+ * +- SomeInput
+ * </pre>
+ *
+ * <p>After:
+ *
+ * <pre>
+ * LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+ * +- LogicalProject($f0=[0])
+ * +- SomeInput
+ * </pre>
+ */
[email protected]
+public class PruneCountStarInputRule
+ extends RelRule<PruneCountStarInputRule.PruneCountStarInputRuleConfig>
{
+
+ public static final PruneCountStarInputRule INSTANCE =
+
PruneCountStarInputRule.PruneCountStarInputRuleConfig.DEFAULT.toRule();
+
+ protected PruneCountStarInputRule(PruneCountStarInputRuleConfig config) {
+ super(config);
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ final LogicalAggregate agg = call.rel(0);
+ final RelNode input = agg.getInput();
+ if (agg.getGroupCount() != 0 || agg.getAggCallList().size() != 1) {
+ return false;
+ }
+ final AggregateCall aggCall = agg.getAggCallList().get(0);
+ if (aggCall.getAggregation().getKind() != SqlKind.COUNT
+ || aggCall.filterArg != -1
+ || !aggCall.getArgList().isEmpty()) {
+ return false;
+ }
+ // Only rewrite when the input has more than one field. After the
rewrite, the input
+ // becomes a single-field Project(0), so this condition naturally
prevents repeated
+ // application even if other rules in the same phase transform or
remove the inserted
+ // project.
+ return input.getRowType().getFieldCount() > 1;
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final LogicalAggregate agg = call.rel(0);
+ final RelNode input = agg.getInput();
+
+ final RelBuilder relBuilder = call.builder();
+ final RelNode newInput =
relBuilder.push(input).project(relBuilder.literal(0)).build();
+ final RelNode newAgg = agg.copy(agg.getTraitSet(),
Collections.singletonList(newInput));
+ call.transformTo(newAgg);
+ }
+
+ /** Rule configuration. */
+ @Value.Immutable(singleton = false)
+ public interface PruneCountStarInputRuleConfig extends RelRule.Config {
+ PruneCountStarInputRule.PruneCountStarInputRuleConfig DEFAULT =
+
ImmutablePruneCountStarInputRule.PruneCountStarInputRuleConfig.builder()
+ .operandSupplier(b0 ->
b0.operand(LogicalAggregate.class).anyInputs())
+ .description("PruneCountStarInputRule")
+ .build();
+
+ @Override
+ default PruneCountStarInputRule toRule() {
+ return new PruneCountStarInputRule(this);
+ }
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
index 546081a9c5e..ab401981684 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
@@ -125,7 +125,9 @@ object FlinkBatchRuleSets {
// vector search rule.
ConstantVectorSearchCallToCorrelateRule.INSTANCE,
// Wrap arguments for JSON aggregate functions
- WrapJsonAggFunctionArgumentsRule.INSTANCE
+ WrapJsonAggFunctionArgumentsRule.INSTANCE,
+ // prune COUNT(*) input to project a constant before aggregation
+ PruneCountStarInputRule.INSTANCE
)).asJava)
/** RuleSet about filter */
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
index 15372999cd1..d8307013965 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
@@ -133,7 +133,9 @@ object FlinkStreamRuleSets {
// rewrite constant table function scan to correlate
JoinTableFunctionScanToCorrelateRule.INSTANCE,
// Wrap arguments for JSON aggregate functions
- WrapJsonAggFunctionArgumentsRule.INSTANCE
+ WrapJsonAggFunctionArgumentsRule.INSTANCE,
+ // prune COUNT(*) input to project a constant before aggregation
+ PruneCountStarInputRule.INSTANCE
)
).asJava)
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DeadlockBreakupTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DeadlockBreakupTest.xml
index 6a8be634552..464500c844a 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DeadlockBreakupTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DeadlockBreakupTest.xml
@@ -273,7 +273,8 @@ NestedLoopJoin(joinType=[FullOuterJoin], where=[(cnt <>
cnt0)], select=[cnt, cnt
: +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS
cnt])(reuse_id=[1])
: +- Exchange(distribution=[single])
: +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0])
-: +- TableSourceScan(table=[[default_catalog, default_database, x]],
fields=[a, b, c])
+: +- Calc(select=[0 AS $f0])
+: +- TableSourceScan(table=[[default_catalog, default_database,
x]], fields=[a, b, c])
+- Exchange(distribution=[single], shuffle_mode=[BATCH])
+- Calc(select=[cnt], where=[(cnt < 5)])
+- Reused(reference_id=[1])
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/MultipleInputCreationTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/MultipleInputCreationTest.xml
index c555231bd38..fd26e57fb89 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/MultipleInputCreationTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/MultipleInputCreationTest.xml
@@ -158,7 +158,8 @@ Calc(select=[a])
: +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS
cnt])
: +- Exchange(distribution=[single])
: +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0])
- : +- Reused(reference_id=[1])
+ : +- Calc(select=[0 AS $f0])
+ : +- Reused(reference_id=[1])
:- Exchange(distribution=[broadcast])
: +- Calc(select=[a])
: +- TableSourceScan(table=[[default_catalog, default_database, x]],
fields=[a, b, c, nx])
@@ -213,7 +214,8 @@ Calc(select=[a])
: +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS
cnt])
: +- Exchange(distribution=[single])
: +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0])
- : +- Reused(reference_id=[1])
+ : +- Calc(select=[0 AS $f0])
+ : +- Reused(reference_id=[1])
:- Exchange(distribution=[broadcast])
: +- Calc(select=[a])
: +- TableSourceScan(table=[[default_catalog, default_database, x]],
fields=[a, b, c, nx])
@@ -268,7 +270,8 @@ Calc(select=[a])
: +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS
cnt])
: +- Exchange(distribution=[single])
: +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0])
- : +- Reused(reference_id=[1])
+ : +- Calc(select=[0 AS $f0])
+ : +- Reused(reference_id=[1])
:- Exchange(distribution=[broadcast])
: +- Calc(select=[a])
: +- TableSourceScan(table=[[default_catalog, default_database, x]],
fields=[a, b, c, nx])
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelDeleteTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelDeleteTest.xml
index 71310ff839c..2ea03e77d82 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelDeleteTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelDeleteTest.xml
@@ -382,7 +382,8 @@ Sink(table=[default_catalog.default_database.t], fields=[a,
b])
+- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS
EXPR$0])
+- Exchange(distribution=[single])
+- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
- +- TableSourceScan(table=[[default_catalog,
default_database, t]], fields=[a, b])
+ +- Calc(select=[0 AS $f0])
+ +- TableSourceScan(table=[[default_catalog,
default_database, t]], fields=[a, b])
== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.t], fields=[a, b])
@@ -395,7 +396,8 @@ Sink(table=[default_catalog.default_database.t], fields=[a,
b])
+- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS
EXPR$0])
+- Exchange(distribution=[single])
+- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
- +- Reused(reference_id=[1])
+ +- Calc(select=[0 AS $f0])
+ +- Reused(reference_id=[1])
== Physical Execution Plan ==
{
@@ -416,6 +418,17 @@ Sink(table=[default_catalog.default_database.t],
fields=[a, b])
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
+ }, {
+ "id" : ,
+ "type" : "Calc[]",
+ "pact" : "Operator",
+ "contents" : "[]:Calc(select=[0 AS $f0])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
}, {
"id" : ,
"type" : "HashAggregate[]",
@@ -510,7 +523,8 @@ Sink(table=[default_catalog.default_database.t], fields=[a,
b])
+- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS
EXPR$0])
+- Exchange(distribution=[single])
+- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
- +- TableSourceScan(table=[[default_catalog,
default_database, t]], fields=[a, b])
+ +- Calc(select=[0 AS $f0])
+ +- TableSourceScan(table=[[default_catalog,
default_database, t]], fields=[a, b])
== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.t], fields=[a, b])
@@ -522,7 +536,8 @@ Sink(table=[default_catalog.default_database.t], fields=[a,
b])
+- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS
EXPR$0])
+- Exchange(distribution=[single])
+- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
- +- Reused(reference_id=[1])
+ +- Calc(select=[0 AS $f0])
+ +- Reused(reference_id=[1])
== Physical Execution Plan ==
{
@@ -532,6 +547,17 @@ Sink(table=[default_catalog.default_database.t],
fields=[a, b])
"pact" : "Data Source",
"contents" : "[]:TableSourceScan(table=[[default_catalog,
default_database, t]], fields=[a, b])",
"parallelism" : 1
+ }, {
+ "id" : ,
+ "type" : "Calc[]",
+ "pact" : "Operator",
+ "contents" : "[]:Calc(select=[0 AS $f0])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
}, {
"id" : ,
"type" : "HashAggregate[]",
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.xml
index 85e79a8c0ec..399fcb0dd5e 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.xml
@@ -605,7 +605,8 @@ Sink(table=[default_catalog.default_database.t],
targetColumns=[[1]], fields=[a,
+- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS
EXPR$0])
+- Exchange(distribution=[single])
+- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
- +- TableSourceScan(table=[[default_catalog,
default_database, t1]], fields=[a, b])
+ +- Calc(select=[0 AS $f0])
+ +- TableSourceScan(table=[[default_catalog,
default_database, t1]], fields=[a, b])
== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.t], targetColumns=[[1]],
fields=[a, b])
@@ -616,7 +617,8 @@ Sink(table=[default_catalog.default_database.t],
targetColumns=[[1]], fields=[a,
+- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS
EXPR$0])
+- Exchange(distribution=[single])
+- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
- +- TableSourceScan(table=[[default_catalog,
default_database, t1]], fields=[a, b])
+ +- Calc(select=[0 AS $f0])
+ +- TableSourceScan(table=[[default_catalog,
default_database, t1]], fields=[a, b])
== Physical Execution Plan ==
{
@@ -632,6 +634,17 @@ Sink(table=[default_catalog.default_database.t],
targetColumns=[[1]], fields=[a,
"pact" : "Data Source",
"contents" : "[]:TableSourceScan(table=[[default_catalog,
default_database, t1]], fields=[a, b])",
"parallelism" : 1
+ }, {
+ "id" : ,
+ "type" : "Calc[]",
+ "pact" : "Operator",
+ "contents" : "[]:Calc(select=[0 AS $f0])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
}, {
"id" : ,
"type" : "HashAggregate[]",
@@ -716,7 +729,8 @@ Sink(table=[default_catalog.default_database.t],
targetColumns=[[1]], fields=[a,
+- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS
EXPR$0])
+- Exchange(distribution=[single])
+- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
- +- TableSourceScan(table=[[default_catalog,
default_database, t1]], fields=[a, b])
+ +- Calc(select=[0 AS $f0])
+ +- TableSourceScan(table=[[default_catalog,
default_database, t1]], fields=[a, b])
== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.t], targetColumns=[[1]],
fields=[a, b])
@@ -728,7 +742,8 @@ Sink(table=[default_catalog.default_database.t],
targetColumns=[[1]], fields=[a,
+- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS
EXPR$0])
+- Exchange(distribution=[single])
+- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
- +- TableSourceScan(table=[[default_catalog,
default_database, t1]], fields=[a, b])
+ +- Calc(select=[0 AS $f0])
+ +- TableSourceScan(table=[[default_catalog,
default_database, t1]], fields=[a, b])
== Physical Execution Plan ==
{
@@ -755,6 +770,17 @@ Sink(table=[default_catalog.default_database.t],
targetColumns=[[1]], fields=[a,
"pact" : "Data Source",
"contents" : "[]:TableSourceScan(table=[[default_catalog,
default_database, t1]], fields=[a, b])",
"parallelism" : 1
+ }, {
+ "id" : ,
+ "type" : "Calc[]",
+ "pact" : "Operator",
+ "contents" : "[]:Calc(select=[0 AS $f0])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
}, {
"id" : ,
"type" : "HashAggregate[]",
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml
index fdc0fea2d7f..988018a2f3f 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml
@@ -117,7 +117,7 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
<![CDATA[
HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0])
+- Exchange(distribution=[single])
- +- TableSourceScan(table=[[default_catalog, default_database,
ProjectableTable, aggregates=[grouping=[],
aggFunctions=[Count1AggFunction()]]]], fields=[count1$0])
+ +- TableSourceScan(table=[[default_catalog, default_database,
ProjectableTable, project=[a], metadata=[], aggregates=[grouping=[],
aggFunctions=[Count1AggFunction()]]]], fields=[count1$0])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml
index bcccb462898..0c35071320c 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml
@@ -410,6 +410,73 @@ HashAggregate(isMerge=[true], select=[Final_AVG(sum$0,
count$1) AS EXPR$0, Final
+- LocalHashAggregate(select=[Partial_AVG(byte) AS (sum$0, count$1),
Partial_AVG(short) AS (sum$2, count$3), Partial_AVG(int) AS (sum$4, count$5),
Partial_AVG(long) AS (sum$6, count$7), Partial_AVG(float) AS (sum$8, count$9),
Partial_AVG(double) AS (sum$10, count$11), Partial_AVG(decimal3020) AS (sum$12,
count$13), Partial_AVG(decimal105) AS (sum$14, count$15)]),
rowType=[RecordType(BIGINT sum$0, BIGINT count$1, BIGINT sum$2, BIGINT count$3,
BIGINT sum$4, BIGINT count$5, BIGINT sum$6, [...]
+- Calc(select=[byte, short, int, long, float, double, decimal3020,
decimal105]), rowType=[RecordType(TINYINT byte, SMALLINT short, INTEGER int,
BIGINT long, FLOAT float, DOUBLE double, DECIMAL(30, 20) decimal3020,
DECIMAL(10, 5) decimal105)]
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[byte, short, int, long, float, double, boolean, string,
date, time, timestamp, decimal3020, decimal105]), rowType=[RecordType(TINYINT
byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double,
BOOLEAN boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time,
TIMESTAMP(3) timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCountStarWithHavingAndProjectPushDown[aggStrategy=AUTO]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT COUNT(*) FROM src HAVING COUNT(*) > 1
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalFilter(condition=[>($0, 1)]), rowType=[RecordType(BIGINT EXPR$0)]
++- LogicalAggregate(group=[{}], EXPR$0=[COUNT()]), rowType=[RecordType(BIGINT
EXPR$0)]
+ +- LogicalTableScan(table=[[default_catalog, default_database, src]]),
rowType=[RecordType(VARCHAR(2147483647) id, BIGINT cnt)]
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[EXPR$0], where=[>(EXPR$0, 1)]), rowType=[RecordType(BIGINT
EXPR$0)]
++- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0]),
rowType=[RecordType(BIGINT EXPR$0)]
+ +- Exchange(distribution=[single]), rowType=[RecordType(BIGINT count1$0)]
+ +- TableSourceScan(table=[[default_catalog, default_database, src,
project=[id], metadata=[], aggregates=[grouping=[],
aggFunctions=[Count1AggFunction()]]]], fields=[count1$0]),
rowType=[RecordType(BIGINT count1$0)]
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testCountStarWithHavingAndProjectPushDown[aggStrategy=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT COUNT(*) FROM src HAVING COUNT(*) > 1
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalFilter(condition=[>($0, 1)]), rowType=[RecordType(BIGINT EXPR$0)]
++- LogicalAggregate(group=[{}], EXPR$0=[COUNT()]), rowType=[RecordType(BIGINT
EXPR$0)]
+ +- LogicalTableScan(table=[[default_catalog, default_database, src]]),
rowType=[RecordType(VARCHAR(2147483647) id, BIGINT cnt)]
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[EXPR$0], where=[>(EXPR$0, 1)]), rowType=[RecordType(BIGINT
EXPR$0)]
++- HashAggregate(isMerge=[false], select=[COUNT(*) AS EXPR$0]),
rowType=[RecordType(BIGINT EXPR$0)]
+ +- Exchange(distribution=[single]), rowType=[RecordType(INTEGER $f0)]
+ +- Calc(select=[0 AS $f0]), rowType=[RecordType(INTEGER $f0)]
+ +- TableSourceScan(table=[[default_catalog, default_database, src,
project=[id], metadata=[]]], fields=[id]),
rowType=[RecordType(VARCHAR(2147483647) id)]
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testCountStarWithHavingAndProjectPushDown[aggStrategy=TWO_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT COUNT(*) FROM src HAVING COUNT(*) > 1
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalFilter(condition=[>($0, 1)]), rowType=[RecordType(BIGINT EXPR$0)]
++- LogicalAggregate(group=[{}], EXPR$0=[COUNT()]), rowType=[RecordType(BIGINT
EXPR$0)]
+ +- LogicalTableScan(table=[[default_catalog, default_database, src]]),
rowType=[RecordType(VARCHAR(2147483647) id, BIGINT cnt)]
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[EXPR$0], where=[>(EXPR$0, 1)]), rowType=[RecordType(BIGINT
EXPR$0)]
++- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0]),
rowType=[RecordType(BIGINT EXPR$0)]
+ +- Exchange(distribution=[single]), rowType=[RecordType(BIGINT count1$0)]
+ +- TableSourceScan(table=[[default_catalog, default_database, src,
project=[id], metadata=[], aggregates=[grouping=[],
aggFunctions=[Count1AggFunction()]]]], fields=[count1$0]),
rowType=[RecordType(BIGINT count1$0)]
]]>
</Resource>
</TestCase>
@@ -427,7 +494,7 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()]),
rowType=[RecordType(BIGINT EXPR$
<![CDATA[
HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0]),
rowType=[RecordType(BIGINT EXPR$0)]
+- Exchange(distribution=[single]), rowType=[RecordType(BIGINT count1$0)]
- +- TableSourceScan(table=[[default_catalog, default_database, src,
aggregates=[grouping=[], aggFunctions=[Count1AggFunction()]]]],
fields=[count1$0]), rowType=[RecordType(BIGINT count1$0)]
+ +- TableSourceScan(table=[[default_catalog, default_database, src,
project=[id], metadata=[], aggregates=[grouping=[],
aggFunctions=[Count1AggFunction()]]]], fields=[count1$0]),
rowType=[RecordType(BIGINT count1$0)]
]]>
</Resource>
</TestCase>
@@ -444,8 +511,9 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()]),
rowType=[RecordType(BIGINT EXPR$
<Resource name="optimized rel plan">
<![CDATA[
HashAggregate(isMerge=[false], select=[COUNT(*) AS EXPR$0]),
rowType=[RecordType(BIGINT EXPR$0)]
-+- Exchange(distribution=[single]), rowType=[RecordType(VARCHAR(2147483647)
id, BIGINT cnt)]
- +- TableSourceScan(table=[[default_catalog, default_database, src]],
fields=[id, cnt]), rowType=[RecordType(VARCHAR(2147483647) id, BIGINT cnt)]
++- Exchange(distribution=[single]), rowType=[RecordType(INTEGER $f0)]
+ +- Calc(select=[0 AS $f0]), rowType=[RecordType(INTEGER $f0)]
+ +- TableSourceScan(table=[[default_catalog, default_database, src,
project=[id], metadata=[]]], fields=[id]),
rowType=[RecordType(VARCHAR(2147483647) id)]
]]>
</Resource>
</TestCase>
@@ -463,7 +531,7 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()]),
rowType=[RecordType(BIGINT EXPR$
<![CDATA[
HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0]),
rowType=[RecordType(BIGINT EXPR$0)]
+- Exchange(distribution=[single]), rowType=[RecordType(BIGINT count1$0)]
- +- TableSourceScan(table=[[default_catalog, default_database, src,
aggregates=[grouping=[], aggFunctions=[Count1AggFunction()]]]],
fields=[count1$0]), rowType=[RecordType(BIGINT count1$0)]
+ +- TableSourceScan(table=[[default_catalog, default_database, src,
project=[id], metadata=[], aggregates=[grouping=[],
aggFunctions=[Count1AggFunction()]]]], fields=[count1$0]),
rowType=[RecordType(BIGINT count1$0)]
]]>
</Resource>
</TestCase>
@@ -482,7 +550,8 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()]),
rowType=[RecordType(BIGINT EXPR$
HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0]),
rowType=[RecordType(BIGINT EXPR$0)]
+- Exchange(distribution=[single]), rowType=[RecordType(BIGINT count1$0)]
+- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0]),
rowType=[RecordType(BIGINT count1$0)]
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[byte, short, int, long, float, double, boolean, string, date, time,
timestamp, decimal3020, decimal105]), rowType=[RecordType(TINYINT byte,
SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN
boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, TIMESTAMP(3)
timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
+ +- Calc(select=[0 AS $f0]), rowType=[RecordType(INTEGER $f0)]
+ +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[byte, short, int, long, float, double, boolean, string,
date, time, timestamp, decimal3020, decimal105]), rowType=[RecordType(TINYINT
byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double,
BOOLEAN boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time,
TIMESTAMP(3) timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
]]>
</Resource>
</TestCase>
@@ -499,8 +568,9 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()]),
rowType=[RecordType(BIGINT EXPR$
<Resource name="optimized rel plan">
<![CDATA[
HashAggregate(isMerge=[false], select=[COUNT(*) AS EXPR$0]),
rowType=[RecordType(BIGINT EXPR$0)]
-+- Exchange(distribution=[single]), rowType=[RecordType(TINYINT byte, SMALLINT
short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN boolean,
VARCHAR(2147483647) string, DATE date, TIME(0) time, TIMESTAMP(3) timestamp,
DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[byte, short, int, long, float, double, boolean, string, date, time,
timestamp, decimal3020, decimal105]), rowType=[RecordType(TINYINT byte,
SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN
boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, TIMESTAMP(3)
timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
++- Exchange(distribution=[single]), rowType=[RecordType(INTEGER $f0)]
+ +- Calc(select=[0 AS $f0]), rowType=[RecordType(INTEGER $f0)]
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[byte, short, int, long, float, double, boolean, string, date, time,
timestamp, decimal3020, decimal105]), rowType=[RecordType(TINYINT byte,
SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN
boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, TIMESTAMP(3)
timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
]]>
</Resource>
</TestCase>
@@ -519,7 +589,8 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()]),
rowType=[RecordType(BIGINT EXPR$
HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0]),
rowType=[RecordType(BIGINT EXPR$0)]
+- Exchange(distribution=[single]), rowType=[RecordType(BIGINT count1$0)]
+- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0]),
rowType=[RecordType(BIGINT count1$0)]
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[byte, short, int, long, float, double, boolean, string, date, time,
timestamp, decimal3020, decimal105]), rowType=[RecordType(TINYINT byte,
SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN
boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, TIMESTAMP(3)
timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
+ +- Calc(select=[0 AS $f0]), rowType=[RecordType(INTEGER $f0)]
+ +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[byte, short, int, long, float, double, boolean, string,
date, time, timestamp, decimal3020, decimal105]), rowType=[RecordType(TINYINT
byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double,
BOOLEAN boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time,
TIMESTAMP(3) timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml
index 4d52406652c..7638f23d247 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml
@@ -533,6 +533,73 @@ SortAggregate(isMerge=[true], select=[Final_AVG(sum$0,
count$1) AS EXPR$0, Final
+- LocalSortAggregate(select=[Partial_AVG(byte) AS (sum$0, count$1),
Partial_AVG(short) AS (sum$2, count$3), Partial_AVG(int) AS (sum$4, count$5),
Partial_AVG(long) AS (sum$6, count$7), Partial_AVG(float) AS (sum$8, count$9),
Partial_AVG(double) AS (sum$10, count$11), Partial_AVG(decimal3020) AS (sum$12,
count$13), Partial_AVG(decimal105) AS (sum$14, count$15)]),
rowType=[RecordType(BIGINT sum$0, BIGINT count$1, BIGINT sum$2, BIGINT count$3,
BIGINT sum$4, BIGINT count$5, BIGINT sum$6, [...]
+- Calc(select=[byte, short, int, long, float, double, decimal3020,
decimal105]), rowType=[RecordType(TINYINT byte, SMALLINT short, INTEGER int,
BIGINT long, FLOAT float, DOUBLE double, DECIMAL(30, 20) decimal3020,
DECIMAL(10, 5) decimal105)]
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[byte, short, int, long, float, double, boolean, string,
date, time, timestamp, decimal3020, decimal105]), rowType=[RecordType(TINYINT
byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double,
BOOLEAN boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time,
TIMESTAMP(3) timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCountStarWithHavingAndProjectPushDown[aggStrategy=AUTO]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT COUNT(*) FROM src HAVING COUNT(*) > 1
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalFilter(condition=[>($0, 1)]), rowType=[RecordType(BIGINT EXPR$0)]
++- LogicalAggregate(group=[{}], EXPR$0=[COUNT()]), rowType=[RecordType(BIGINT
EXPR$0)]
+ +- LogicalTableScan(table=[[default_catalog, default_database, src]]),
rowType=[RecordType(VARCHAR(2147483647) id, BIGINT cnt)]
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[EXPR$0], where=[>(EXPR$0, 1)]), rowType=[RecordType(BIGINT
EXPR$0)]
++- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0]),
rowType=[RecordType(BIGINT EXPR$0)]
+ +- Exchange(distribution=[single]), rowType=[RecordType(BIGINT count1$0)]
+ +- TableSourceScan(table=[[default_catalog, default_database, src,
project=[id], metadata=[], aggregates=[grouping=[],
aggFunctions=[Count1AggFunction()]]]], fields=[count1$0]),
rowType=[RecordType(BIGINT count1$0)]
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testCountStarWithHavingAndProjectPushDown[aggStrategy=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT COUNT(*) FROM src HAVING COUNT(*) > 1
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalFilter(condition=[>($0, 1)]), rowType=[RecordType(BIGINT EXPR$0)]
++- LogicalAggregate(group=[{}], EXPR$0=[COUNT()]), rowType=[RecordType(BIGINT
EXPR$0)]
+ +- LogicalTableScan(table=[[default_catalog, default_database, src]]),
rowType=[RecordType(VARCHAR(2147483647) id, BIGINT cnt)]
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[EXPR$0], where=[>(EXPR$0, 1)]), rowType=[RecordType(BIGINT
EXPR$0)]
++- SortAggregate(isMerge=[false], select=[COUNT(*) AS EXPR$0]),
rowType=[RecordType(BIGINT EXPR$0)]
+ +- Exchange(distribution=[single]), rowType=[RecordType(INTEGER $f0)]
+ +- Calc(select=[0 AS $f0]), rowType=[RecordType(INTEGER $f0)]
+ +- TableSourceScan(table=[[default_catalog, default_database, src,
project=[id], metadata=[]]], fields=[id]),
rowType=[RecordType(VARCHAR(2147483647) id)]
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testCountStarWithHavingAndProjectPushDown[aggStrategy=TWO_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT COUNT(*) FROM src HAVING COUNT(*) > 1
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalFilter(condition=[>($0, 1)]), rowType=[RecordType(BIGINT EXPR$0)]
++- LogicalAggregate(group=[{}], EXPR$0=[COUNT()]), rowType=[RecordType(BIGINT
EXPR$0)]
+ +- LogicalTableScan(table=[[default_catalog, default_database, src]]),
rowType=[RecordType(VARCHAR(2147483647) id, BIGINT cnt)]
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[EXPR$0], where=[>(EXPR$0, 1)]), rowType=[RecordType(BIGINT
EXPR$0)]
++- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0]),
rowType=[RecordType(BIGINT EXPR$0)]
+ +- Exchange(distribution=[single]), rowType=[RecordType(BIGINT count1$0)]
+ +- TableSourceScan(table=[[default_catalog, default_database, src,
project=[id], metadata=[], aggregates=[grouping=[],
aggFunctions=[Count1AggFunction()]]]], fields=[count1$0]),
rowType=[RecordType(BIGINT count1$0)]
]]>
</Resource>
</TestCase>
@@ -550,7 +617,7 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()]),
rowType=[RecordType(BIGINT EXPR$
<![CDATA[
SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0]),
rowType=[RecordType(BIGINT EXPR$0)]
+- Exchange(distribution=[single]), rowType=[RecordType(BIGINT count1$0)]
- +- TableSourceScan(table=[[default_catalog, default_database, src,
aggregates=[grouping=[], aggFunctions=[Count1AggFunction()]]]],
fields=[count1$0]), rowType=[RecordType(BIGINT count1$0)]
+ +- TableSourceScan(table=[[default_catalog, default_database, src,
project=[id], metadata=[], aggregates=[grouping=[],
aggFunctions=[Count1AggFunction()]]]], fields=[count1$0]),
rowType=[RecordType(BIGINT count1$0)]
]]>
</Resource>
</TestCase>
@@ -567,8 +634,9 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()]),
rowType=[RecordType(BIGINT EXPR$
<Resource name="optimized rel plan">
<![CDATA[
SortAggregate(isMerge=[false], select=[COUNT(*) AS EXPR$0]),
rowType=[RecordType(BIGINT EXPR$0)]
-+- Exchange(distribution=[single]), rowType=[RecordType(VARCHAR(2147483647)
id, BIGINT cnt)]
- +- TableSourceScan(table=[[default_catalog, default_database, src]],
fields=[id, cnt]), rowType=[RecordType(VARCHAR(2147483647) id, BIGINT cnt)]
++- Exchange(distribution=[single]), rowType=[RecordType(INTEGER $f0)]
+ +- Calc(select=[0 AS $f0]), rowType=[RecordType(INTEGER $f0)]
+ +- TableSourceScan(table=[[default_catalog, default_database, src,
project=[id], metadata=[]]], fields=[id]),
rowType=[RecordType(VARCHAR(2147483647) id)]
]]>
</Resource>
</TestCase>
@@ -586,7 +654,7 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()]),
rowType=[RecordType(BIGINT EXPR$
<![CDATA[
SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0]),
rowType=[RecordType(BIGINT EXPR$0)]
+- Exchange(distribution=[single]), rowType=[RecordType(BIGINT count1$0)]
- +- TableSourceScan(table=[[default_catalog, default_database, src,
aggregates=[grouping=[], aggFunctions=[Count1AggFunction()]]]],
fields=[count1$0]), rowType=[RecordType(BIGINT count1$0)]
+ +- TableSourceScan(table=[[default_catalog, default_database, src,
project=[id], metadata=[], aggregates=[grouping=[],
aggFunctions=[Count1AggFunction()]]]], fields=[count1$0]),
rowType=[RecordType(BIGINT count1$0)]
]]>
</Resource>
</TestCase>
@@ -605,7 +673,8 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()]),
rowType=[RecordType(BIGINT EXPR$
SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0]),
rowType=[RecordType(BIGINT EXPR$0)]
+- Exchange(distribution=[single]), rowType=[RecordType(BIGINT count1$0)]
+- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0]),
rowType=[RecordType(BIGINT count1$0)]
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[byte, short, int, long, float, double, boolean, string, date, time,
timestamp, decimal3020, decimal105]), rowType=[RecordType(TINYINT byte,
SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN
boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, TIMESTAMP(3)
timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
+ +- Calc(select=[0 AS $f0]), rowType=[RecordType(INTEGER $f0)]
+ +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[byte, short, int, long, float, double, boolean, string,
date, time, timestamp, decimal3020, decimal105]), rowType=[RecordType(TINYINT
byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double,
BOOLEAN boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time,
TIMESTAMP(3) timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
]]>
</Resource>
</TestCase>
@@ -622,8 +691,9 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()]),
rowType=[RecordType(BIGINT EXPR$
<Resource name="optimized rel plan">
<![CDATA[
SortAggregate(isMerge=[false], select=[COUNT(*) AS EXPR$0]),
rowType=[RecordType(BIGINT EXPR$0)]
-+- Exchange(distribution=[single]), rowType=[RecordType(TINYINT byte, SMALLINT
short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN boolean,
VARCHAR(2147483647) string, DATE date, TIME(0) time, TIMESTAMP(3) timestamp,
DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[byte, short, int, long, float, double, boolean, string, date, time,
timestamp, decimal3020, decimal105]), rowType=[RecordType(TINYINT byte,
SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN
boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, TIMESTAMP(3)
timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
++- Exchange(distribution=[single]), rowType=[RecordType(INTEGER $f0)]
+ +- Calc(select=[0 AS $f0]), rowType=[RecordType(INTEGER $f0)]
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[byte, short, int, long, float, double, boolean, string, date, time,
timestamp, decimal3020, decimal105]), rowType=[RecordType(TINYINT byte,
SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN
boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, TIMESTAMP(3)
timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
]]>
</Resource>
</TestCase>
@@ -642,7 +712,8 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()]),
rowType=[RecordType(BIGINT EXPR$
SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0]),
rowType=[RecordType(BIGINT EXPR$0)]
+- Exchange(distribution=[single]), rowType=[RecordType(BIGINT count1$0)]
+- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0]),
rowType=[RecordType(BIGINT count1$0)]
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[byte, short, int, long, float, double, boolean, string, date, time,
timestamp, decimal3020, decimal105]), rowType=[RecordType(TINYINT byte,
SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN
boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, TIMESTAMP(3)
timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
+ +- Calc(select=[0 AS $f0]), rowType=[RecordType(INTEGER $f0)]
+ +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[byte, short, int, long, float, double, boolean, string,
date, time, timestamp, decimal3020, decimal105]), rowType=[RecordType(TINYINT
byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double,
BOOLEAN boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time,
TIMESTAMP(3) timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.xml
index fed6b988a93..ab0df8f4bce 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.xml
@@ -680,7 +680,8 @@ Calc(select=[b])
+- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c])
+- Exchange(distribution=[single])
+- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
- +- Reused(reference_id=[1])
+ +- Calc(select=[0 AS $f0])
+ +- Reused(reference_id=[1])
]]>
</Resource>
</TestCase>
@@ -1202,7 +1203,8 @@ Calc(select=[b])
+- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c])
+- Exchange(distribution=[single])
+- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
- +- Reused(reference_id=[1])
+ +- Calc(select=[0 AS $f0])
+ +- Reused(reference_id=[1])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SemiAntiJoinTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SemiAntiJoinTest.xml
index a975f2bd917..f4f00d200aa 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SemiAntiJoinTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SemiAntiJoinTest.xml
@@ -699,7 +699,8 @@ Calc(select=[b])
: +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0)
AS c])
: +- Exchange(distribution=[single])
: +- LocalHashAggregate(select=[Partial_COUNT(*) AS
count1$0])
- : +- Reused(reference_id=[1])
+ : +- Calc(select=[0 AS $f0])
+ : +- Reused(reference_id=[1])
+- Exchange(distribution=[hash[d, f]])
+- Calc(select=[d, f])
+- TableSourceScan(table=[[default_catalog, default_database, r]],
fields=[d, e, f])
@@ -1243,7 +1244,8 @@ Calc(select=[b])
: +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0)
AS c])
: +- Exchange(distribution=[single])
: +- LocalHashAggregate(select=[Partial_COUNT(*) AS
count1$0])
- : +- Reused(reference_id=[1])
+ : +- Calc(select=[0 AS $f0])
+ : +- Reused(reference_id=[1])
+- Exchange(distribution=[hash[d]])
+- Calc(select=[d])
+- TableSourceScan(table=[[default_catalog, default_database, r]],
fields=[d, e, f])
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/BroadcastJoinHintTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/BroadcastJoinHintTest.xml
index 4b24a4f7a39..9bf2749be2a 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/BroadcastJoinHintTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/BroadcastJoinHintTest.xml
@@ -1087,11 +1087,12 @@ NestedLoopJoin(joinType=[LeftOuterJoin], where=[true],
select=[a1, EXPR$0], buil
+- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0])
+- Exchange(distribution=[single])
+- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
- +- HashJoin(joinType=[InnerJoin], where=[=(a1, a3)], select=[a1,
b1, a3, b3], build=[left])
- :- Exchange(distribution=[hash[a1]])
- : +- TableSourceScan(table=[[default_catalog,
default_database, T1]], fields=[a1, b1], hints=[[[ALIAS options:[T1]]]])
- +- Exchange(distribution=[hash[a3]])
- +- TableSourceScan(table=[[default_catalog,
default_database, T3]], fields=[a3, b3], hints=[[[ALIAS options:[T3]]]])
+ +- Calc(select=[0 AS $f0])
+ +- HashJoin(joinType=[InnerJoin], where=[=(a1, a3)],
select=[a1, a3], build=[left])
+ :- Exchange(distribution=[hash[a1]])
+ : +- TableSourceScan(table=[[default_catalog,
default_database, T1, project=[a1], metadata=[]]], fields=[a1], hints=[[[ALIAS
options:[T1]]]])
+ +- Exchange(distribution=[hash[a3]])
+ +- TableSourceScan(table=[[default_catalog,
default_database, T3, project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS
options:[T3]]]])
]]>
</Resource>
</TestCase>
@@ -1166,8 +1167,8 @@ NestedLoopJoin(joinType=[LeftOuterJoin], where=[true],
select=[a1, $f2], build=[
: +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS
EXPR$0])
: +- Exchange(distribution=[single])
: +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
- : +- Calc(select=[a1, b1], where=[=(a1, 1)])
- : +- TableSourceScan(table=[[default_catalog,
default_database, T1, filter=[], project=[a1, b1], metadata=[]]], fields=[a1,
b1], hints=[[[ALIAS options:[T1]]]])
+ : +- Calc(select=[a1], where=[=(a1, 1)])
+ : +- TableSourceScan(table=[[default_catalog,
default_database, T1, filter=[], project=[a1], metadata=[]]], fields=[a1],
hints=[[[ALIAS options:[T1]]]])
+- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS
EXPR$0])
+- Exchange(distribution=[single])
+- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
@@ -1176,7 +1177,7 @@ NestedLoopJoin(joinType=[LeftOuterJoin], where=[true],
select=[a1, $f2], build=[
+- LocalHashAggregate(groupBy=[a3], select=[a3])
+- Union(all=[true], union=[a3])
:- Calc(select=[CAST(1 AS BIGINT) AS a3],
where=[=(a1, 1)])
- : +- TableSourceScan(table=[[default_catalog,
default_database, T1, filter=[], project=[a1, b1], metadata=[]]], fields=[a1,
b1], hints=[[[ALIAS options:[T1]]]])
+ : +- TableSourceScan(table=[[default_catalog,
default_database, T1, filter=[], project=[a1], metadata=[]]], fields=[a1],
hints=[[[ALIAS options:[T1]]]])
+- Calc(select=[CAST(1 AS BIGINT) AS a3],
where=[=(a3, 1)])
+- TableSourceScan(table=[[default_catalog,
default_database, T3, filter=[], project=[a3], metadata=[]]], fields=[a3],
hints=[[[ALIAS options:[T3]]]])
]]>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/NestLoopJoinHintTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/NestLoopJoinHintTest.xml
index d280cd87b0d..ee97a0df7bf 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/NestLoopJoinHintTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/NestLoopJoinHintTest.xml
@@ -1084,11 +1084,12 @@ NestedLoopJoin(joinType=[LeftOuterJoin], where=[true],
select=[a1, EXPR$0], buil
+- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0])
+- Exchange(distribution=[single])
+- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
- +- HashJoin(joinType=[InnerJoin], where=[=(a1, a3)], select=[a1,
b1, a3, b3], build=[left])
- :- Exchange(distribution=[hash[a1]])
- : +- TableSourceScan(table=[[default_catalog,
default_database, T1]], fields=[a1, b1], hints=[[[ALIAS options:[T1]]]])
- +- Exchange(distribution=[hash[a3]])
- +- TableSourceScan(table=[[default_catalog,
default_database, T3]], fields=[a3, b3], hints=[[[ALIAS options:[T3]]]])
+ +- Calc(select=[0 AS $f0])
+ +- HashJoin(joinType=[InnerJoin], where=[=(a1, a3)],
select=[a1, a3], build=[left])
+ :- Exchange(distribution=[hash[a1]])
+ : +- TableSourceScan(table=[[default_catalog,
default_database, T1, project=[a1], metadata=[]]], fields=[a1], hints=[[[ALIAS
options:[T1]]]])
+ +- Exchange(distribution=[hash[a3]])
+ +- TableSourceScan(table=[[default_catalog,
default_database, T3, project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS
options:[T3]]]])
]]>
</Resource>
</TestCase>
@@ -1163,8 +1164,8 @@ NestedLoopJoin(joinType=[LeftOuterJoin], where=[true],
select=[a1, $f2], build=[
: +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS
EXPR$0])
: +- Exchange(distribution=[single])
: +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
- : +- Calc(select=[a1, b1], where=[=(a1, 1)])
- : +- TableSourceScan(table=[[default_catalog,
default_database, T1, filter=[], project=[a1, b1], metadata=[]]], fields=[a1,
b1], hints=[[[ALIAS options:[T1]]]])
+ : +- Calc(select=[a1], where=[=(a1, 1)])
+ : +- TableSourceScan(table=[[default_catalog,
default_database, T1, filter=[], project=[a1], metadata=[]]], fields=[a1],
hints=[[[ALIAS options:[T1]]]])
+- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS
EXPR$0])
+- Exchange(distribution=[single])
+- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
@@ -1173,7 +1174,7 @@ NestedLoopJoin(joinType=[LeftOuterJoin], where=[true],
select=[a1, $f2], build=[
+- LocalHashAggregate(groupBy=[a3], select=[a3])
+- Union(all=[true], union=[a3])
:- Calc(select=[CAST(1 AS BIGINT) AS a3],
where=[=(a1, 1)])
- : +- TableSourceScan(table=[[default_catalog,
default_database, T1, filter=[], project=[a1, b1], metadata=[]]], fields=[a1,
b1], hints=[[[ALIAS options:[T1]]]])
+ : +- TableSourceScan(table=[[default_catalog,
default_database, T1, filter=[], project=[a1], metadata=[]]], fields=[a1],
hints=[[[ALIAS options:[T1]]]])
+- Calc(select=[CAST(1 AS BIGINT) AS a3],
where=[=(a3, 1)])
+- TableSourceScan(table=[[default_catalog,
default_database, T3, filter=[], project=[a3], metadata=[]]], fields=[a3],
hints=[[[ALIAS options:[T3]]]])
]]>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleHashJoinHintTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleHashJoinHintTest.xml
index c6f3c923360..2e2911591ea 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleHashJoinHintTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleHashJoinHintTest.xml
@@ -1108,11 +1108,12 @@ NestedLoopJoin(joinType=[LeftOuterJoin], where=[true],
select=[a1, EXPR$0], buil
+- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0])
+- Exchange(distribution=[single])
+- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
- +- HashJoin(joinType=[InnerJoin], where=[=(a1, a3)], select=[a1,
b1, a3, b3], build=[left])
- :- Exchange(distribution=[hash[a1]])
- : +- TableSourceScan(table=[[default_catalog,
default_database, T1]], fields=[a1, b1], hints=[[[ALIAS options:[T1]]]])
- +- Exchange(distribution=[hash[a3]])
- +- TableSourceScan(table=[[default_catalog,
default_database, T3]], fields=[a3, b3], hints=[[[ALIAS options:[T3]]]])
+ +- Calc(select=[0 AS $f0])
+ +- HashJoin(joinType=[InnerJoin], where=[=(a1, a3)],
select=[a1, a3], build=[left])
+ :- Exchange(distribution=[hash[a1]])
+ : +- TableSourceScan(table=[[default_catalog,
default_database, T1, project=[a1], metadata=[]]], fields=[a1], hints=[[[ALIAS
options:[T1]]]])
+ +- Exchange(distribution=[hash[a3]])
+ +- TableSourceScan(table=[[default_catalog,
default_database, T3, project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS
options:[T3]]]])
]]>
</Resource>
</TestCase>
@@ -1187,8 +1188,8 @@ NestedLoopJoin(joinType=[LeftOuterJoin], where=[true],
select=[a1, $f2], build=[
: +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS
EXPR$0])
: +- Exchange(distribution=[single])
: +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
- : +- Calc(select=[a1, b1], where=[=(a1, 1)])
- : +- TableSourceScan(table=[[default_catalog,
default_database, T1, filter=[], project=[a1, b1], metadata=[]]], fields=[a1,
b1], hints=[[[ALIAS options:[T1]]]])
+ : +- Calc(select=[a1], where=[=(a1, 1)])
+ : +- TableSourceScan(table=[[default_catalog,
default_database, T1, filter=[], project=[a1], metadata=[]]], fields=[a1],
hints=[[[ALIAS options:[T1]]]])
+- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS
EXPR$0])
+- Exchange(distribution=[single])
+- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
@@ -1197,7 +1198,7 @@ NestedLoopJoin(joinType=[LeftOuterJoin], where=[true],
select=[a1, $f2], build=[
+- LocalHashAggregate(groupBy=[a3], select=[a3])
+- Union(all=[true], union=[a3])
:- Calc(select=[CAST(1 AS BIGINT) AS a3],
where=[=(a1, 1)])
- : +- TableSourceScan(table=[[default_catalog,
default_database, T1, filter=[], project=[a1, b1], metadata=[]]], fields=[a1,
b1], hints=[[[ALIAS options:[T1]]]])
+ : +- TableSourceScan(table=[[default_catalog,
default_database, T1, filter=[], project=[a1], metadata=[]]], fields=[a1],
hints=[[[ALIAS options:[T1]]]])
+- Calc(select=[CAST(1 AS BIGINT) AS a3],
where=[=(a3, 1)])
+- TableSourceScan(table=[[default_catalog,
default_database, T3, filter=[], project=[a3], metadata=[]]], fields=[a3],
hints=[[[ALIAS options:[T3]]]])
]]>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleMergeJoinHintTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleMergeJoinHintTest.xml
index 0c12624194c..f89c07d64f3 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleMergeJoinHintTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleMergeJoinHintTest.xml
@@ -1108,11 +1108,12 @@ NestedLoopJoin(joinType=[LeftOuterJoin], where=[true],
select=[a1, EXPR$0], buil
+- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0])
+- Exchange(distribution=[single])
+- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
- +- HashJoin(joinType=[InnerJoin], where=[=(a1, a3)], select=[a1,
b1, a3, b3], build=[left])
- :- Exchange(distribution=[hash[a1]])
- : +- TableSourceScan(table=[[default_catalog,
default_database, T1]], fields=[a1, b1], hints=[[[ALIAS options:[T1]]]])
- +- Exchange(distribution=[hash[a3]])
- +- TableSourceScan(table=[[default_catalog,
default_database, T3]], fields=[a3, b3], hints=[[[ALIAS options:[T3]]]])
+ +- Calc(select=[0 AS $f0])
+ +- HashJoin(joinType=[InnerJoin], where=[=(a1, a3)],
select=[a1, a3], build=[left])
+ :- Exchange(distribution=[hash[a1]])
+ : +- TableSourceScan(table=[[default_catalog,
default_database, T1, project=[a1], metadata=[]]], fields=[a1], hints=[[[ALIAS
options:[T1]]]])
+ +- Exchange(distribution=[hash[a3]])
+ +- TableSourceScan(table=[[default_catalog,
default_database, T3, project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS
options:[T3]]]])
]]>
</Resource>
</TestCase>
@@ -1187,8 +1188,8 @@ NestedLoopJoin(joinType=[LeftOuterJoin], where=[true],
select=[a1, $f2], build=[
: +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS
EXPR$0])
: +- Exchange(distribution=[single])
: +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
- : +- Calc(select=[a1, b1], where=[=(a1, 1)])
- : +- TableSourceScan(table=[[default_catalog,
default_database, T1, filter=[], project=[a1, b1], metadata=[]]], fields=[a1,
b1], hints=[[[ALIAS options:[T1]]]])
+ : +- Calc(select=[a1], where=[=(a1, 1)])
+ : +- TableSourceScan(table=[[default_catalog,
default_database, T1, filter=[], project=[a1], metadata=[]]], fields=[a1],
hints=[[[ALIAS options:[T1]]]])
+- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS
EXPR$0])
+- Exchange(distribution=[single])
+- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
@@ -1197,7 +1198,7 @@ NestedLoopJoin(joinType=[LeftOuterJoin], where=[true],
select=[a1, $f2], build=[
+- LocalHashAggregate(groupBy=[a3], select=[a3])
+- Union(all=[true], union=[a3])
:- Calc(select=[CAST(1 AS BIGINT) AS a3],
where=[=(a1, 1)])
- : +- TableSourceScan(table=[[default_catalog,
default_database, T1, filter=[], project=[a1, b1], metadata=[]]], fields=[a1,
b1], hints=[[[ALIAS options:[T1]]]])
+ : +- TableSourceScan(table=[[default_catalog,
default_database, T1, filter=[], project=[a1], metadata=[]]], fields=[a1],
hints=[[[ALIAS options:[T1]]]])
+- Calc(select=[CAST(1 AS BIGINT) AS a3],
where=[=(a3, 1)])
+- TableSourceScan(table=[[default_catalog,
default_database, T3, filter=[], project=[a3], metadata=[]]], fields=[a3],
hints=[[[ALIAS options:[T3]]]])
]]>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ShuffleModePlanOptimizeTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ShuffleModePlanOptimizeTest.xml
index 9439c7bd25a..8f7d724122e 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ShuffleModePlanOptimizeTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ShuffleModePlanOptimizeTest.xml
@@ -49,39 +49,43 @@ MultipleInput(readOrder=[0,0,1],
members=[\nNestedLoopJoin(joinType=[InnerJoin],
: +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c3])
: +- Exchange(distribution=[single])
: +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-: +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk =
dim_date_sk)], select=[id, name, amount, price, fact_date_sk, id0, male,
amount0, price0, dim_date_sk], build=[right])
-: :- Exchange(distribution=[hash[fact_date_sk]])
-: : +- MultipleInput(members=[\nOrderEnforcer\n:- [#1]
Exchange(distribution=[broadcast], shuffle_mode=[BATCH])\n+- [#2]
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database,
fact_part]], fields=[id, name, amount, price, fact_date_sk])\n])
-: : :- Exchange(distribution=[broadcast],
shuffle_mode=[BATCH])
-: : : +- DynamicFilteringDataCollector(fields=[dim_date_sk])
-: : : +- Calc(select=[id, male, amount, price,
dim_date_sk], where=[(price < 400)])(reuse_id=[1])
-: : : +- TableSourceScan(table=[[testCatalog,
test_database, dim]], fields=[id, male, amount, price,
dim_date_sk])(reuse_id=[2])
-: : +- DynamicFilteringTableSourceScan(table=[[testCatalog,
test_database, fact_part]], fields=[id, name, amount, price, fact_date_sk])
-: +- Exchange(distribution=[hash[dim_date_sk]])
-: +- Reused(reference_id=[1])
+: +- Calc(select=[0 AS $f0])
+: +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk =
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[right])
+: :- Exchange(distribution=[hash[fact_date_sk]])
+: : +- MultipleInput(members=[\nOrderEnforcer\n:- [#1]
Exchange(distribution=[broadcast], shuffle_mode=[BATCH])\n+- [#2]
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part,
project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk])\n])
+: : :- Exchange(distribution=[broadcast],
shuffle_mode=[BATCH])
+: : : +-
DynamicFilteringDataCollector(fields=[dim_date_sk])
+: : : +- Calc(select=[dim_date_sk], where=[(price <
400)])(reuse_id=[1])
+: : : +- TableSourceScan(table=[[testCatalog,
test_database, dim, project=[price, dim_date_sk], metadata=[]]], fields=[price,
dim_date_sk])(reuse_id=[2])
+: : +-
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part,
project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk])
+: +- Exchange(distribution=[hash[dim_date_sk]])
+: +- Reused(reference_id=[1])
:- Exchange(distribution=[broadcast])
: +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c1])
: +- Exchange(distribution=[single])
: +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-: +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk =
dim_date_sk)], select=[id, name, amount, price, fact_date_sk, id0, male,
amount0, price0, dim_date_sk], build=[left])
-: :- Exchange(distribution=[hash[fact_date_sk]])
-: : +- Calc(select=[id, name, amount, price, fact_date_sk],
where=[(price < 100)])
-: : +- TableSourceScan(table=[[testCatalog, test_database,
fact_part, filter=[]]], fields=[id, name, amount, price, fact_date_sk])
-: +- Exchange(distribution=[hash[dim_date_sk]])
-: +- Reused(reference_id=[2])
+: +- Calc(select=[0 AS $f0])
+: +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk =
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left])
+: :- Exchange(distribution=[hash[fact_date_sk]])
+: : +- Calc(select=[fact_date_sk], where=[(price < 100)])
+: : +- TableSourceScan(table=[[testCatalog, test_database,
fact_part, filter=[], project=[price, fact_date_sk], metadata=[]]],
fields=[price, fact_date_sk])
+: +- Exchange(distribution=[hash[dim_date_sk]])
+: +- Calc(select=[dim_date_sk])
+: +- Reused(reference_id=[2])
+- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c2])
+- Exchange(distribution=[single])
+- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
- +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk =
dim_date_sk)], select=[id, name, amount, price, fact_date_sk, id0, male,
amount0, price0, dim_date_sk], build=[right])
- :- Exchange(distribution=[hash[fact_date_sk]])
- : +- MultipleInput(members=[\nOrderEnforcer\n:- [#1]
Exchange(distribution=[broadcast], shuffle_mode=[BATCH])\n+- [#2]
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database,
fact_part]], fields=[id, name, amount, price, fact_date_sk])\n])
- : :- Exchange(distribution=[broadcast], shuffle_mode=[BATCH])
- : : +- DynamicFilteringDataCollector(fields=[dim_date_sk])
- : : +- Calc(select=[id, male, amount, price, dim_date_sk],
where=[(price < 200)])(reuse_id=[3])
- : : +- Reused(reference_id=[2])
- : +- DynamicFilteringTableSourceScan(table=[[testCatalog,
test_database, fact_part]], fields=[id, name, amount, price, fact_date_sk])
- +- Exchange(distribution=[hash[dim_date_sk]])
- +- Reused(reference_id=[3])
+ +- Calc(select=[0 AS $f0])
+ +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk =
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[right])
+ :- Exchange(distribution=[hash[fact_date_sk]])
+ : +- MultipleInput(members=[\nOrderEnforcer\n:- [#1]
Exchange(distribution=[broadcast], shuffle_mode=[BATCH])\n+- [#2]
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part,
project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk])\n])
+ : :- Exchange(distribution=[broadcast],
shuffle_mode=[BATCH])
+ : : +- DynamicFilteringDataCollector(fields=[dim_date_sk])
+ : : +- Calc(select=[dim_date_sk], where=[(price <
200)])(reuse_id=[3])
+ : : +- Reused(reference_id=[2])
+ : +- DynamicFilteringTableSourceScan(table=[[testCatalog,
test_database, fact_part, project=[fact_date_sk], metadata=[]]],
fields=[fact_date_sk])
+ +- Exchange(distribution=[hash[dim_date_sk]])
+ +- Reused(reference_id=[3])
]]>
</Resource>
</TestCase>
@@ -118,39 +122,44 @@ MultipleInput(readOrder=[0,0,1],
members=[\nNestedLoopJoin(joinType=[InnerJoin],
: +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c3])
: +- Exchange(distribution=[single])
: +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-: +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk =
dim_date_sk)], select=[id, name, amount, price, fact_date_sk, id0, male,
amount0, price0, dim_date_sk], build=[right])
-: :- Exchange(distribution=[hash[fact_date_sk]])
-: : +- MultipleInput(members=[\nOrderEnforcer\n:- [#1]
Exchange(distribution=[broadcast], shuffle_mode=[BATCH])\n+- [#2]
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database,
fact_part]], fields=[id, name, amount, price, fact_date_sk])\n])
-: : :- Exchange(distribution=[broadcast],
shuffle_mode=[BATCH])
-: : : +- DynamicFilteringDataCollector(fields=[dim_date_sk])
-: : : +- Calc(select=[id, male, amount, price,
dim_date_sk], where=[(price < 400)])(reuse_id=[1])
-: : : +- TableSourceScan(table=[[testCatalog,
test_database, dim]], fields=[id, male, amount, price,
dim_date_sk])(reuse_id=[2])
-: : +- DynamicFilteringTableSourceScan(table=[[testCatalog,
test_database, fact_part]], fields=[id, name, amount, price, fact_date_sk])
-: +- Exchange(distribution=[hash[dim_date_sk]],
shuffle_mode=[BATCH])
-: +- Reused(reference_id=[1])
+: +- Calc(select=[0 AS $f0])
+: +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk =
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[right])
+: :- Exchange(distribution=[hash[fact_date_sk]])
+: : +- MultipleInput(members=[\nOrderEnforcer\n:- [#1]
Exchange(distribution=[broadcast], shuffle_mode=[BATCH])\n+- [#2]
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part,
project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk])\n])
+: : :- Exchange(distribution=[broadcast],
shuffle_mode=[BATCH])
+: : : +-
DynamicFilteringDataCollector(fields=[dim_date_sk])
+: : : +- Calc(select=[dim_date_sk], where=[(price <
400)])(reuse_id=[1])
+: : : +- TableSourceScan(table=[[testCatalog,
test_database, dim, project=[price, dim_date_sk], metadata=[]]], fields=[price,
dim_date_sk])(reuse_id=[2])
+: : +-
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part,
project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk])
+: +- Exchange(distribution=[hash[dim_date_sk]],
shuffle_mode=[BATCH])
+: +- Reused(reference_id=[1])
:- Exchange(distribution=[broadcast])
: +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c1])
: +- Exchange(distribution=[single])
: +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-: +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk =
dim_date_sk)], select=[id, name, amount, price, fact_date_sk, id0, male,
amount0, price0, dim_date_sk], build=[left])
-: :- Exchange(distribution=[hash[fact_date_sk]])
-: : +- Calc(select=[id, name, amount, price, fact_date_sk],
where=[(price < 100)])
-: : +- TableSourceScan(table=[[testCatalog, test_database,
fact_part, filter=[]]], fields=[id, name, amount, price, fact_date_sk])
-: +- Exchange(distribution=[hash[dim_date_sk]],
shuffle_mode=[BATCH])
-: +- Reused(reference_id=[2])
+: +- Calc(select=[0 AS $f0])
+: +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk =
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left])
+: :- Exchange(distribution=[hash[fact_date_sk]])
+: : +- Calc(select=[fact_date_sk], where=[(price < 100)])
+: : +- TableSourceScan(table=[[testCatalog, test_database,
fact_part, filter=[], project=[price, fact_date_sk], metadata=[]]],
fields=[price, fact_date_sk])
+: +- Exchange(distribution=[hash[dim_date_sk]])
+: +- Calc(select=[dim_date_sk])
+: +- Exchange(distribution=[any], shuffle_mode=[BATCH])
+: +- Reused(reference_id=[2])
+- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c2])
+- Exchange(distribution=[single])
+- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
- +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk =
dim_date_sk)], select=[id, name, amount, price, fact_date_sk, id0, male,
amount0, price0, dim_date_sk], build=[right])
- :- Exchange(distribution=[hash[fact_date_sk]])
- : +- MultipleInput(members=[\nOrderEnforcer\n:- [#1]
Exchange(distribution=[broadcast], shuffle_mode=[BATCH])\n+- [#2]
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database,
fact_part]], fields=[id, name, amount, price, fact_date_sk])\n])
- : :- Exchange(distribution=[broadcast], shuffle_mode=[BATCH])
- : : +- DynamicFilteringDataCollector(fields=[dim_date_sk])
- : : +- Calc(select=[id, male, amount, price, dim_date_sk],
where=[(price < 200)])(reuse_id=[3])
- : : +- Reused(reference_id=[2])
- : +- DynamicFilteringTableSourceScan(table=[[testCatalog,
test_database, fact_part]], fields=[id, name, amount, price, fact_date_sk])
- +- Exchange(distribution=[hash[dim_date_sk]], shuffle_mode=[BATCH])
- +- Reused(reference_id=[3])
+ +- Calc(select=[0 AS $f0])
+ +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk =
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[right])
+ :- Exchange(distribution=[hash[fact_date_sk]])
+ : +- MultipleInput(members=[\nOrderEnforcer\n:- [#1]
Exchange(distribution=[broadcast], shuffle_mode=[BATCH])\n+- [#2]
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part,
project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk])\n])
+ : :- Exchange(distribution=[broadcast],
shuffle_mode=[BATCH])
+ : : +- DynamicFilteringDataCollector(fields=[dim_date_sk])
+ : : +- Calc(select=[dim_date_sk], where=[(price <
200)])(reuse_id=[3])
+ : : +- Reused(reference_id=[2])
+ : +- DynamicFilteringTableSourceScan(table=[[testCatalog,
test_database, fact_part, project=[fact_date_sk], metadata=[]]],
fields=[fact_date_sk])
+ +- Exchange(distribution=[hash[dim_date_sk]],
shuffle_mode=[BATCH])
+ +- Reused(reference_id=[3])
]]>
</Resource>
</TestCase>
@@ -187,39 +196,44 @@ MultipleInput(readOrder=[0,0,1],
members=[\nNestedLoopJoin(joinType=[InnerJoin],
: +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c3])
: +- Exchange(distribution=[single])
: +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-: +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk =
dim_date_sk)], select=[id, name, amount, price, fact_date_sk, id0, male,
amount0, price0, dim_date_sk], build=[right])
-: :- Exchange(distribution=[hash[fact_date_sk]])
-: : +- MultipleInput(members=[\nOrderEnforcer\n:- [#1]
Exchange(distribution=[broadcast], shuffle_mode=[BATCH])\n+- [#2]
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database,
fact_part]], fields=[id, name, amount, price, fact_date_sk])\n])
-: : :- Exchange(distribution=[broadcast],
shuffle_mode=[BATCH])
-: : : +- DynamicFilteringDataCollector(fields=[dim_date_sk])
-: : : +- Calc(select=[id, male, amount, price,
dim_date_sk], where=[(price < 400)])(reuse_id=[1])
-: : : +- TableSourceScan(table=[[testCatalog,
test_database, dim]], fields=[id, male, amount, price,
dim_date_sk])(reuse_id=[2])
-: : +- DynamicFilteringTableSourceScan(table=[[testCatalog,
test_database, fact_part]], fields=[id, name, amount, price, fact_date_sk])
-: +- Exchange(distribution=[hash[dim_date_sk]],
shuffle_mode=[BATCH])
-: +- Reused(reference_id=[1])
+: +- Calc(select=[0 AS $f0])
+: +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk =
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[right])
+: :- Exchange(distribution=[hash[fact_date_sk]])
+: : +- MultipleInput(members=[\nOrderEnforcer\n:- [#1]
Exchange(distribution=[broadcast], shuffle_mode=[BATCH])\n+- [#2]
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part,
project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk])\n])
+: : :- Exchange(distribution=[broadcast],
shuffle_mode=[BATCH])
+: : : +-
DynamicFilteringDataCollector(fields=[dim_date_sk])
+: : : +- Calc(select=[dim_date_sk], where=[(price <
400)])(reuse_id=[1])
+: : : +- TableSourceScan(table=[[testCatalog,
test_database, dim, project=[price, dim_date_sk], metadata=[]]], fields=[price,
dim_date_sk])(reuse_id=[2])
+: : +-
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part,
project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk])
+: +- Exchange(distribution=[hash[dim_date_sk]],
shuffle_mode=[BATCH])
+: +- Reused(reference_id=[1])
:- Exchange(distribution=[broadcast])
: +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c1])
: +- Exchange(distribution=[single])
: +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-: +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk =
dim_date_sk)], select=[id, name, amount, price, fact_date_sk, id0, male,
amount0, price0, dim_date_sk], build=[left])
-: :- Exchange(distribution=[hash[fact_date_sk]])
-: : +- Calc(select=[id, name, amount, price, fact_date_sk],
where=[(price < 100)])
-: : +- TableSourceScan(table=[[testCatalog, test_database,
fact_part, filter=[]]], fields=[id, name, amount, price, fact_date_sk])
-: +- Exchange(distribution=[hash[dim_date_sk]],
shuffle_mode=[BATCH])
-: +- Reused(reference_id=[2])
+: +- Calc(select=[0 AS $f0])
+: +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk =
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left])
+: :- Exchange(distribution=[hash[fact_date_sk]])
+: : +- Calc(select=[fact_date_sk], where=[(price < 100)])
+: : +- TableSourceScan(table=[[testCatalog, test_database,
fact_part, filter=[], project=[price, fact_date_sk], metadata=[]]],
fields=[price, fact_date_sk])
+: +- Exchange(distribution=[hash[dim_date_sk]])
+: +- Calc(select=[dim_date_sk])
+: +- Exchange(distribution=[any], shuffle_mode=[BATCH])
+: +- Reused(reference_id=[2])
+- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c2])
+- Exchange(distribution=[single])
+- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
- +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk =
dim_date_sk)], select=[id, name, amount, price, fact_date_sk, id0, male,
amount0, price0, dim_date_sk], build=[right])
- :- Exchange(distribution=[hash[fact_date_sk]])
- : +- MultipleInput(members=[\nOrderEnforcer\n:- [#1]
Exchange(distribution=[broadcast], shuffle_mode=[BATCH])\n+- [#2]
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database,
fact_part]], fields=[id, name, amount, price, fact_date_sk])\n])
- : :- Exchange(distribution=[broadcast], shuffle_mode=[BATCH])
- : : +- DynamicFilteringDataCollector(fields=[dim_date_sk])
- : : +- Calc(select=[id, male, amount, price, dim_date_sk],
where=[(price < 200)])(reuse_id=[3])
- : : +- Reused(reference_id=[2])
- : +- DynamicFilteringTableSourceScan(table=[[testCatalog,
test_database, fact_part]], fields=[id, name, amount, price, fact_date_sk])
- +- Exchange(distribution=[hash[dim_date_sk]], shuffle_mode=[BATCH])
- +- Reused(reference_id=[3])
+ +- Calc(select=[0 AS $f0])
+ +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk =
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[right])
+ :- Exchange(distribution=[hash[fact_date_sk]])
+ : +- MultipleInput(members=[\nOrderEnforcer\n:- [#1]
Exchange(distribution=[broadcast], shuffle_mode=[BATCH])\n+- [#2]
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part,
project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk])\n])
+ : :- Exchange(distribution=[broadcast],
shuffle_mode=[BATCH])
+ : : +- DynamicFilteringDataCollector(fields=[dim_date_sk])
+ : : +- Calc(select=[dim_date_sk], where=[(price <
200)])(reuse_id=[3])
+ : : +- Reused(reference_id=[2])
+ : +- DynamicFilteringTableSourceScan(table=[[testCatalog,
test_database, fact_part, project=[fact_date_sk], metadata=[]]],
fields=[fact_date_sk])
+ +- Exchange(distribution=[hash[dim_date_sk]],
shuffle_mode=[BATCH])
+ +- Reused(reference_id=[3])
]]>
</Resource>
</TestCase>
@@ -256,29 +270,32 @@ MultipleInput(readOrder=[0,0,1],
members=[\nNestedLoopJoin(joinType=[InnerJoin],
: +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c3])
: +- Exchange(distribution=[single])
: +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-: +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk =
dim_date_sk)], select=[id, name, amount, price, fact_date_sk, id0, male,
amount0, price0, dim_date_sk], build=[left])
-: :- Exchange(distribution=[hash[fact_date_sk]])
-: : +- Calc(select=[id, name, amount, price, fact_date_sk],
where=[(price < 400)])
-: : +- TableSourceScan(table=[[testCatalog, test_database,
fact_part, filter=[]]], fields=[id, name, amount, price,
fact_date_sk])(reuse_id=[1])
-: +- Exchange(distribution=[hash[dim_date_sk]])(reuse_id=[2])
-: +- TableSourceScan(table=[[testCatalog, test_database,
dim]], fields=[id, male, amount, price, dim_date_sk])
+: +- Calc(select=[0 AS $f0])
+: +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk =
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left])
+: :- Exchange(distribution=[hash[fact_date_sk]])
+: : +- Calc(select=[fact_date_sk], where=[(price < 400)])
+: : +- TableSourceScan(table=[[testCatalog, test_database,
fact_part, filter=[], project=[price, fact_date_sk], metadata=[]]],
fields=[price, fact_date_sk])(reuse_id=[1])
+: +- Exchange(distribution=[hash[dim_date_sk]])(reuse_id=[2])
+: +- TableSourceScan(table=[[testCatalog, test_database,
dim, project=[dim_date_sk], metadata=[]]], fields=[dim_date_sk])
:- Exchange(distribution=[broadcast])
: +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c1])
: +- Exchange(distribution=[single])
: +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-: +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk =
dim_date_sk)], select=[id, name, amount, price, fact_date_sk, id0, male,
amount0, price0, dim_date_sk], build=[left])
-: :- Exchange(distribution=[hash[fact_date_sk]])
-: : +- Calc(select=[id, name, amount, price, fact_date_sk],
where=[(price < 100)])
-: : +- Reused(reference_id=[1])
-: +- Reused(reference_id=[2])
+: +- Calc(select=[0 AS $f0])
+: +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk =
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left])
+: :- Exchange(distribution=[hash[fact_date_sk]])
+: : +- Calc(select=[fact_date_sk], where=[(price < 100)])
+: : +- Reused(reference_id=[1])
+: +- Reused(reference_id=[2])
+- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c2])
+- Exchange(distribution=[single])
+- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
- +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk =
dim_date_sk)], select=[id, name, amount, price, fact_date_sk, id0, male,
amount0, price0, dim_date_sk], build=[left])
- :- Exchange(distribution=[hash[fact_date_sk]])
- : +- Calc(select=[id, name, amount, price, fact_date_sk],
where=[(price < 200)])
- : +- Reused(reference_id=[1])
- +- Reused(reference_id=[2])
+ +- Calc(select=[0 AS $f0])
+ +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk =
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left])
+ :- Exchange(distribution=[hash[fact_date_sk]])
+ : +- Calc(select=[fact_date_sk], where=[(price < 200)])
+ : +- Reused(reference_id=[1])
+ +- Reused(reference_id=[2])
]]>
</Resource>
</TestCase>
@@ -315,29 +332,32 @@ MultipleInput(readOrder=[0,0,1],
members=[\nNestedLoopJoin(joinType=[InnerJoin],
: +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c3])
: +- Exchange(distribution=[single])
: +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-: +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk =
dim_date_sk)], select=[id, name, amount, price, fact_date_sk, id0, male,
amount0, price0, dim_date_sk], build=[left])
-: :- Exchange(distribution=[hash[fact_date_sk]])
-: : +- Calc(select=[id, name, amount, price, fact_date_sk],
where=[(price < 400)])
-: : +- TableSourceScan(table=[[testCatalog, test_database,
fact_part, filter=[]]], fields=[id, name, amount, price,
fact_date_sk])(reuse_id=[1])
-: +- Exchange(distribution=[hash[dim_date_sk]])(reuse_id=[2])
-: +- TableSourceScan(table=[[testCatalog, test_database,
dim]], fields=[id, male, amount, price, dim_date_sk])
+: +- Calc(select=[0 AS $f0])
+: +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk =
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left])
+: :- Exchange(distribution=[hash[fact_date_sk]])
+: : +- Calc(select=[fact_date_sk], where=[(price < 400)])
+: : +- TableSourceScan(table=[[testCatalog, test_database,
fact_part, filter=[], project=[price, fact_date_sk], metadata=[]]],
fields=[price, fact_date_sk])(reuse_id=[1])
+: +- Exchange(distribution=[hash[dim_date_sk]])(reuse_id=[2])
+: +- TableSourceScan(table=[[testCatalog, test_database,
dim, project=[dim_date_sk], metadata=[]]], fields=[dim_date_sk])
:- Exchange(distribution=[broadcast])
: +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c1])
: +- Exchange(distribution=[single])
: +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-: +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk =
dim_date_sk)], select=[id, name, amount, price, fact_date_sk, id0, male,
amount0, price0, dim_date_sk], build=[left])
-: :- Exchange(distribution=[hash[fact_date_sk]])
-: : +- Calc(select=[id, name, amount, price, fact_date_sk],
where=[(price < 100)])
-: : +- Reused(reference_id=[1])
-: +- Reused(reference_id=[2])
+: +- Calc(select=[0 AS $f0])
+: +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk =
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left])
+: :- Exchange(distribution=[hash[fact_date_sk]])
+: : +- Calc(select=[fact_date_sk], where=[(price < 100)])
+: : +- Reused(reference_id=[1])
+: +- Reused(reference_id=[2])
+- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c2])
+- Exchange(distribution=[single])
+- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
- +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk =
dim_date_sk)], select=[id, name, amount, price, fact_date_sk, id0, male,
amount0, price0, dim_date_sk], build=[left])
- :- Exchange(distribution=[hash[fact_date_sk]])
- : +- Calc(select=[id, name, amount, price, fact_date_sk],
where=[(price < 200)])
- : +- Reused(reference_id=[1])
- +- Reused(reference_id=[2])
+ +- Calc(select=[0 AS $f0])
+ +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk =
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left])
+ :- Exchange(distribution=[hash[fact_date_sk]])
+ : +- Calc(select=[fact_date_sk], where=[(price < 200)])
+ : +- Reused(reference_id=[1])
+ +- Reused(reference_id=[2])
]]>
</Resource>
</TestCase>
@@ -374,29 +394,32 @@ MultipleInput(readOrder=[0,0,1],
members=[\nNestedLoopJoin(joinType=[InnerJoin],
: +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c3])
: +- Exchange(distribution=[single])
: +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-: +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk =
dim_date_sk)], select=[id, name, amount, price, fact_date_sk, id0, male,
amount0, price0, dim_date_sk], build=[left])
-: :- Exchange(distribution=[hash[fact_date_sk]])
-: : +- Calc(select=[id, name, amount, price, fact_date_sk],
where=[(price < 400)])
-: : +- TableSourceScan(table=[[testCatalog, test_database,
fact_part, filter=[]]], fields=[id, name, amount, price,
fact_date_sk])(reuse_id=[1])
-: +- Exchange(distribution=[hash[dim_date_sk]])(reuse_id=[2])
-: +- TableSourceScan(table=[[testCatalog, test_database,
dim]], fields=[id, male, amount, price, dim_date_sk])
+: +- Calc(select=[0 AS $f0])
+: +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk =
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left])
+: :- Exchange(distribution=[hash[fact_date_sk]])
+: : +- Calc(select=[fact_date_sk], where=[(price < 400)])
+: : +- TableSourceScan(table=[[testCatalog, test_database,
fact_part, filter=[], project=[price, fact_date_sk], metadata=[]]],
fields=[price, fact_date_sk])(reuse_id=[1])
+: +- Exchange(distribution=[hash[dim_date_sk]])(reuse_id=[2])
+: +- TableSourceScan(table=[[testCatalog, test_database,
dim, project=[dim_date_sk], metadata=[]]], fields=[dim_date_sk])
:- Exchange(distribution=[broadcast])
: +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c1])
: +- Exchange(distribution=[single])
: +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-: +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk =
dim_date_sk)], select=[id, name, amount, price, fact_date_sk, id0, male,
amount0, price0, dim_date_sk], build=[left])
-: :- Exchange(distribution=[hash[fact_date_sk]])
-: : +- Calc(select=[id, name, amount, price, fact_date_sk],
where=[(price < 100)])
-: : +- Reused(reference_id=[1])
-: +- Reused(reference_id=[2])
+: +- Calc(select=[0 AS $f0])
+: +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk =
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left])
+: :- Exchange(distribution=[hash[fact_date_sk]])
+: : +- Calc(select=[fact_date_sk], where=[(price < 100)])
+: : +- Reused(reference_id=[1])
+: +- Reused(reference_id=[2])
+- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c2])
+- Exchange(distribution=[single])
+- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
- +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk =
dim_date_sk)], select=[id, name, amount, price, fact_date_sk, id0, male,
amount0, price0, dim_date_sk], build=[left])
- :- Exchange(distribution=[hash[fact_date_sk]])
- : +- Calc(select=[id, name, amount, price, fact_date_sk],
where=[(price < 200)])
- : +- Reused(reference_id=[1])
- +- Reused(reference_id=[2])
+ +- Calc(select=[0 AS $f0])
+ +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk =
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left])
+ :- Exchange(distribution=[hash[fact_date_sk]])
+ : +- Calc(select=[fact_date_sk], where=[(price < 200)])
+ : +- Reused(reference_id=[1])
+ +- Reused(reference_id=[2])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml
index 55fa4f14e80..7852555acf8 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml
@@ -167,7 +167,7 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
<![CDATA[
HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0])
+- Exchange(distribution=[single])
- +- TableSourceScan(table=[[default_catalog, default_database, inventory,
aggregates=[grouping=[], aggFunctions=[Count1AggFunction()]]]],
fields=[count1$0])
+ +- TableSourceScan(table=[[default_catalog, default_database, inventory,
project=[id], metadata=[], aggregates=[grouping=[],
aggFunctions=[Count1AggFunction()]]]], fields=[count1$0])
]]>
</Resource>
</TestCase>
@@ -185,7 +185,7 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
<![CDATA[
HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0])
+- Exchange(distribution=[single])
- +- TableSourceScan(table=[[default_catalog, default_database, inventory,
aggregates=[grouping=[], aggFunctions=[Count1AggFunction()]]]],
fields=[count1$0])
+ +- TableSourceScan(table=[[default_catalog, default_database, inventory,
project=[id], metadata=[], aggregates=[grouping=[],
aggFunctions=[Count1AggFunction()]]]], fields=[count1$0])
]]>
</Resource>
</TestCase>
@@ -203,7 +203,7 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
<![CDATA[
HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0])
+- Exchange(distribution=[single])
- +- TableSourceScan(table=[[default_catalog, default_database, inventory,
aggregates=[grouping=[], aggFunctions=[Count1AggFunction()]]]],
fields=[count1$0])
+ +- TableSourceScan(table=[[default_catalog, default_database, inventory,
project=[id], metadata=[], aggregates=[grouping=[],
aggFunctions=[Count1AggFunction()]]]], fields=[count1$0])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
index 808998628b1..adf7dd0d612 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
@@ -31,8 +31,8 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
<![CDATA[
GroupAggregate(select=[COUNT_RETRACT(*) AS EXPR$0], changelogMode=[I,UA,D])
+- Exchange(distribution=[single], changelogMode=[I,UB,UA])
- +- Calc(select=[ts, a, b], where=[>(a, 1)], changelogMode=[I,UB,UA])
- +- TableSourceScan(table=[[default_catalog, default_database, src,
filter=[]]], fields=[ts, a, b], changelogMode=[I,UB,UA])
+ +- Calc(select=[a], where=[>(a, 1)], changelogMode=[I,UB,UA])
+ +- TableSourceScan(table=[[default_catalog, default_database, src,
filter=[], project=[a], metadata=[]]], fields=[a], changelogMode=[I,UB,UA])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
index 8b53de0c16c..2fad4c62450 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
@@ -141,7 +141,7 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
<![CDATA[
GroupAggregate(select=[COUNT(*) AS EXPR$0])
+- Exchange(distribution=[single])
- +- TableSourceScan(table=[[default_catalog, default_database, T]],
fields=[id, name])
+ +- TableSourceScan(table=[[default_catalog, default_database, T,
project=[id], metadata=[]]], fields=[id])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
index 5ee567631bd..12627a088e2 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
@@ -263,7 +263,29 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
<![CDATA[
GroupAggregate(select=[COUNT(*) AS EXPR$0])
+- Exchange(distribution=[single])
- +- TableSourceScan(table=[[default_catalog, default_database, src]],
fields=[id, cnt])
+ +- Calc(select=[0 AS $f0])
+ +- TableSourceScan(table=[[default_catalog, default_database, src,
project=[id], metadata=[]]], fields=[id])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCountStartWithHaving">
+ <Resource name="sql">
+ <![CDATA[SELECT COUNT(*) FROM src HAVING COUNT(*) > 1]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalFilter(condition=[>($0, 1)])
++- LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+ +- LogicalTableScan(table=[[default_catalog, default_database, src]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[EXPR$0], where=[(EXPR$0 > 1)])
++- GroupAggregate(select=[COUNT(*) AS EXPR$0])
+ +- Exchange(distribution=[single])
+ +- Calc(select=[0 AS $f0])
+ +- TableSourceScan(table=[[default_catalog, default_database, src,
project=[id], metadata=[]]], fields=[id])
]]>
</Resource>
</TestCase>
@@ -282,7 +304,8 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
<![CDATA[
GroupAggregate(select=[COUNT(*) AS EXPR$0])
+- Exchange(distribution=[single])
- +- TableSourceScan(table=[[default_catalog, default_database, src,
metadata=[sys_col]]], fields=[id, cnt, sys_col])
+ +- Calc(select=[0 AS $f0])
+ +- TableSourceScan(table=[[default_catalog, default_database, src,
project=[id], metadata=[]]], fields=[id])
]]>
</Resource>
</TestCase>
@@ -301,7 +324,8 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
<![CDATA[
GroupAggregate(select=[COUNT(*) AS EXPR$0])
+- Exchange(distribution=[single])
- +- TableSourceScan(table=[[default_catalog, default_database, src,
metadata=[cnt, id, sys_col]]], fields=[cnt, id, sys_col])
+ +- Calc(select=[0 AS $f0])
+ +- TableSourceScan(table=[[default_catalog, default_database, src,
project=[], metadata=[cnt]]], fields=[cnt])
]]>
</Resource>
</TestCase>
@@ -320,7 +344,8 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
<![CDATA[
GroupAggregate(select=[COUNT(*) AS EXPR$0])
+- Exchange(distribution=[single])
- +- TableSourceScan(table=[[default_catalog, default_database, src,
metadata=[sys_col]]], fields=[nested, id, cnt, sys_col])
+ +- Calc(select=[0 AS $f0])
+ +- TableSourceScan(table=[[default_catalog, default_database, src,
project=[nested], metadata=[]]], fields=[nested])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml
index cc42ab3cc40..405b7763a5d 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml
@@ -692,7 +692,8 @@ Calc(select=[b])
: : +- Exchange(distribution=[single])
: : +- GroupAggregate(select=[COUNT(*) AS c])
: : +- Exchange(distribution=[single])
- : : +- TableSourceScan(table=[[default_catalog,
default_database, t]], fields=[i, j, k])(reuse_id=[1])
+ : : +- Calc(select=[0 AS $f0])
+ : : +- TableSourceScan(table=[[default_catalog,
default_database, t]], fields=[i, j, k])(reuse_id=[1])
: +- Exchange(distribution=[single])
: +- Calc(select=[true AS i])
: +- GroupAggregate(groupBy=[EXPR$0], select=[EXPR$0])
@@ -1234,7 +1235,8 @@ Calc(select=[b])
: : +- Exchange(distribution=[single])
: : +- GroupAggregate(select=[COUNT(*) AS c])
: : +- Exchange(distribution=[single])
- : : +- TableSourceScan(table=[[default_catalog,
default_database, t]], fields=[i, j, k])(reuse_id=[1])
+ : : +- Calc(select=[0 AS $f0])
+ : : +- TableSourceScan(table=[[default_catalog,
default_database, t]], fields=[i, j, k])(reuse_id=[1])
: +- Exchange(distribution=[single])
: +- Calc(select=[true AS i])
: +- GroupAggregate(groupBy=[EXPR$0], select=[EXPR$0])
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateTestBase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateTestBase.scala
index d34a7c36b84..9f8d430a618 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateTestBase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateTestBase.scala
@@ -62,6 +62,17 @@ abstract class AggregateTestBase extends TableTestBase {
)
util.addTableSource[(Int, Long, String)]("MyTable1", 'a, 'b, 'c)
+ // the test values table source supports projection push down by default
+ util.tableEnv.executeSql("""
+ |CREATE TABLE src (
+ | id VARCHAR,
+ | cnt BIGINT
+ |) WITH (
+ | 'connector' = 'values'
+ | ,'bounded' = 'true'
+ |)
+ |""".stripMargin)
+
@TestTemplate
def testAvg(): Unit = {
util.verifyRelPlanWithType("""
@@ -119,19 +130,18 @@ abstract class AggregateTestBase extends TableTestBase {
@TestTemplate
def testCountStartWithProjectPushDown(): Unit = {
- // the test values table source supports projection push down by default
- util.tableEnv.executeSql("""
- |CREATE TABLE src (
- | id VARCHAR,
- | cnt BIGINT
- |) WITH (
- | 'connector' = 'values'
- | ,'bounded' = 'true'
- |)
- |""".stripMargin)
util.verifyRelPlanWithType("SELECT COUNT(*) FROM src")
}
+ @TestTemplate
+ def testCountStarWithHavingAndProjectPushDown(): Unit = {
+ val sql =
+ """
+ |SELECT COUNT(*) FROM src HAVING COUNT(*) > 1
+ """.stripMargin
+ util.verifyRelPlanWithType(sql)
+ }
+
@TestTemplate
def testCannotCountOnMultiFields(): Unit = {
val sql = "SELECT b, COUNT(a, c) FROM MyTable1 GROUP BY b"
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
index 59632a99ff7..f3a92c91600 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
@@ -485,4 +485,17 @@ class AggregateTest extends TableTestBase {
|""".stripMargin)
util.verifyExecPlan("SELECT COUNT(*) FROM src")
}
+
+ @Test
+ def testCountStartWithHaving(): Unit = {
+ util.tableEnv.executeSql("""
+ |CREATE TABLE src (
+ | id VARCHAR,
+ | cnt BIGINT
+ |) WITH (
+ | 'connector' = 'values'
+ |)
+ |""".stripMargin)
+ util.verifyExecPlan("SELECT COUNT(*) FROM src HAVING COUNT(*) > 1")
+ }
}