This is an automated email from the ASF dual-hosted git repository.

xuyangzhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6cfeb2e8bb2 [FLINK-38832][table-planner] Push down single column when 
using count star (#27681)
6cfeb2e8bb2 is described below

commit 6cfeb2e8bb2c66e0b293c538dcf28bf181fedd38
Author: Xuyang <[email protected]>
AuthorDate: Thu Mar 5 15:40:12 2026 +0800

    [FLINK-38832][table-planner] Push down single column when using count star 
(#27681)
---
 .../table/planner/calcite/FlinkRelBuilder.java     |  20 --
 .../rules/logical/PruneCountStarInputRule.java     | 110 +++++++++
 .../planner/plan/rules/FlinkBatchRuleSets.scala    |   4 +-
 .../planner/plan/rules/FlinkStreamRuleSets.scala   |   4 +-
 .../planner/plan/batch/sql/DeadlockBreakupTest.xml |   3 +-
 .../plan/batch/sql/MultipleInputCreationTest.xml   |   9 +-
 .../planner/plan/batch/sql/RowLevelDeleteTest.xml  |  34 ++-
 .../planner/plan/batch/sql/RowLevelUpdateTest.xml  |  34 ++-
 .../planner/plan/batch/sql/TableSourceTest.xml     |   2 +-
 .../plan/batch/sql/agg/HashAggregateTest.xml       |  87 ++++++-
 .../plan/batch/sql/agg/SortAggregateTest.xml       |  87 ++++++-
 .../batch/sql/join/NestedLoopSemiAntiJoinTest.xml  |   6 +-
 .../plan/batch/sql/join/SemiAntiJoinTest.xml       |   6 +-
 .../plan/hints/batch/BroadcastJoinHintTest.xml     |  17 +-
 .../plan/hints/batch/NestLoopJoinHintTest.xml      |  17 +-
 .../plan/hints/batch/ShuffleHashJoinHintTest.xml   |  17 +-
 .../plan/hints/batch/ShuffleMergeJoinHintTest.xml  |  17 +-
 .../plan/optimize/ShuffleModePlanOptimizeTest.xml  | 275 +++++++++++----------
 .../PushLocalAggIntoTableSourceScanRuleTest.xml    |   6 +-
 .../planner/plan/stream/sql/TableScanTest.xml      |   4 +-
 .../planner/plan/stream/sql/TableSourceTest.xml    |   2 +-
 .../planner/plan/stream/sql/agg/AggregateTest.xml  |  33 ++-
 .../plan/stream/sql/join/SemiAntiJoinTest.xml      |   6 +-
 .../plan/batch/sql/agg/AggregateTestBase.scala     |  30 ++-
 .../plan/stream/sql/agg/AggregateTest.scala        |  13 +
 25 files changed, 608 insertions(+), 235 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java
index 53107b32daa..9045b7d9e38 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java
@@ -35,8 +35,6 @@ import org.apache.flink.table.runtime.operators.rank.RankType;
 import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.Preconditions;
 
-import org.apache.flink.shaded.guava33.com.google.common.collect.ImmutableList;
-
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.plan.Contexts;
 import org.apache.calcite.plan.RelOptCluster;
@@ -44,7 +42,6 @@ import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptSchema;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.logical.LogicalTableFunctionScan;
 import org.apache.calcite.rel.type.RelDataType;
@@ -52,7 +49,6 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.tools.RelBuilder;
 import org.apache.calcite.tools.RelBuilderFactory;
@@ -197,12 +193,6 @@ public final class FlinkRelBuilder extends RelBuilder {
             final LogicalAggregate logicalAggregate = (LogicalAggregate) 
relNode;
             if (isTableAggregate(logicalAggregate.getAggCallList())) {
                 relNode = LogicalTableAggregate.create(logicalAggregate);
-            } else if (isCountStarAgg(logicalAggregate)) {
-                final RelNode newAggInput =
-                        
push(logicalAggregate.getInput(0)).project(literal(0)).build();
-                relNode =
-                        logicalAggregate.copy(
-                                logicalAggregate.getTraitSet(), 
ImmutableList.of(newAggInput));
             }
         }
 
@@ -270,14 +260,4 @@ public final class FlinkRelBuilder extends RelBuilder {
                         cluster.getPlanner().getContext());
         return FlinkRelBuilder.of(mergedContext, cluster, relOptSchema);
     }
-
-    private static boolean isCountStarAgg(LogicalAggregate agg) {
-        if (agg.getGroupCount() != 0 || agg.getAggCallList().size() != 1) {
-            return false;
-        }
-        final AggregateCall call = agg.getAggCallList().get(0);
-        return call.getAggregation().getKind() == SqlKind.COUNT
-                && call.filterArg == -1
-                && call.getArgList().isEmpty();
-    }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PruneCountStarInputRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PruneCountStarInputRule.java
new file mode 100644
index 00000000000..5676301f6c4
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PruneCountStarInputRule.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.tools.RelBuilder;
+import org.immutables.value.Value;
+
+import java.util.Collections;
+
+/**
+ * Planner rule that prunes the input columns of a {@link LogicalAggregate} 
representing {@code
+ * COUNT(*)} (i.e. no group keys, a single COUNT aggregate call with no 
arguments and no filter) by
+ * inserting a project of a constant literal {@code 0} between the aggregate 
and its input.
+ *
+ * <p>This avoids reading all columns from the source when only counting rows.
+ *
+ * <p>Before:
+ *
+ * <pre>
+ * LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+ * +- SomeInput
+ * </pre>
+ *
+ * <p>After:
+ *
+ * <pre>
+ * LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+ * +- LogicalProject($f0=[0])
+ *    +- SomeInput
+ * </pre>
+ */
[email protected]
+public class PruneCountStarInputRule
+        extends RelRule<PruneCountStarInputRule.PruneCountStarInputRuleConfig> 
{
+
+    public static final PruneCountStarInputRule INSTANCE =
+            
PruneCountStarInputRule.PruneCountStarInputRuleConfig.DEFAULT.toRule();
+
+    protected PruneCountStarInputRule(PruneCountStarInputRuleConfig config) {
+        super(config);
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+        final LogicalAggregate agg = call.rel(0);
+        final RelNode input = agg.getInput();
+        if (agg.getGroupCount() != 0 || agg.getAggCallList().size() != 1) {
+            return false;
+        }
+        final AggregateCall aggCall = agg.getAggCallList().get(0);
+        if (aggCall.getAggregation().getKind() != SqlKind.COUNT
+                || aggCall.filterArg != -1
+                || !aggCall.getArgList().isEmpty()) {
+            return false;
+        }
+        // Only rewrite when the input has more than one field. After the 
rewrite, the input
+        // becomes a single-field Project(0), so this condition naturally 
prevents repeated
+        // application even if other rules in the same phase transform or 
remove the inserted
+        // project.
+        return input.getRowType().getFieldCount() > 1;
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        final LogicalAggregate agg = call.rel(0);
+        final RelNode input = agg.getInput();
+
+        final RelBuilder relBuilder = call.builder();
+        final RelNode newInput = 
relBuilder.push(input).project(relBuilder.literal(0)).build();
+        final RelNode newAgg = agg.copy(agg.getTraitSet(), 
Collections.singletonList(newInput));
+        call.transformTo(newAgg);
+    }
+
+    /** Rule configuration. */
+    @Value.Immutable(singleton = false)
+    public interface PruneCountStarInputRuleConfig extends RelRule.Config {
+        PruneCountStarInputRule.PruneCountStarInputRuleConfig DEFAULT =
+                
ImmutablePruneCountStarInputRule.PruneCountStarInputRuleConfig.builder()
+                        .operandSupplier(b0 -> 
b0.operand(LogicalAggregate.class).anyInputs())
+                        .description("PruneCountStarInputRule")
+                        .build();
+
+        @Override
+        default PruneCountStarInputRule toRule() {
+            return new PruneCountStarInputRule(this);
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
index 546081a9c5e..ab401981684 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
@@ -125,7 +125,9 @@ object FlinkBatchRuleSets {
         // vector search rule.
         ConstantVectorSearchCallToCorrelateRule.INSTANCE,
         // Wrap arguments for JSON aggregate functions
-        WrapJsonAggFunctionArgumentsRule.INSTANCE
+        WrapJsonAggFunctionArgumentsRule.INSTANCE,
+        // prune COUNT(*) input to project a constant before aggregation
+        PruneCountStarInputRule.INSTANCE
       )).asJava)
 
   /** RuleSet about filter */
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
index 15372999cd1..d8307013965 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
@@ -133,7 +133,9 @@ object FlinkStreamRuleSets {
           // rewrite constant table function scan to correlate
           JoinTableFunctionScanToCorrelateRule.INSTANCE,
           // Wrap arguments for JSON aggregate functions
-          WrapJsonAggFunctionArgumentsRule.INSTANCE
+          WrapJsonAggFunctionArgumentsRule.INSTANCE,
+          // prune COUNT(*) input to project a constant before aggregation
+          PruneCountStarInputRule.INSTANCE
         )
     ).asJava)
 
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DeadlockBreakupTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DeadlockBreakupTest.xml
index 6a8be634552..464500c844a 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DeadlockBreakupTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DeadlockBreakupTest.xml
@@ -273,7 +273,8 @@ NestedLoopJoin(joinType=[FullOuterJoin], where=[(cnt <> 
cnt0)], select=[cnt, cnt
 :  +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS 
cnt])(reuse_id=[1])
 :     +- Exchange(distribution=[single])
 :        +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0])
-:           +- TableSourceScan(table=[[default_catalog, default_database, x]], 
fields=[a, b, c])
+:           +- Calc(select=[0 AS $f0])
+:              +- TableSourceScan(table=[[default_catalog, default_database, 
x]], fields=[a, b, c])
 +- Exchange(distribution=[single], shuffle_mode=[BATCH])
    +- Calc(select=[cnt], where=[(cnt < 5)])
       +- Reused(reference_id=[1])
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/MultipleInputCreationTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/MultipleInputCreationTest.xml
index c555231bd38..fd26e57fb89 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/MultipleInputCreationTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/MultipleInputCreationTest.xml
@@ -158,7 +158,8 @@ Calc(select=[a])
    :     +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS 
cnt])
    :        +- Exchange(distribution=[single])
    :           +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0])
-   :              +- Reused(reference_id=[1])
+   :              +- Calc(select=[0 AS $f0])
+   :                 +- Reused(reference_id=[1])
    :- Exchange(distribution=[broadcast])
    :  +- Calc(select=[a])
    :     +- TableSourceScan(table=[[default_catalog, default_database, x]], 
fields=[a, b, c, nx])
@@ -213,7 +214,8 @@ Calc(select=[a])
    :     +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS 
cnt])
    :        +- Exchange(distribution=[single])
    :           +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0])
-   :              +- Reused(reference_id=[1])
+   :              +- Calc(select=[0 AS $f0])
+   :                 +- Reused(reference_id=[1])
    :- Exchange(distribution=[broadcast])
    :  +- Calc(select=[a])
    :     +- TableSourceScan(table=[[default_catalog, default_database, x]], 
fields=[a, b, c, nx])
@@ -268,7 +270,8 @@ Calc(select=[a])
    :     +- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS 
cnt])
    :        +- Exchange(distribution=[single])
    :           +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0])
-   :              +- Reused(reference_id=[1])
+   :              +- Calc(select=[0 AS $f0])
+   :                 +- Reused(reference_id=[1])
    :- Exchange(distribution=[broadcast])
    :  +- Calc(select=[a])
    :     +- TableSourceScan(table=[[default_catalog, default_database, x]], 
fields=[a, b, c, nx])
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelDeleteTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelDeleteTest.xml
index 71310ff839c..2ea03e77d82 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelDeleteTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelDeleteTest.xml
@@ -382,7 +382,8 @@ Sink(table=[default_catalog.default_database.t], fields=[a, 
b])
          +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS 
EXPR$0])
             +- Exchange(distribution=[single])
                +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-                  +- TableSourceScan(table=[[default_catalog, 
default_database, t]], fields=[a, b])
+                  +- Calc(select=[0 AS $f0])
+                     +- TableSourceScan(table=[[default_catalog, 
default_database, t]], fields=[a, b])
 
 == Optimized Execution Plan ==
 Sink(table=[default_catalog.default_database.t], fields=[a, b])
@@ -395,7 +396,8 @@ Sink(table=[default_catalog.default_database.t], fields=[a, 
b])
          +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS 
EXPR$0])
             +- Exchange(distribution=[single])
                +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-                  +- Reused(reference_id=[1])
+                  +- Calc(select=[0 AS $f0])
+                     +- Reused(reference_id=[1])
 
 == Physical Execution Plan ==
 {
@@ -416,6 +418,17 @@ Sink(table=[default_catalog.default_database.t], 
fields=[a, b])
       "ship_strategy" : "FORWARD",
       "side" : "second"
     } ]
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[0 AS $f0])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
   }, {
     "id" : ,
     "type" : "HashAggregate[]",
@@ -510,7 +523,8 @@ Sink(table=[default_catalog.default_database.t], fields=[a, 
b])
          +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS 
EXPR$0])
             +- Exchange(distribution=[single])
                +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-                  +- TableSourceScan(table=[[default_catalog, 
default_database, t]], fields=[a, b])
+                  +- Calc(select=[0 AS $f0])
+                     +- TableSourceScan(table=[[default_catalog, 
default_database, t]], fields=[a, b])
 
 == Optimized Execution Plan ==
 Sink(table=[default_catalog.default_database.t], fields=[a, b])
@@ -522,7 +536,8 @@ Sink(table=[default_catalog.default_database.t], fields=[a, 
b])
          +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS 
EXPR$0])
             +- Exchange(distribution=[single])
                +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-                  +- Reused(reference_id=[1])
+                  +- Calc(select=[0 AS $f0])
+                     +- Reused(reference_id=[1])
 
 == Physical Execution Plan ==
 {
@@ -532,6 +547,17 @@ Sink(table=[default_catalog.default_database.t], 
fields=[a, b])
     "pact" : "Data Source",
     "contents" : "[]:TableSourceScan(table=[[default_catalog, 
default_database, t]], fields=[a, b])",
     "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[0 AS $f0])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
   }, {
     "id" : ,
     "type" : "HashAggregate[]",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.xml
index 85e79a8c0ec..399fcb0dd5e 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelUpdateTest.xml
@@ -605,7 +605,8 @@ Sink(table=[default_catalog.default_database.t], 
targetColumns=[[1]], fields=[a,
          +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS 
EXPR$0])
             +- Exchange(distribution=[single])
                +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-                  +- TableSourceScan(table=[[default_catalog, 
default_database, t1]], fields=[a, b])
+                  +- Calc(select=[0 AS $f0])
+                     +- TableSourceScan(table=[[default_catalog, 
default_database, t1]], fields=[a, b])
 
 == Optimized Execution Plan ==
 Sink(table=[default_catalog.default_database.t], targetColumns=[[1]], 
fields=[a, b])
@@ -616,7 +617,8 @@ Sink(table=[default_catalog.default_database.t], 
targetColumns=[[1]], fields=[a,
          +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS 
EXPR$0])
             +- Exchange(distribution=[single])
                +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-                  +- TableSourceScan(table=[[default_catalog, 
default_database, t1]], fields=[a, b])
+                  +- Calc(select=[0 AS $f0])
+                     +- TableSourceScan(table=[[default_catalog, 
default_database, t1]], fields=[a, b])
 
 == Physical Execution Plan ==
 {
@@ -632,6 +634,17 @@ Sink(table=[default_catalog.default_database.t], 
targetColumns=[[1]], fields=[a,
     "pact" : "Data Source",
     "contents" : "[]:TableSourceScan(table=[[default_catalog, 
default_database, t1]], fields=[a, b])",
     "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[0 AS $f0])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
   }, {
     "id" : ,
     "type" : "HashAggregate[]",
@@ -716,7 +729,8 @@ Sink(table=[default_catalog.default_database.t], 
targetColumns=[[1]], fields=[a,
          +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS 
EXPR$0])
             +- Exchange(distribution=[single])
                +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-                  +- TableSourceScan(table=[[default_catalog, 
default_database, t1]], fields=[a, b])
+                  +- Calc(select=[0 AS $f0])
+                     +- TableSourceScan(table=[[default_catalog, 
default_database, t1]], fields=[a, b])
 
 == Optimized Execution Plan ==
 Sink(table=[default_catalog.default_database.t], targetColumns=[[1]], 
fields=[a, b])
@@ -728,7 +742,8 @@ Sink(table=[default_catalog.default_database.t], 
targetColumns=[[1]], fields=[a,
          +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS 
EXPR$0])
             +- Exchange(distribution=[single])
                +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-                  +- TableSourceScan(table=[[default_catalog, 
default_database, t1]], fields=[a, b])
+                  +- Calc(select=[0 AS $f0])
+                     +- TableSourceScan(table=[[default_catalog, 
default_database, t1]], fields=[a, b])
 
 == Physical Execution Plan ==
 {
@@ -755,6 +770,17 @@ Sink(table=[default_catalog.default_database.t], 
targetColumns=[[1]], fields=[a,
     "pact" : "Data Source",
     "contents" : "[]:TableSourceScan(table=[[default_catalog, 
default_database, t1]], fields=[a, b])",
     "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[0 AS $f0])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
   }, {
     "id" : ,
     "type" : "HashAggregate[]",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml
index fdc0fea2d7f..988018a2f3f 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.xml
@@ -117,7 +117,7 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
       <![CDATA[
 HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0])
 +- Exchange(distribution=[single])
-   +- TableSourceScan(table=[[default_catalog, default_database, 
ProjectableTable, aggregates=[grouping=[], 
aggFunctions=[Count1AggFunction()]]]], fields=[count1$0])
+   +- TableSourceScan(table=[[default_catalog, default_database, 
ProjectableTable, project=[a], metadata=[], aggregates=[grouping=[], 
aggFunctions=[Count1AggFunction()]]]], fields=[count1$0])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml
index bcccb462898..0c35071320c 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml
@@ -410,6 +410,73 @@ HashAggregate(isMerge=[true], select=[Final_AVG(sum$0, 
count$1) AS EXPR$0, Final
    +- LocalHashAggregate(select=[Partial_AVG(byte) AS (sum$0, count$1), 
Partial_AVG(short) AS (sum$2, count$3), Partial_AVG(int) AS (sum$4, count$5), 
Partial_AVG(long) AS (sum$6, count$7), Partial_AVG(float) AS (sum$8, count$9), 
Partial_AVG(double) AS (sum$10, count$11), Partial_AVG(decimal3020) AS (sum$12, 
count$13), Partial_AVG(decimal105) AS (sum$14, count$15)]), 
rowType=[RecordType(BIGINT sum$0, BIGINT count$1, BIGINT sum$2, BIGINT count$3, 
BIGINT sum$4, BIGINT count$5, BIGINT sum$6, [...]
       +- Calc(select=[byte, short, int, long, float, double, decimal3020, 
decimal105]), rowType=[RecordType(TINYINT byte, SMALLINT short, INTEGER int, 
BIGINT long, FLOAT float, DOUBLE double, DECIMAL(30, 20) decimal3020, 
DECIMAL(10, 5) decimal105)]
          +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[byte, short, int, long, float, double, boolean, string, 
date, time, timestamp, decimal3020, decimal105]), rowType=[RecordType(TINYINT 
byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, 
BOOLEAN boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, 
TIMESTAMP(3) timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCountStarWithHavingAndProjectPushDown[aggStrategy=AUTO]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT COUNT(*) FROM src HAVING COUNT(*) > 1
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalFilter(condition=[>($0, 1)]), rowType=[RecordType(BIGINT EXPR$0)]
++- LogicalAggregate(group=[{}], EXPR$0=[COUNT()]), rowType=[RecordType(BIGINT 
EXPR$0)]
+   +- LogicalTableScan(table=[[default_catalog, default_database, src]]), 
rowType=[RecordType(VARCHAR(2147483647) id, BIGINT cnt)]
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[EXPR$0], where=[>(EXPR$0, 1)]), rowType=[RecordType(BIGINT 
EXPR$0)]
++- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0]), 
rowType=[RecordType(BIGINT EXPR$0)]
+   +- Exchange(distribution=[single]), rowType=[RecordType(BIGINT count1$0)]
+      +- TableSourceScan(table=[[default_catalog, default_database, src, 
project=[id], metadata=[], aggregates=[grouping=[], 
aggFunctions=[Count1AggFunction()]]]], fields=[count1$0]), 
rowType=[RecordType(BIGINT count1$0)]
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testCountStarWithHavingAndProjectPushDown[aggStrategy=ONE_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT COUNT(*) FROM src HAVING COUNT(*) > 1
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalFilter(condition=[>($0, 1)]), rowType=[RecordType(BIGINT EXPR$0)]
++- LogicalAggregate(group=[{}], EXPR$0=[COUNT()]), rowType=[RecordType(BIGINT 
EXPR$0)]
+   +- LogicalTableScan(table=[[default_catalog, default_database, src]]), 
rowType=[RecordType(VARCHAR(2147483647) id, BIGINT cnt)]
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[EXPR$0], where=[>(EXPR$0, 1)]), rowType=[RecordType(BIGINT 
EXPR$0)]
++- HashAggregate(isMerge=[false], select=[COUNT(*) AS EXPR$0]), 
rowType=[RecordType(BIGINT EXPR$0)]
+   +- Exchange(distribution=[single]), rowType=[RecordType(INTEGER $f0)]
+      +- Calc(select=[0 AS $f0]), rowType=[RecordType(INTEGER $f0)]
+         +- TableSourceScan(table=[[default_catalog, default_database, src, 
project=[id], metadata=[]]], fields=[id]), 
rowType=[RecordType(VARCHAR(2147483647) id)]
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testCountStarWithHavingAndProjectPushDown[aggStrategy=TWO_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT COUNT(*) FROM src HAVING COUNT(*) > 1
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalFilter(condition=[>($0, 1)]), rowType=[RecordType(BIGINT EXPR$0)]
++- LogicalAggregate(group=[{}], EXPR$0=[COUNT()]), rowType=[RecordType(BIGINT 
EXPR$0)]
+   +- LogicalTableScan(table=[[default_catalog, default_database, src]]), 
rowType=[RecordType(VARCHAR(2147483647) id, BIGINT cnt)]
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[EXPR$0], where=[>(EXPR$0, 1)]), rowType=[RecordType(BIGINT 
EXPR$0)]
++- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0]), 
rowType=[RecordType(BIGINT EXPR$0)]
+   +- Exchange(distribution=[single]), rowType=[RecordType(BIGINT count1$0)]
+      +- TableSourceScan(table=[[default_catalog, default_database, src, 
project=[id], metadata=[], aggregates=[grouping=[], 
aggFunctions=[Count1AggFunction()]]]], fields=[count1$0]), 
rowType=[RecordType(BIGINT count1$0)]
 ]]>
     </Resource>
   </TestCase>
@@ -427,7 +494,7 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()]), 
rowType=[RecordType(BIGINT EXPR$
       <![CDATA[
 HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0]), 
rowType=[RecordType(BIGINT EXPR$0)]
 +- Exchange(distribution=[single]), rowType=[RecordType(BIGINT count1$0)]
-   +- TableSourceScan(table=[[default_catalog, default_database, src, 
aggregates=[grouping=[], aggFunctions=[Count1AggFunction()]]]], 
fields=[count1$0]), rowType=[RecordType(BIGINT count1$0)]
+   +- TableSourceScan(table=[[default_catalog, default_database, src, 
project=[id], metadata=[], aggregates=[grouping=[], 
aggFunctions=[Count1AggFunction()]]]], fields=[count1$0]), 
rowType=[RecordType(BIGINT count1$0)]
 ]]>
     </Resource>
   </TestCase>
@@ -444,8 +511,9 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()]), 
rowType=[RecordType(BIGINT EXPR$
     <Resource name="optimized rel plan">
       <![CDATA[
 HashAggregate(isMerge=[false], select=[COUNT(*) AS EXPR$0]), 
rowType=[RecordType(BIGINT EXPR$0)]
-+- Exchange(distribution=[single]), rowType=[RecordType(VARCHAR(2147483647) 
id, BIGINT cnt)]
-   +- TableSourceScan(table=[[default_catalog, default_database, src]], 
fields=[id, cnt]), rowType=[RecordType(VARCHAR(2147483647) id, BIGINT cnt)]
++- Exchange(distribution=[single]), rowType=[RecordType(INTEGER $f0)]
+   +- Calc(select=[0 AS $f0]), rowType=[RecordType(INTEGER $f0)]
+      +- TableSourceScan(table=[[default_catalog, default_database, src, 
project=[id], metadata=[]]], fields=[id]), 
rowType=[RecordType(VARCHAR(2147483647) id)]
 ]]>
     </Resource>
   </TestCase>
@@ -463,7 +531,7 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()]), 
rowType=[RecordType(BIGINT EXPR$
       <![CDATA[
 HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0]), 
rowType=[RecordType(BIGINT EXPR$0)]
 +- Exchange(distribution=[single]), rowType=[RecordType(BIGINT count1$0)]
-   +- TableSourceScan(table=[[default_catalog, default_database, src, 
aggregates=[grouping=[], aggFunctions=[Count1AggFunction()]]]], 
fields=[count1$0]), rowType=[RecordType(BIGINT count1$0)]
+   +- TableSourceScan(table=[[default_catalog, default_database, src, 
project=[id], metadata=[], aggregates=[grouping=[], 
aggFunctions=[Count1AggFunction()]]]], fields=[count1$0]), 
rowType=[RecordType(BIGINT count1$0)]
 ]]>
     </Resource>
   </TestCase>
@@ -482,7 +550,8 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()]), 
rowType=[RecordType(BIGINT EXPR$
 HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0]), 
rowType=[RecordType(BIGINT EXPR$0)]
 +- Exchange(distribution=[single]), rowType=[RecordType(BIGINT count1$0)]
    +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0]), 
rowType=[RecordType(BIGINT count1$0)]
-      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[byte, short, int, long, float, double, boolean, string, date, time, 
timestamp, decimal3020, decimal105]), rowType=[RecordType(TINYINT byte, 
SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN 
boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, TIMESTAMP(3) 
timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
+      +- Calc(select=[0 AS $f0]), rowType=[RecordType(INTEGER $f0)]
+         +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[byte, short, int, long, float, double, boolean, string, 
date, time, timestamp, decimal3020, decimal105]), rowType=[RecordType(TINYINT 
byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, 
BOOLEAN boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, 
TIMESTAMP(3) timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
 ]]>
     </Resource>
   </TestCase>
@@ -499,8 +568,9 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()]), 
rowType=[RecordType(BIGINT EXPR$
     <Resource name="optimized rel plan">
       <![CDATA[
 HashAggregate(isMerge=[false], select=[COUNT(*) AS EXPR$0]), 
rowType=[RecordType(BIGINT EXPR$0)]
-+- Exchange(distribution=[single]), rowType=[RecordType(TINYINT byte, SMALLINT 
short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN boolean, 
VARCHAR(2147483647) string, DATE date, TIME(0) time, TIMESTAMP(3) timestamp, 
DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
-   +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[byte, short, int, long, float, double, boolean, string, date, time, 
timestamp, decimal3020, decimal105]), rowType=[RecordType(TINYINT byte, 
SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN 
boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, TIMESTAMP(3) 
timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
++- Exchange(distribution=[single]), rowType=[RecordType(INTEGER $f0)]
+   +- Calc(select=[0 AS $f0]), rowType=[RecordType(INTEGER $f0)]
+      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[byte, short, int, long, float, double, boolean, string, date, time, 
timestamp, decimal3020, decimal105]), rowType=[RecordType(TINYINT byte, 
SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN 
boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, TIMESTAMP(3) 
timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
 ]]>
     </Resource>
   </TestCase>
@@ -519,7 +589,8 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()]), 
rowType=[RecordType(BIGINT EXPR$
 HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0]), 
rowType=[RecordType(BIGINT EXPR$0)]
 +- Exchange(distribution=[single]), rowType=[RecordType(BIGINT count1$0)]
    +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0]), 
rowType=[RecordType(BIGINT count1$0)]
-      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[byte, short, int, long, float, double, boolean, string, date, time, 
timestamp, decimal3020, decimal105]), rowType=[RecordType(TINYINT byte, 
SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN 
boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, TIMESTAMP(3) 
timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
+      +- Calc(select=[0 AS $f0]), rowType=[RecordType(INTEGER $f0)]
+         +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[byte, short, int, long, float, double, boolean, string, 
date, time, timestamp, decimal3020, decimal105]), rowType=[RecordType(TINYINT 
byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, 
BOOLEAN boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, 
TIMESTAMP(3) timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml
index 4d52406652c..7638f23d247 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml
@@ -533,6 +533,73 @@ SortAggregate(isMerge=[true], select=[Final_AVG(sum$0, 
count$1) AS EXPR$0, Final
    +- LocalSortAggregate(select=[Partial_AVG(byte) AS (sum$0, count$1), 
Partial_AVG(short) AS (sum$2, count$3), Partial_AVG(int) AS (sum$4, count$5), 
Partial_AVG(long) AS (sum$6, count$7), Partial_AVG(float) AS (sum$8, count$9), 
Partial_AVG(double) AS (sum$10, count$11), Partial_AVG(decimal3020) AS (sum$12, 
count$13), Partial_AVG(decimal105) AS (sum$14, count$15)]), 
rowType=[RecordType(BIGINT sum$0, BIGINT count$1, BIGINT sum$2, BIGINT count$3, 
BIGINT sum$4, BIGINT count$5, BIGINT sum$6, [...]
       +- Calc(select=[byte, short, int, long, float, double, decimal3020, 
decimal105]), rowType=[RecordType(TINYINT byte, SMALLINT short, INTEGER int, 
BIGINT long, FLOAT float, DOUBLE double, DECIMAL(30, 20) decimal3020, 
DECIMAL(10, 5) decimal105)]
          +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[byte, short, int, long, float, double, boolean, string, 
date, time, timestamp, decimal3020, decimal105]), rowType=[RecordType(TINYINT 
byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, 
BOOLEAN boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, 
TIMESTAMP(3) timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCountStarWithHavingAndProjectPushDown[aggStrategy=AUTO]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT COUNT(*) FROM src HAVING COUNT(*) > 1
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalFilter(condition=[>($0, 1)]), rowType=[RecordType(BIGINT EXPR$0)]
++- LogicalAggregate(group=[{}], EXPR$0=[COUNT()]), rowType=[RecordType(BIGINT 
EXPR$0)]
+   +- LogicalTableScan(table=[[default_catalog, default_database, src]]), 
rowType=[RecordType(VARCHAR(2147483647) id, BIGINT cnt)]
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[EXPR$0], where=[>(EXPR$0, 1)]), rowType=[RecordType(BIGINT 
EXPR$0)]
++- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0]), 
rowType=[RecordType(BIGINT EXPR$0)]
+   +- Exchange(distribution=[single]), rowType=[RecordType(BIGINT count1$0)]
+      +- TableSourceScan(table=[[default_catalog, default_database, src, 
project=[id], metadata=[], aggregates=[grouping=[], 
aggFunctions=[Count1AggFunction()]]]], fields=[count1$0]), 
rowType=[RecordType(BIGINT count1$0)]
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testCountStarWithHavingAndProjectPushDown[aggStrategy=ONE_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT COUNT(*) FROM src HAVING COUNT(*) > 1
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalFilter(condition=[>($0, 1)]), rowType=[RecordType(BIGINT EXPR$0)]
++- LogicalAggregate(group=[{}], EXPR$0=[COUNT()]), rowType=[RecordType(BIGINT 
EXPR$0)]
+   +- LogicalTableScan(table=[[default_catalog, default_database, src]]), 
rowType=[RecordType(VARCHAR(2147483647) id, BIGINT cnt)]
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[EXPR$0], where=[>(EXPR$0, 1)]), rowType=[RecordType(BIGINT 
EXPR$0)]
++- SortAggregate(isMerge=[false], select=[COUNT(*) AS EXPR$0]), 
rowType=[RecordType(BIGINT EXPR$0)]
+   +- Exchange(distribution=[single]), rowType=[RecordType(INTEGER $f0)]
+      +- Calc(select=[0 AS $f0]), rowType=[RecordType(INTEGER $f0)]
+         +- TableSourceScan(table=[[default_catalog, default_database, src, 
project=[id], metadata=[]]], fields=[id]), 
rowType=[RecordType(VARCHAR(2147483647) id)]
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testCountStarWithHavingAndProjectPushDown[aggStrategy=TWO_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT COUNT(*) FROM src HAVING COUNT(*) > 1
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalFilter(condition=[>($0, 1)]), rowType=[RecordType(BIGINT EXPR$0)]
++- LogicalAggregate(group=[{}], EXPR$0=[COUNT()]), rowType=[RecordType(BIGINT 
EXPR$0)]
+   +- LogicalTableScan(table=[[default_catalog, default_database, src]]), 
rowType=[RecordType(VARCHAR(2147483647) id, BIGINT cnt)]
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[EXPR$0], where=[>(EXPR$0, 1)]), rowType=[RecordType(BIGINT 
EXPR$0)]
++- SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0]), 
rowType=[RecordType(BIGINT EXPR$0)]
+   +- Exchange(distribution=[single]), rowType=[RecordType(BIGINT count1$0)]
+      +- TableSourceScan(table=[[default_catalog, default_database, src, 
project=[id], metadata=[], aggregates=[grouping=[], 
aggFunctions=[Count1AggFunction()]]]], fields=[count1$0]), 
rowType=[RecordType(BIGINT count1$0)]
 ]]>
     </Resource>
   </TestCase>
@@ -550,7 +617,7 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()]), 
rowType=[RecordType(BIGINT EXPR$
       <![CDATA[
 SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0]), 
rowType=[RecordType(BIGINT EXPR$0)]
 +- Exchange(distribution=[single]), rowType=[RecordType(BIGINT count1$0)]
-   +- TableSourceScan(table=[[default_catalog, default_database, src, 
aggregates=[grouping=[], aggFunctions=[Count1AggFunction()]]]], 
fields=[count1$0]), rowType=[RecordType(BIGINT count1$0)]
+   +- TableSourceScan(table=[[default_catalog, default_database, src, 
project=[id], metadata=[], aggregates=[grouping=[], 
aggFunctions=[Count1AggFunction()]]]], fields=[count1$0]), 
rowType=[RecordType(BIGINT count1$0)]
 ]]>
     </Resource>
   </TestCase>
@@ -567,8 +634,9 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()]), 
rowType=[RecordType(BIGINT EXPR$
     <Resource name="optimized rel plan">
       <![CDATA[
 SortAggregate(isMerge=[false], select=[COUNT(*) AS EXPR$0]), 
rowType=[RecordType(BIGINT EXPR$0)]
-+- Exchange(distribution=[single]), rowType=[RecordType(VARCHAR(2147483647) 
id, BIGINT cnt)]
-   +- TableSourceScan(table=[[default_catalog, default_database, src]], 
fields=[id, cnt]), rowType=[RecordType(VARCHAR(2147483647) id, BIGINT cnt)]
++- Exchange(distribution=[single]), rowType=[RecordType(INTEGER $f0)]
+   +- Calc(select=[0 AS $f0]), rowType=[RecordType(INTEGER $f0)]
+      +- TableSourceScan(table=[[default_catalog, default_database, src, 
project=[id], metadata=[]]], fields=[id]), 
rowType=[RecordType(VARCHAR(2147483647) id)]
 ]]>
     </Resource>
   </TestCase>
@@ -586,7 +654,7 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()]), 
rowType=[RecordType(BIGINT EXPR$
       <![CDATA[
 SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0]), 
rowType=[RecordType(BIGINT EXPR$0)]
 +- Exchange(distribution=[single]), rowType=[RecordType(BIGINT count1$0)]
-   +- TableSourceScan(table=[[default_catalog, default_database, src, 
aggregates=[grouping=[], aggFunctions=[Count1AggFunction()]]]], 
fields=[count1$0]), rowType=[RecordType(BIGINT count1$0)]
+   +- TableSourceScan(table=[[default_catalog, default_database, src, 
project=[id], metadata=[], aggregates=[grouping=[], 
aggFunctions=[Count1AggFunction()]]]], fields=[count1$0]), 
rowType=[RecordType(BIGINT count1$0)]
 ]]>
     </Resource>
   </TestCase>
@@ -605,7 +673,8 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()]), 
rowType=[RecordType(BIGINT EXPR$
 SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0]), 
rowType=[RecordType(BIGINT EXPR$0)]
 +- Exchange(distribution=[single]), rowType=[RecordType(BIGINT count1$0)]
    +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0]), 
rowType=[RecordType(BIGINT count1$0)]
-      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[byte, short, int, long, float, double, boolean, string, date, time, 
timestamp, decimal3020, decimal105]), rowType=[RecordType(TINYINT byte, 
SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN 
boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, TIMESTAMP(3) 
timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
+      +- Calc(select=[0 AS $f0]), rowType=[RecordType(INTEGER $f0)]
+         +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[byte, short, int, long, float, double, boolean, string, 
date, time, timestamp, decimal3020, decimal105]), rowType=[RecordType(TINYINT 
byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, 
BOOLEAN boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, 
TIMESTAMP(3) timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
 ]]>
     </Resource>
   </TestCase>
@@ -622,8 +691,9 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()]), 
rowType=[RecordType(BIGINT EXPR$
     <Resource name="optimized rel plan">
       <![CDATA[
 SortAggregate(isMerge=[false], select=[COUNT(*) AS EXPR$0]), 
rowType=[RecordType(BIGINT EXPR$0)]
-+- Exchange(distribution=[single]), rowType=[RecordType(TINYINT byte, SMALLINT 
short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN boolean, 
VARCHAR(2147483647) string, DATE date, TIME(0) time, TIMESTAMP(3) timestamp, 
DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
-   +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[byte, short, int, long, float, double, boolean, string, date, time, 
timestamp, decimal3020, decimal105]), rowType=[RecordType(TINYINT byte, 
SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN 
boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, TIMESTAMP(3) 
timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
++- Exchange(distribution=[single]), rowType=[RecordType(INTEGER $f0)]
+   +- Calc(select=[0 AS $f0]), rowType=[RecordType(INTEGER $f0)]
+      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[byte, short, int, long, float, double, boolean, string, date, time, 
timestamp, decimal3020, decimal105]), rowType=[RecordType(TINYINT byte, 
SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN 
boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, TIMESTAMP(3) 
timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
 ]]>
     </Resource>
   </TestCase>
@@ -642,7 +712,8 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()]), 
rowType=[RecordType(BIGINT EXPR$
 SortAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0]), 
rowType=[RecordType(BIGINT EXPR$0)]
 +- Exchange(distribution=[single]), rowType=[RecordType(BIGINT count1$0)]
    +- LocalSortAggregate(select=[Partial_COUNT(*) AS count1$0]), 
rowType=[RecordType(BIGINT count1$0)]
-      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[byte, short, int, long, float, double, boolean, string, date, time, 
timestamp, decimal3020, decimal105]), rowType=[RecordType(TINYINT byte, 
SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN 
boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, TIMESTAMP(3) 
timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
+      +- Calc(select=[0 AS $f0]), rowType=[RecordType(INTEGER $f0)]
+         +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[byte, short, int, long, float, double, boolean, string, 
date, time, timestamp, decimal3020, decimal105]), rowType=[RecordType(TINYINT 
byte, SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, 
BOOLEAN boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, 
TIMESTAMP(3) timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.xml
index fed6b988a93..ab0df8f4bce 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.xml
@@ -680,7 +680,8 @@ Calc(select=[b])
       +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c])
          +- Exchange(distribution=[single])
             +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-               +- Reused(reference_id=[1])
+               +- Calc(select=[0 AS $f0])
+                  +- Reused(reference_id=[1])
 ]]>
     </Resource>
   </TestCase>
@@ -1202,7 +1203,8 @@ Calc(select=[b])
       +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c])
          +- Exchange(distribution=[single])
             +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-               +- Reused(reference_id=[1])
+               +- Calc(select=[0 AS $f0])
+                  +- Reused(reference_id=[1])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SemiAntiJoinTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SemiAntiJoinTest.xml
index a975f2bd917..f4f00d200aa 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SemiAntiJoinTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SemiAntiJoinTest.xml
@@ -699,7 +699,8 @@ Calc(select=[b])
    :           +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) 
AS c])
    :              +- Exchange(distribution=[single])
    :                 +- LocalHashAggregate(select=[Partial_COUNT(*) AS 
count1$0])
-   :                    +- Reused(reference_id=[1])
+   :                    +- Calc(select=[0 AS $f0])
+   :                       +- Reused(reference_id=[1])
    +- Exchange(distribution=[hash[d, f]])
       +- Calc(select=[d, f])
          +- TableSourceScan(table=[[default_catalog, default_database, r]], 
fields=[d, e, f])
@@ -1243,7 +1244,8 @@ Calc(select=[b])
    :           +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) 
AS c])
    :              +- Exchange(distribution=[single])
    :                 +- LocalHashAggregate(select=[Partial_COUNT(*) AS 
count1$0])
-   :                    +- Reused(reference_id=[1])
+   :                    +- Calc(select=[0 AS $f0])
+   :                       +- Reused(reference_id=[1])
    +- Exchange(distribution=[hash[d]])
       +- Calc(select=[d])
          +- TableSourceScan(table=[[default_catalog, default_database, r]], 
fields=[d, e, f])
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/BroadcastJoinHintTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/BroadcastJoinHintTest.xml
index 4b24a4f7a39..9bf2749be2a 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/BroadcastJoinHintTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/BroadcastJoinHintTest.xml
@@ -1087,11 +1087,12 @@ NestedLoopJoin(joinType=[LeftOuterJoin], where=[true], 
select=[a1, EXPR$0], buil
    +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0])
       +- Exchange(distribution=[single])
          +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-            +- HashJoin(joinType=[InnerJoin], where=[=(a1, a3)], select=[a1, 
b1, a3, b3], build=[left])
-               :- Exchange(distribution=[hash[a1]])
-               :  +- TableSourceScan(table=[[default_catalog, 
default_database, T1]], fields=[a1, b1], hints=[[[ALIAS options:[T1]]]])
-               +- Exchange(distribution=[hash[a3]])
-                  +- TableSourceScan(table=[[default_catalog, 
default_database, T3]], fields=[a3, b3], hints=[[[ALIAS options:[T3]]]])
+            +- Calc(select=[0 AS $f0])
+               +- HashJoin(joinType=[InnerJoin], where=[=(a1, a3)], 
select=[a1, a3], build=[left])
+                  :- Exchange(distribution=[hash[a1]])
+                  :  +- TableSourceScan(table=[[default_catalog, 
default_database, T1, project=[a1], metadata=[]]], fields=[a1], hints=[[[ALIAS 
options:[T1]]]])
+                  +- Exchange(distribution=[hash[a3]])
+                     +- TableSourceScan(table=[[default_catalog, 
default_database, T3, project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS 
options:[T3]]]])
 ]]>
     </Resource>
   </TestCase>
@@ -1166,8 +1167,8 @@ NestedLoopJoin(joinType=[LeftOuterJoin], where=[true], 
select=[a1, $f2], build=[
          :  +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS 
EXPR$0])
          :     +- Exchange(distribution=[single])
          :        +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-         :           +- Calc(select=[a1, b1], where=[=(a1, 1)])
-         :              +- TableSourceScan(table=[[default_catalog, 
default_database, T1, filter=[], project=[a1, b1], metadata=[]]], fields=[a1, 
b1], hints=[[[ALIAS options:[T1]]]])
+         :           +- Calc(select=[a1], where=[=(a1, 1)])
+         :              +- TableSourceScan(table=[[default_catalog, 
default_database, T1, filter=[], project=[a1], metadata=[]]], fields=[a1], 
hints=[[[ALIAS options:[T1]]]])
          +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS 
EXPR$0])
             +- Exchange(distribution=[single])
                +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
@@ -1176,7 +1177,7 @@ NestedLoopJoin(joinType=[LeftOuterJoin], where=[true], 
select=[a1, $f2], build=[
                         +- LocalHashAggregate(groupBy=[a3], select=[a3])
                            +- Union(all=[true], union=[a3])
                               :- Calc(select=[CAST(1 AS BIGINT) AS a3], 
where=[=(a1, 1)])
-                              :  +- TableSourceScan(table=[[default_catalog, 
default_database, T1, filter=[], project=[a1, b1], metadata=[]]], fields=[a1, 
b1], hints=[[[ALIAS options:[T1]]]])
+                              :  +- TableSourceScan(table=[[default_catalog, 
default_database, T1, filter=[], project=[a1], metadata=[]]], fields=[a1], 
hints=[[[ALIAS options:[T1]]]])
                               +- Calc(select=[CAST(1 AS BIGINT) AS a3], 
where=[=(a3, 1)])
                                  +- TableSourceScan(table=[[default_catalog, 
default_database, T3, filter=[], project=[a3], metadata=[]]], fields=[a3], 
hints=[[[ALIAS options:[T3]]]])
 ]]>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/NestLoopJoinHintTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/NestLoopJoinHintTest.xml
index d280cd87b0d..ee97a0df7bf 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/NestLoopJoinHintTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/NestLoopJoinHintTest.xml
@@ -1084,11 +1084,12 @@ NestedLoopJoin(joinType=[LeftOuterJoin], where=[true], 
select=[a1, EXPR$0], buil
    +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0])
       +- Exchange(distribution=[single])
          +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-            +- HashJoin(joinType=[InnerJoin], where=[=(a1, a3)], select=[a1, 
b1, a3, b3], build=[left])
-               :- Exchange(distribution=[hash[a1]])
-               :  +- TableSourceScan(table=[[default_catalog, 
default_database, T1]], fields=[a1, b1], hints=[[[ALIAS options:[T1]]]])
-               +- Exchange(distribution=[hash[a3]])
-                  +- TableSourceScan(table=[[default_catalog, 
default_database, T3]], fields=[a3, b3], hints=[[[ALIAS options:[T3]]]])
+            +- Calc(select=[0 AS $f0])
+               +- HashJoin(joinType=[InnerJoin], where=[=(a1, a3)], 
select=[a1, a3], build=[left])
+                  :- Exchange(distribution=[hash[a1]])
+                  :  +- TableSourceScan(table=[[default_catalog, 
default_database, T1, project=[a1], metadata=[]]], fields=[a1], hints=[[[ALIAS 
options:[T1]]]])
+                  +- Exchange(distribution=[hash[a3]])
+                     +- TableSourceScan(table=[[default_catalog, 
default_database, T3, project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS 
options:[T3]]]])
 ]]>
     </Resource>
   </TestCase>
@@ -1163,8 +1164,8 @@ NestedLoopJoin(joinType=[LeftOuterJoin], where=[true], 
select=[a1, $f2], build=[
          :  +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS 
EXPR$0])
          :     +- Exchange(distribution=[single])
          :        +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-         :           +- Calc(select=[a1, b1], where=[=(a1, 1)])
-         :              +- TableSourceScan(table=[[default_catalog, 
default_database, T1, filter=[], project=[a1, b1], metadata=[]]], fields=[a1, 
b1], hints=[[[ALIAS options:[T1]]]])
+         :           +- Calc(select=[a1], where=[=(a1, 1)])
+         :              +- TableSourceScan(table=[[default_catalog, 
default_database, T1, filter=[], project=[a1], metadata=[]]], fields=[a1], 
hints=[[[ALIAS options:[T1]]]])
          +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS 
EXPR$0])
             +- Exchange(distribution=[single])
                +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
@@ -1173,7 +1174,7 @@ NestedLoopJoin(joinType=[LeftOuterJoin], where=[true], 
select=[a1, $f2], build=[
                         +- LocalHashAggregate(groupBy=[a3], select=[a3])
                            +- Union(all=[true], union=[a3])
                               :- Calc(select=[CAST(1 AS BIGINT) AS a3], 
where=[=(a1, 1)])
-                              :  +- TableSourceScan(table=[[default_catalog, 
default_database, T1, filter=[], project=[a1, b1], metadata=[]]], fields=[a1, 
b1], hints=[[[ALIAS options:[T1]]]])
+                              :  +- TableSourceScan(table=[[default_catalog, 
default_database, T1, filter=[], project=[a1], metadata=[]]], fields=[a1], 
hints=[[[ALIAS options:[T1]]]])
                               +- Calc(select=[CAST(1 AS BIGINT) AS a3], 
where=[=(a3, 1)])
                                  +- TableSourceScan(table=[[default_catalog, 
default_database, T3, filter=[], project=[a3], metadata=[]]], fields=[a3], 
hints=[[[ALIAS options:[T3]]]])
 ]]>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleHashJoinHintTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleHashJoinHintTest.xml
index c6f3c923360..2e2911591ea 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleHashJoinHintTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleHashJoinHintTest.xml
@@ -1108,11 +1108,12 @@ NestedLoopJoin(joinType=[LeftOuterJoin], where=[true], 
select=[a1, EXPR$0], buil
    +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0])
       +- Exchange(distribution=[single])
          +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-            +- HashJoin(joinType=[InnerJoin], where=[=(a1, a3)], select=[a1, 
b1, a3, b3], build=[left])
-               :- Exchange(distribution=[hash[a1]])
-               :  +- TableSourceScan(table=[[default_catalog, 
default_database, T1]], fields=[a1, b1], hints=[[[ALIAS options:[T1]]]])
-               +- Exchange(distribution=[hash[a3]])
-                  +- TableSourceScan(table=[[default_catalog, 
default_database, T3]], fields=[a3, b3], hints=[[[ALIAS options:[T3]]]])
+            +- Calc(select=[0 AS $f0])
+               +- HashJoin(joinType=[InnerJoin], where=[=(a1, a3)], 
select=[a1, a3], build=[left])
+                  :- Exchange(distribution=[hash[a1]])
+                  :  +- TableSourceScan(table=[[default_catalog, 
default_database, T1, project=[a1], metadata=[]]], fields=[a1], hints=[[[ALIAS 
options:[T1]]]])
+                  +- Exchange(distribution=[hash[a3]])
+                     +- TableSourceScan(table=[[default_catalog, 
default_database, T3, project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS 
options:[T3]]]])
 ]]>
     </Resource>
   </TestCase>
@@ -1187,8 +1188,8 @@ NestedLoopJoin(joinType=[LeftOuterJoin], where=[true], 
select=[a1, $f2], build=[
          :  +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS 
EXPR$0])
          :     +- Exchange(distribution=[single])
          :        +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-         :           +- Calc(select=[a1, b1], where=[=(a1, 1)])
-         :              +- TableSourceScan(table=[[default_catalog, 
default_database, T1, filter=[], project=[a1, b1], metadata=[]]], fields=[a1, 
b1], hints=[[[ALIAS options:[T1]]]])
+         :           +- Calc(select=[a1], where=[=(a1, 1)])
+         :              +- TableSourceScan(table=[[default_catalog, 
default_database, T1, filter=[], project=[a1], metadata=[]]], fields=[a1], 
hints=[[[ALIAS options:[T1]]]])
          +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS 
EXPR$0])
             +- Exchange(distribution=[single])
                +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
@@ -1197,7 +1198,7 @@ NestedLoopJoin(joinType=[LeftOuterJoin], where=[true], 
select=[a1, $f2], build=[
                         +- LocalHashAggregate(groupBy=[a3], select=[a3])
                            +- Union(all=[true], union=[a3])
                               :- Calc(select=[CAST(1 AS BIGINT) AS a3], 
where=[=(a1, 1)])
-                              :  +- TableSourceScan(table=[[default_catalog, 
default_database, T1, filter=[], project=[a1, b1], metadata=[]]], fields=[a1, 
b1], hints=[[[ALIAS options:[T1]]]])
+                              :  +- TableSourceScan(table=[[default_catalog, 
default_database, T1, filter=[], project=[a1], metadata=[]]], fields=[a1], 
hints=[[[ALIAS options:[T1]]]])
                               +- Calc(select=[CAST(1 AS BIGINT) AS a3], 
where=[=(a3, 1)])
                                  +- TableSourceScan(table=[[default_catalog, 
default_database, T3, filter=[], project=[a3], metadata=[]]], fields=[a3], 
hints=[[[ALIAS options:[T3]]]])
 ]]>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleMergeJoinHintTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleMergeJoinHintTest.xml
index 0c12624194c..f89c07d64f3 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleMergeJoinHintTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleMergeJoinHintTest.xml
@@ -1108,11 +1108,12 @@ NestedLoopJoin(joinType=[LeftOuterJoin], where=[true], 
select=[a1, EXPR$0], buil
    +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0])
       +- Exchange(distribution=[single])
          +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-            +- HashJoin(joinType=[InnerJoin], where=[=(a1, a3)], select=[a1, 
b1, a3, b3], build=[left])
-               :- Exchange(distribution=[hash[a1]])
-               :  +- TableSourceScan(table=[[default_catalog, 
default_database, T1]], fields=[a1, b1], hints=[[[ALIAS options:[T1]]]])
-               +- Exchange(distribution=[hash[a3]])
-                  +- TableSourceScan(table=[[default_catalog, 
default_database, T3]], fields=[a3, b3], hints=[[[ALIAS options:[T3]]]])
+            +- Calc(select=[0 AS $f0])
+               +- HashJoin(joinType=[InnerJoin], where=[=(a1, a3)], 
select=[a1, a3], build=[left])
+                  :- Exchange(distribution=[hash[a1]])
+                  :  +- TableSourceScan(table=[[default_catalog, 
default_database, T1, project=[a1], metadata=[]]], fields=[a1], hints=[[[ALIAS 
options:[T1]]]])
+                  +- Exchange(distribution=[hash[a3]])
+                     +- TableSourceScan(table=[[default_catalog, 
default_database, T3, project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS 
options:[T3]]]])
 ]]>
     </Resource>
   </TestCase>
@@ -1187,8 +1188,8 @@ NestedLoopJoin(joinType=[LeftOuterJoin], where=[true], 
select=[a1, $f2], build=[
          :  +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS 
EXPR$0])
          :     +- Exchange(distribution=[single])
          :        +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-         :           +- Calc(select=[a1, b1], where=[=(a1, 1)])
-         :              +- TableSourceScan(table=[[default_catalog, 
default_database, T1, filter=[], project=[a1, b1], metadata=[]]], fields=[a1, 
b1], hints=[[[ALIAS options:[T1]]]])
+         :           +- Calc(select=[a1], where=[=(a1, 1)])
+         :              +- TableSourceScan(table=[[default_catalog, 
default_database, T1, filter=[], project=[a1], metadata=[]]], fields=[a1], 
hints=[[[ALIAS options:[T1]]]])
          +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS 
EXPR$0])
             +- Exchange(distribution=[single])
                +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
@@ -1197,7 +1198,7 @@ NestedLoopJoin(joinType=[LeftOuterJoin], where=[true], 
select=[a1, $f2], build=[
                         +- LocalHashAggregate(groupBy=[a3], select=[a3])
                            +- Union(all=[true], union=[a3])
                               :- Calc(select=[CAST(1 AS BIGINT) AS a3], 
where=[=(a1, 1)])
-                              :  +- TableSourceScan(table=[[default_catalog, 
default_database, T1, filter=[], project=[a1, b1], metadata=[]]], fields=[a1, 
b1], hints=[[[ALIAS options:[T1]]]])
+                              :  +- TableSourceScan(table=[[default_catalog, 
default_database, T1, filter=[], project=[a1], metadata=[]]], fields=[a1], 
hints=[[[ALIAS options:[T1]]]])
                               +- Calc(select=[CAST(1 AS BIGINT) AS a3], 
where=[=(a3, 1)])
                                  +- TableSourceScan(table=[[default_catalog, 
default_database, T3, filter=[], project=[a3], metadata=[]]], fields=[a3], 
hints=[[[ALIAS options:[T3]]]])
 ]]>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ShuffleModePlanOptimizeTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ShuffleModePlanOptimizeTest.xml
index 9439c7bd25a..8f7d724122e 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ShuffleModePlanOptimizeTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ShuffleModePlanOptimizeTest.xml
@@ -49,39 +49,43 @@ MultipleInput(readOrder=[0,0,1], 
members=[\nNestedLoopJoin(joinType=[InnerJoin],
 :  +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c3])
 :     +- Exchange(distribution=[single])
 :        +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-:           +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[id, name, amount, price, fact_date_sk, id0, male, 
amount0, price0, dim_date_sk], build=[right])
-:              :- Exchange(distribution=[hash[fact_date_sk]])
-:              :  +- MultipleInput(members=[\nOrderEnforcer\n:- [#1] 
Exchange(distribution=[broadcast], shuffle_mode=[BATCH])\n+- [#2] 
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, 
fact_part]], fields=[id, name, amount, price, fact_date_sk])\n])
-:              :     :- Exchange(distribution=[broadcast], 
shuffle_mode=[BATCH])
-:              :     :  +- DynamicFilteringDataCollector(fields=[dim_date_sk])
-:              :     :     +- Calc(select=[id, male, amount, price, 
dim_date_sk], where=[(price < 400)])(reuse_id=[1])
-:              :     :        +- TableSourceScan(table=[[testCatalog, 
test_database, dim]], fields=[id, male, amount, price, 
dim_date_sk])(reuse_id=[2])
-:              :     +- DynamicFilteringTableSourceScan(table=[[testCatalog, 
test_database, fact_part]], fields=[id, name, amount, price, fact_date_sk])
-:              +- Exchange(distribution=[hash[dim_date_sk]])
-:                 +- Reused(reference_id=[1])
+:           +- Calc(select=[0 AS $f0])
+:              +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[right])
+:                 :- Exchange(distribution=[hash[fact_date_sk]])
+:                 :  +- MultipleInput(members=[\nOrderEnforcer\n:- [#1] 
Exchange(distribution=[broadcast], shuffle_mode=[BATCH])\n+- [#2] 
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, 
project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk])\n])
+:                 :     :- Exchange(distribution=[broadcast], 
shuffle_mode=[BATCH])
+:                 :     :  +- 
DynamicFilteringDataCollector(fields=[dim_date_sk])
+:                 :     :     +- Calc(select=[dim_date_sk], where=[(price < 
400)])(reuse_id=[1])
+:                 :     :        +- TableSourceScan(table=[[testCatalog, 
test_database, dim, project=[price, dim_date_sk], metadata=[]]], fields=[price, 
dim_date_sk])(reuse_id=[2])
+:                 :     +- 
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, 
project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk])
+:                 +- Exchange(distribution=[hash[dim_date_sk]])
+:                    +- Reused(reference_id=[1])
 :- Exchange(distribution=[broadcast])
 :  +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c1])
 :     +- Exchange(distribution=[single])
 :        +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-:           +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[id, name, amount, price, fact_date_sk, id0, male, 
amount0, price0, dim_date_sk], build=[left])
-:              :- Exchange(distribution=[hash[fact_date_sk]])
-:              :  +- Calc(select=[id, name, amount, price, fact_date_sk], 
where=[(price < 100)])
-:              :     +- TableSourceScan(table=[[testCatalog, test_database, 
fact_part, filter=[]]], fields=[id, name, amount, price, fact_date_sk])
-:              +- Exchange(distribution=[hash[dim_date_sk]])
-:                 +- Reused(reference_id=[2])
+:           +- Calc(select=[0 AS $f0])
+:              +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left])
+:                 :- Exchange(distribution=[hash[fact_date_sk]])
+:                 :  +- Calc(select=[fact_date_sk], where=[(price < 100)])
+:                 :     +- TableSourceScan(table=[[testCatalog, test_database, 
fact_part, filter=[], project=[price, fact_date_sk], metadata=[]]], 
fields=[price, fact_date_sk])
+:                 +- Exchange(distribution=[hash[dim_date_sk]])
+:                    +- Calc(select=[dim_date_sk])
+:                       +- Reused(reference_id=[2])
 +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c2])
    +- Exchange(distribution=[single])
       +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-         +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[id, name, amount, price, fact_date_sk, id0, male, 
amount0, price0, dim_date_sk], build=[right])
-            :- Exchange(distribution=[hash[fact_date_sk]])
-            :  +- MultipleInput(members=[\nOrderEnforcer\n:- [#1] 
Exchange(distribution=[broadcast], shuffle_mode=[BATCH])\n+- [#2] 
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, 
fact_part]], fields=[id, name, amount, price, fact_date_sk])\n])
-            :     :- Exchange(distribution=[broadcast], shuffle_mode=[BATCH])
-            :     :  +- DynamicFilteringDataCollector(fields=[dim_date_sk])
-            :     :     +- Calc(select=[id, male, amount, price, dim_date_sk], 
where=[(price < 200)])(reuse_id=[3])
-            :     :        +- Reused(reference_id=[2])
-            :     +- DynamicFilteringTableSourceScan(table=[[testCatalog, 
test_database, fact_part]], fields=[id, name, amount, price, fact_date_sk])
-            +- Exchange(distribution=[hash[dim_date_sk]])
-               +- Reused(reference_id=[3])
+         +- Calc(select=[0 AS $f0])
+            +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[right])
+               :- Exchange(distribution=[hash[fact_date_sk]])
+               :  +- MultipleInput(members=[\nOrderEnforcer\n:- [#1] 
Exchange(distribution=[broadcast], shuffle_mode=[BATCH])\n+- [#2] 
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, 
project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk])\n])
+               :     :- Exchange(distribution=[broadcast], 
shuffle_mode=[BATCH])
+               :     :  +- DynamicFilteringDataCollector(fields=[dim_date_sk])
+               :     :     +- Calc(select=[dim_date_sk], where=[(price < 
200)])(reuse_id=[3])
+               :     :        +- Reused(reference_id=[2])
+               :     +- DynamicFilteringTableSourceScan(table=[[testCatalog, 
test_database, fact_part, project=[fact_date_sk], metadata=[]]], 
fields=[fact_date_sk])
+               +- Exchange(distribution=[hash[dim_date_sk]])
+                  +- Reused(reference_id=[3])
 ]]>
     </Resource>
   </TestCase>
@@ -118,39 +122,44 @@ MultipleInput(readOrder=[0,0,1], 
members=[\nNestedLoopJoin(joinType=[InnerJoin],
 :  +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c3])
 :     +- Exchange(distribution=[single])
 :        +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-:           +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[id, name, amount, price, fact_date_sk, id0, male, 
amount0, price0, dim_date_sk], build=[right])
-:              :- Exchange(distribution=[hash[fact_date_sk]])
-:              :  +- MultipleInput(members=[\nOrderEnforcer\n:- [#1] 
Exchange(distribution=[broadcast], shuffle_mode=[BATCH])\n+- [#2] 
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, 
fact_part]], fields=[id, name, amount, price, fact_date_sk])\n])
-:              :     :- Exchange(distribution=[broadcast], 
shuffle_mode=[BATCH])
-:              :     :  +- DynamicFilteringDataCollector(fields=[dim_date_sk])
-:              :     :     +- Calc(select=[id, male, amount, price, 
dim_date_sk], where=[(price < 400)])(reuse_id=[1])
-:              :     :        +- TableSourceScan(table=[[testCatalog, 
test_database, dim]], fields=[id, male, amount, price, 
dim_date_sk])(reuse_id=[2])
-:              :     +- DynamicFilteringTableSourceScan(table=[[testCatalog, 
test_database, fact_part]], fields=[id, name, amount, price, fact_date_sk])
-:              +- Exchange(distribution=[hash[dim_date_sk]], 
shuffle_mode=[BATCH])
-:                 +- Reused(reference_id=[1])
+:           +- Calc(select=[0 AS $f0])
+:              +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[right])
+:                 :- Exchange(distribution=[hash[fact_date_sk]])
+:                 :  +- MultipleInput(members=[\nOrderEnforcer\n:- [#1] 
Exchange(distribution=[broadcast], shuffle_mode=[BATCH])\n+- [#2] 
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, 
project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk])\n])
+:                 :     :- Exchange(distribution=[broadcast], 
shuffle_mode=[BATCH])
+:                 :     :  +- 
DynamicFilteringDataCollector(fields=[dim_date_sk])
+:                 :     :     +- Calc(select=[dim_date_sk], where=[(price < 
400)])(reuse_id=[1])
+:                 :     :        +- TableSourceScan(table=[[testCatalog, 
test_database, dim, project=[price, dim_date_sk], metadata=[]]], fields=[price, 
dim_date_sk])(reuse_id=[2])
+:                 :     +- 
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, 
project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk])
+:                 +- Exchange(distribution=[hash[dim_date_sk]], 
shuffle_mode=[BATCH])
+:                    +- Reused(reference_id=[1])
 :- Exchange(distribution=[broadcast])
 :  +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c1])
 :     +- Exchange(distribution=[single])
 :        +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-:           +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[id, name, amount, price, fact_date_sk, id0, male, 
amount0, price0, dim_date_sk], build=[left])
-:              :- Exchange(distribution=[hash[fact_date_sk]])
-:              :  +- Calc(select=[id, name, amount, price, fact_date_sk], 
where=[(price < 100)])
-:              :     +- TableSourceScan(table=[[testCatalog, test_database, 
fact_part, filter=[]]], fields=[id, name, amount, price, fact_date_sk])
-:              +- Exchange(distribution=[hash[dim_date_sk]], 
shuffle_mode=[BATCH])
-:                 +- Reused(reference_id=[2])
+:           +- Calc(select=[0 AS $f0])
+:              +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left])
+:                 :- Exchange(distribution=[hash[fact_date_sk]])
+:                 :  +- Calc(select=[fact_date_sk], where=[(price < 100)])
+:                 :     +- TableSourceScan(table=[[testCatalog, test_database, 
fact_part, filter=[], project=[price, fact_date_sk], metadata=[]]], 
fields=[price, fact_date_sk])
+:                 +- Exchange(distribution=[hash[dim_date_sk]])
+:                    +- Calc(select=[dim_date_sk])
+:                       +- Exchange(distribution=[any], shuffle_mode=[BATCH])
+:                          +- Reused(reference_id=[2])
 +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c2])
    +- Exchange(distribution=[single])
       +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-         +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[id, name, amount, price, fact_date_sk, id0, male, 
amount0, price0, dim_date_sk], build=[right])
-            :- Exchange(distribution=[hash[fact_date_sk]])
-            :  +- MultipleInput(members=[\nOrderEnforcer\n:- [#1] 
Exchange(distribution=[broadcast], shuffle_mode=[BATCH])\n+- [#2] 
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, 
fact_part]], fields=[id, name, amount, price, fact_date_sk])\n])
-            :     :- Exchange(distribution=[broadcast], shuffle_mode=[BATCH])
-            :     :  +- DynamicFilteringDataCollector(fields=[dim_date_sk])
-            :     :     +- Calc(select=[id, male, amount, price, dim_date_sk], 
where=[(price < 200)])(reuse_id=[3])
-            :     :        +- Reused(reference_id=[2])
-            :     +- DynamicFilteringTableSourceScan(table=[[testCatalog, 
test_database, fact_part]], fields=[id, name, amount, price, fact_date_sk])
-            +- Exchange(distribution=[hash[dim_date_sk]], shuffle_mode=[BATCH])
-               +- Reused(reference_id=[3])
+         +- Calc(select=[0 AS $f0])
+            +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[right])
+               :- Exchange(distribution=[hash[fact_date_sk]])
+               :  +- MultipleInput(members=[\nOrderEnforcer\n:- [#1] 
Exchange(distribution=[broadcast], shuffle_mode=[BATCH])\n+- [#2] 
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, 
project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk])\n])
+               :     :- Exchange(distribution=[broadcast], 
shuffle_mode=[BATCH])
+               :     :  +- DynamicFilteringDataCollector(fields=[dim_date_sk])
+               :     :     +- Calc(select=[dim_date_sk], where=[(price < 
200)])(reuse_id=[3])
+               :     :        +- Reused(reference_id=[2])
+               :     +- DynamicFilteringTableSourceScan(table=[[testCatalog, 
test_database, fact_part, project=[fact_date_sk], metadata=[]]], 
fields=[fact_date_sk])
+               +- Exchange(distribution=[hash[dim_date_sk]], 
shuffle_mode=[BATCH])
+                  +- Reused(reference_id=[3])
 ]]>
     </Resource>
   </TestCase>
@@ -187,39 +196,44 @@ MultipleInput(readOrder=[0,0,1], 
members=[\nNestedLoopJoin(joinType=[InnerJoin],
 :  +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c3])
 :     +- Exchange(distribution=[single])
 :        +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-:           +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[id, name, amount, price, fact_date_sk, id0, male, 
amount0, price0, dim_date_sk], build=[right])
-:              :- Exchange(distribution=[hash[fact_date_sk]])
-:              :  +- MultipleInput(members=[\nOrderEnforcer\n:- [#1] 
Exchange(distribution=[broadcast], shuffle_mode=[BATCH])\n+- [#2] 
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, 
fact_part]], fields=[id, name, amount, price, fact_date_sk])\n])
-:              :     :- Exchange(distribution=[broadcast], 
shuffle_mode=[BATCH])
-:              :     :  +- DynamicFilteringDataCollector(fields=[dim_date_sk])
-:              :     :     +- Calc(select=[id, male, amount, price, 
dim_date_sk], where=[(price < 400)])(reuse_id=[1])
-:              :     :        +- TableSourceScan(table=[[testCatalog, 
test_database, dim]], fields=[id, male, amount, price, 
dim_date_sk])(reuse_id=[2])
-:              :     +- DynamicFilteringTableSourceScan(table=[[testCatalog, 
test_database, fact_part]], fields=[id, name, amount, price, fact_date_sk])
-:              +- Exchange(distribution=[hash[dim_date_sk]], 
shuffle_mode=[BATCH])
-:                 +- Reused(reference_id=[1])
+:           +- Calc(select=[0 AS $f0])
+:              +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[right])
+:                 :- Exchange(distribution=[hash[fact_date_sk]])
+:                 :  +- MultipleInput(members=[\nOrderEnforcer\n:- [#1] 
Exchange(distribution=[broadcast], shuffle_mode=[BATCH])\n+- [#2] 
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, 
project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk])\n])
+:                 :     :- Exchange(distribution=[broadcast], 
shuffle_mode=[BATCH])
+:                 :     :  +- 
DynamicFilteringDataCollector(fields=[dim_date_sk])
+:                 :     :     +- Calc(select=[dim_date_sk], where=[(price < 
400)])(reuse_id=[1])
+:                 :     :        +- TableSourceScan(table=[[testCatalog, 
test_database, dim, project=[price, dim_date_sk], metadata=[]]], fields=[price, 
dim_date_sk])(reuse_id=[2])
+:                 :     +- 
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, 
project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk])
+:                 +- Exchange(distribution=[hash[dim_date_sk]], 
shuffle_mode=[BATCH])
+:                    +- Reused(reference_id=[1])
 :- Exchange(distribution=[broadcast])
 :  +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c1])
 :     +- Exchange(distribution=[single])
 :        +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-:           +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[id, name, amount, price, fact_date_sk, id0, male, 
amount0, price0, dim_date_sk], build=[left])
-:              :- Exchange(distribution=[hash[fact_date_sk]])
-:              :  +- Calc(select=[id, name, amount, price, fact_date_sk], 
where=[(price < 100)])
-:              :     +- TableSourceScan(table=[[testCatalog, test_database, 
fact_part, filter=[]]], fields=[id, name, amount, price, fact_date_sk])
-:              +- Exchange(distribution=[hash[dim_date_sk]], 
shuffle_mode=[BATCH])
-:                 +- Reused(reference_id=[2])
+:           +- Calc(select=[0 AS $f0])
+:              +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left])
+:                 :- Exchange(distribution=[hash[fact_date_sk]])
+:                 :  +- Calc(select=[fact_date_sk], where=[(price < 100)])
+:                 :     +- TableSourceScan(table=[[testCatalog, test_database, 
fact_part, filter=[], project=[price, fact_date_sk], metadata=[]]], 
fields=[price, fact_date_sk])
+:                 +- Exchange(distribution=[hash[dim_date_sk]])
+:                    +- Calc(select=[dim_date_sk])
+:                       +- Exchange(distribution=[any], shuffle_mode=[BATCH])
+:                          +- Reused(reference_id=[2])
 +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c2])
    +- Exchange(distribution=[single])
       +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-         +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[id, name, amount, price, fact_date_sk, id0, male, 
amount0, price0, dim_date_sk], build=[right])
-            :- Exchange(distribution=[hash[fact_date_sk]])
-            :  +- MultipleInput(members=[\nOrderEnforcer\n:- [#1] 
Exchange(distribution=[broadcast], shuffle_mode=[BATCH])\n+- [#2] 
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, 
fact_part]], fields=[id, name, amount, price, fact_date_sk])\n])
-            :     :- Exchange(distribution=[broadcast], shuffle_mode=[BATCH])
-            :     :  +- DynamicFilteringDataCollector(fields=[dim_date_sk])
-            :     :     +- Calc(select=[id, male, amount, price, dim_date_sk], 
where=[(price < 200)])(reuse_id=[3])
-            :     :        +- Reused(reference_id=[2])
-            :     +- DynamicFilteringTableSourceScan(table=[[testCatalog, 
test_database, fact_part]], fields=[id, name, amount, price, fact_date_sk])
-            +- Exchange(distribution=[hash[dim_date_sk]], shuffle_mode=[BATCH])
-               +- Reused(reference_id=[3])
+         +- Calc(select=[0 AS $f0])
+            +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[right])
+               :- Exchange(distribution=[hash[fact_date_sk]])
+               :  +- MultipleInput(members=[\nOrderEnforcer\n:- [#1] 
Exchange(distribution=[broadcast], shuffle_mode=[BATCH])\n+- [#2] 
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, 
project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk])\n])
+               :     :- Exchange(distribution=[broadcast], 
shuffle_mode=[BATCH])
+               :     :  +- DynamicFilteringDataCollector(fields=[dim_date_sk])
+               :     :     +- Calc(select=[dim_date_sk], where=[(price < 
200)])(reuse_id=[3])
+               :     :        +- Reused(reference_id=[2])
+               :     +- DynamicFilteringTableSourceScan(table=[[testCatalog, 
test_database, fact_part, project=[fact_date_sk], metadata=[]]], 
fields=[fact_date_sk])
+               +- Exchange(distribution=[hash[dim_date_sk]], 
shuffle_mode=[BATCH])
+                  +- Reused(reference_id=[3])
 ]]>
     </Resource>
   </TestCase>
@@ -256,29 +270,32 @@ MultipleInput(readOrder=[0,0,1], 
members=[\nNestedLoopJoin(joinType=[InnerJoin],
 :  +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c3])
 :     +- Exchange(distribution=[single])
 :        +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-:           +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[id, name, amount, price, fact_date_sk, id0, male, 
amount0, price0, dim_date_sk], build=[left])
-:              :- Exchange(distribution=[hash[fact_date_sk]])
-:              :  +- Calc(select=[id, name, amount, price, fact_date_sk], 
where=[(price < 400)])
-:              :     +- TableSourceScan(table=[[testCatalog, test_database, 
fact_part, filter=[]]], fields=[id, name, amount, price, 
fact_date_sk])(reuse_id=[1])
-:              +- Exchange(distribution=[hash[dim_date_sk]])(reuse_id=[2])
-:                 +- TableSourceScan(table=[[testCatalog, test_database, 
dim]], fields=[id, male, amount, price, dim_date_sk])
+:           +- Calc(select=[0 AS $f0])
+:              +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left])
+:                 :- Exchange(distribution=[hash[fact_date_sk]])
+:                 :  +- Calc(select=[fact_date_sk], where=[(price < 400)])
+:                 :     +- TableSourceScan(table=[[testCatalog, test_database, 
fact_part, filter=[], project=[price, fact_date_sk], metadata=[]]], 
fields=[price, fact_date_sk])(reuse_id=[1])
+:                 +- Exchange(distribution=[hash[dim_date_sk]])(reuse_id=[2])
+:                    +- TableSourceScan(table=[[testCatalog, test_database, 
dim, project=[dim_date_sk], metadata=[]]], fields=[dim_date_sk])
 :- Exchange(distribution=[broadcast])
 :  +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c1])
 :     +- Exchange(distribution=[single])
 :        +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-:           +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[id, name, amount, price, fact_date_sk, id0, male, 
amount0, price0, dim_date_sk], build=[left])
-:              :- Exchange(distribution=[hash[fact_date_sk]])
-:              :  +- Calc(select=[id, name, amount, price, fact_date_sk], 
where=[(price < 100)])
-:              :     +- Reused(reference_id=[1])
-:              +- Reused(reference_id=[2])
+:           +- Calc(select=[0 AS $f0])
+:              +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left])
+:                 :- Exchange(distribution=[hash[fact_date_sk]])
+:                 :  +- Calc(select=[fact_date_sk], where=[(price < 100)])
+:                 :     +- Reused(reference_id=[1])
+:                 +- Reused(reference_id=[2])
 +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c2])
    +- Exchange(distribution=[single])
       +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-         +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[id, name, amount, price, fact_date_sk, id0, male, 
amount0, price0, dim_date_sk], build=[left])
-            :- Exchange(distribution=[hash[fact_date_sk]])
-            :  +- Calc(select=[id, name, amount, price, fact_date_sk], 
where=[(price < 200)])
-            :     +- Reused(reference_id=[1])
-            +- Reused(reference_id=[2])
+         +- Calc(select=[0 AS $f0])
+            +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left])
+               :- Exchange(distribution=[hash[fact_date_sk]])
+               :  +- Calc(select=[fact_date_sk], where=[(price < 200)])
+               :     +- Reused(reference_id=[1])
+               +- Reused(reference_id=[2])
 ]]>
     </Resource>
   </TestCase>
@@ -315,29 +332,32 @@ MultipleInput(readOrder=[0,0,1], 
members=[\nNestedLoopJoin(joinType=[InnerJoin],
 :  +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c3])
 :     +- Exchange(distribution=[single])
 :        +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-:           +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[id, name, amount, price, fact_date_sk, id0, male, 
amount0, price0, dim_date_sk], build=[left])
-:              :- Exchange(distribution=[hash[fact_date_sk]])
-:              :  +- Calc(select=[id, name, amount, price, fact_date_sk], 
where=[(price < 400)])
-:              :     +- TableSourceScan(table=[[testCatalog, test_database, 
fact_part, filter=[]]], fields=[id, name, amount, price, 
fact_date_sk])(reuse_id=[1])
-:              +- Exchange(distribution=[hash[dim_date_sk]])(reuse_id=[2])
-:                 +- TableSourceScan(table=[[testCatalog, test_database, 
dim]], fields=[id, male, amount, price, dim_date_sk])
+:           +- Calc(select=[0 AS $f0])
+:              +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left])
+:                 :- Exchange(distribution=[hash[fact_date_sk]])
+:                 :  +- Calc(select=[fact_date_sk], where=[(price < 400)])
+:                 :     +- TableSourceScan(table=[[testCatalog, test_database, 
fact_part, filter=[], project=[price, fact_date_sk], metadata=[]]], 
fields=[price, fact_date_sk])(reuse_id=[1])
+:                 +- Exchange(distribution=[hash[dim_date_sk]])(reuse_id=[2])
+:                    +- TableSourceScan(table=[[testCatalog, test_database, 
dim, project=[dim_date_sk], metadata=[]]], fields=[dim_date_sk])
 :- Exchange(distribution=[broadcast])
 :  +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c1])
 :     +- Exchange(distribution=[single])
 :        +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-:           +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[id, name, amount, price, fact_date_sk, id0, male, 
amount0, price0, dim_date_sk], build=[left])
-:              :- Exchange(distribution=[hash[fact_date_sk]])
-:              :  +- Calc(select=[id, name, amount, price, fact_date_sk], 
where=[(price < 100)])
-:              :     +- Reused(reference_id=[1])
-:              +- Reused(reference_id=[2])
+:           +- Calc(select=[0 AS $f0])
+:              +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left])
+:                 :- Exchange(distribution=[hash[fact_date_sk]])
+:                 :  +- Calc(select=[fact_date_sk], where=[(price < 100)])
+:                 :     +- Reused(reference_id=[1])
+:                 +- Reused(reference_id=[2])
 +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c2])
    +- Exchange(distribution=[single])
       +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-         +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[id, name, amount, price, fact_date_sk, id0, male, 
amount0, price0, dim_date_sk], build=[left])
-            :- Exchange(distribution=[hash[fact_date_sk]])
-            :  +- Calc(select=[id, name, amount, price, fact_date_sk], 
where=[(price < 200)])
-            :     +- Reused(reference_id=[1])
-            +- Reused(reference_id=[2])
+         +- Calc(select=[0 AS $f0])
+            +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left])
+               :- Exchange(distribution=[hash[fact_date_sk]])
+               :  +- Calc(select=[fact_date_sk], where=[(price < 200)])
+               :     +- Reused(reference_id=[1])
+               +- Reused(reference_id=[2])
 ]]>
     </Resource>
   </TestCase>
@@ -374,29 +394,32 @@ MultipleInput(readOrder=[0,0,1], 
members=[\nNestedLoopJoin(joinType=[InnerJoin],
 :  +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c3])
 :     +- Exchange(distribution=[single])
 :        +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-:           +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[id, name, amount, price, fact_date_sk, id0, male, 
amount0, price0, dim_date_sk], build=[left])
-:              :- Exchange(distribution=[hash[fact_date_sk]])
-:              :  +- Calc(select=[id, name, amount, price, fact_date_sk], 
where=[(price < 400)])
-:              :     +- TableSourceScan(table=[[testCatalog, test_database, 
fact_part, filter=[]]], fields=[id, name, amount, price, 
fact_date_sk])(reuse_id=[1])
-:              +- Exchange(distribution=[hash[dim_date_sk]])(reuse_id=[2])
-:                 +- TableSourceScan(table=[[testCatalog, test_database, 
dim]], fields=[id, male, amount, price, dim_date_sk])
+:           +- Calc(select=[0 AS $f0])
+:              +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left])
+:                 :- Exchange(distribution=[hash[fact_date_sk]])
+:                 :  +- Calc(select=[fact_date_sk], where=[(price < 400)])
+:                 :     +- TableSourceScan(table=[[testCatalog, test_database, 
fact_part, filter=[], project=[price, fact_date_sk], metadata=[]]], 
fields=[price, fact_date_sk])(reuse_id=[1])
+:                 +- Exchange(distribution=[hash[dim_date_sk]])(reuse_id=[2])
+:                    +- TableSourceScan(table=[[testCatalog, test_database, 
dim, project=[dim_date_sk], metadata=[]]], fields=[dim_date_sk])
 :- Exchange(distribution=[broadcast])
 :  +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c1])
 :     +- Exchange(distribution=[single])
 :        +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-:           +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[id, name, amount, price, fact_date_sk, id0, male, 
amount0, price0, dim_date_sk], build=[left])
-:              :- Exchange(distribution=[hash[fact_date_sk]])
-:              :  +- Calc(select=[id, name, amount, price, fact_date_sk], 
where=[(price < 100)])
-:              :     +- Reused(reference_id=[1])
-:              +- Reused(reference_id=[2])
+:           +- Calc(select=[0 AS $f0])
+:              +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left])
+:                 :- Exchange(distribution=[hash[fact_date_sk]])
+:                 :  +- Calc(select=[fact_date_sk], where=[(price < 100)])
+:                 :     +- Reused(reference_id=[1])
+:                 +- Reused(reference_id=[2])
 +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c2])
    +- Exchange(distribution=[single])
       +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
-         +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[id, name, amount, price, fact_date_sk, id0, male, 
amount0, price0, dim_date_sk], build=[left])
-            :- Exchange(distribution=[hash[fact_date_sk]])
-            :  +- Calc(select=[id, name, amount, price, fact_date_sk], 
where=[(price < 200)])
-            :     +- Reused(reference_id=[1])
-            +- Reused(reference_id=[2])
+         +- Calc(select=[0 AS $f0])
+            +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left])
+               :- Exchange(distribution=[hash[fact_date_sk]])
+               :  +- Calc(select=[fact_date_sk], where=[(price < 200)])
+               :     +- Reused(reference_id=[1])
+               +- Reused(reference_id=[2])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml
index 55fa4f14e80..7852555acf8 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml
@@ -167,7 +167,7 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
       <![CDATA[
 HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0])
 +- Exchange(distribution=[single])
-   +- TableSourceScan(table=[[default_catalog, default_database, inventory, 
aggregates=[grouping=[], aggFunctions=[Count1AggFunction()]]]], 
fields=[count1$0])
+   +- TableSourceScan(table=[[default_catalog, default_database, inventory, 
project=[id], metadata=[], aggregates=[grouping=[], 
aggFunctions=[Count1AggFunction()]]]], fields=[count1$0])
 ]]>
     </Resource>
   </TestCase>
@@ -185,7 +185,7 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
       <![CDATA[
 HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0])
 +- Exchange(distribution=[single])
-   +- TableSourceScan(table=[[default_catalog, default_database, inventory, 
aggregates=[grouping=[], aggFunctions=[Count1AggFunction()]]]], 
fields=[count1$0])
+   +- TableSourceScan(table=[[default_catalog, default_database, inventory, 
project=[id], metadata=[], aggregates=[grouping=[], 
aggFunctions=[Count1AggFunction()]]]], fields=[count1$0])
 ]]>
     </Resource>
   </TestCase>
@@ -203,7 +203,7 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
       <![CDATA[
 HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS EXPR$0])
 +- Exchange(distribution=[single])
-   +- TableSourceScan(table=[[default_catalog, default_database, inventory, 
aggregates=[grouping=[], aggFunctions=[Count1AggFunction()]]]], 
fields=[count1$0])
+   +- TableSourceScan(table=[[default_catalog, default_database, inventory, 
project=[id], metadata=[], aggregates=[grouping=[], 
aggFunctions=[Count1AggFunction()]]]], fields=[count1$0])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
index 808998628b1..adf7dd0d612 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
@@ -31,8 +31,8 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
       <![CDATA[
 GroupAggregate(select=[COUNT_RETRACT(*) AS EXPR$0], changelogMode=[I,UA,D])
 +- Exchange(distribution=[single], changelogMode=[I,UB,UA])
-   +- Calc(select=[ts, a, b], where=[>(a, 1)], changelogMode=[I,UB,UA])
-      +- TableSourceScan(table=[[default_catalog, default_database, src, 
filter=[]]], fields=[ts, a, b], changelogMode=[I,UB,UA])
+   +- Calc(select=[a], where=[>(a, 1)], changelogMode=[I,UB,UA])
+      +- TableSourceScan(table=[[default_catalog, default_database, src, 
filter=[], project=[a], metadata=[]]], fields=[a], changelogMode=[I,UB,UA])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
index 8b53de0c16c..2fad4c62450 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
@@ -141,7 +141,7 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
       <![CDATA[
 GroupAggregate(select=[COUNT(*) AS EXPR$0])
 +- Exchange(distribution=[single])
-   +- TableSourceScan(table=[[default_catalog, default_database, T]], 
fields=[id, name])
+   +- TableSourceScan(table=[[default_catalog, default_database, T, 
project=[id], metadata=[]]], fields=[id])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
index 5ee567631bd..12627a088e2 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
@@ -263,7 +263,29 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
       <![CDATA[
 GroupAggregate(select=[COUNT(*) AS EXPR$0])
 +- Exchange(distribution=[single])
-   +- TableSourceScan(table=[[default_catalog, default_database, src]], 
fields=[id, cnt])
+   +- Calc(select=[0 AS $f0])
+      +- TableSourceScan(table=[[default_catalog, default_database, src, 
project=[id], metadata=[]]], fields=[id])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCountStartWithHaving">
+    <Resource name="sql">
+      <![CDATA[SELECT COUNT(*) FROM src HAVING COUNT(*) > 1]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalFilter(condition=[>($0, 1)])
++- LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+   +- LogicalTableScan(table=[[default_catalog, default_database, src]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[EXPR$0], where=[(EXPR$0 > 1)])
++- GroupAggregate(select=[COUNT(*) AS EXPR$0])
+   +- Exchange(distribution=[single])
+      +- Calc(select=[0 AS $f0])
+         +- TableSourceScan(table=[[default_catalog, default_database, src, 
project=[id], metadata=[]]], fields=[id])
 ]]>
     </Resource>
   </TestCase>
@@ -282,7 +304,8 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
       <![CDATA[
 GroupAggregate(select=[COUNT(*) AS EXPR$0])
 +- Exchange(distribution=[single])
-   +- TableSourceScan(table=[[default_catalog, default_database, src, 
metadata=[sys_col]]], fields=[id, cnt, sys_col])
+   +- Calc(select=[0 AS $f0])
+      +- TableSourceScan(table=[[default_catalog, default_database, src, 
project=[id], metadata=[]]], fields=[id])
 ]]>
     </Resource>
   </TestCase>
@@ -301,7 +324,8 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
       <![CDATA[
 GroupAggregate(select=[COUNT(*) AS EXPR$0])
 +- Exchange(distribution=[single])
-   +- TableSourceScan(table=[[default_catalog, default_database, src, 
metadata=[cnt, id, sys_col]]], fields=[cnt, id, sys_col])
+   +- Calc(select=[0 AS $f0])
+      +- TableSourceScan(table=[[default_catalog, default_database, src, 
project=[], metadata=[cnt]]], fields=[cnt])
 ]]>
     </Resource>
   </TestCase>
@@ -320,7 +344,8 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
       <![CDATA[
 GroupAggregate(select=[COUNT(*) AS EXPR$0])
 +- Exchange(distribution=[single])
-   +- TableSourceScan(table=[[default_catalog, default_database, src, 
metadata=[sys_col]]], fields=[nested, id, cnt, sys_col])
+   +- Calc(select=[0 AS $f0])
+      +- TableSourceScan(table=[[default_catalog, default_database, src, 
project=[nested], metadata=[]]], fields=[nested])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml
index cc42ab3cc40..405b7763a5d 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml
@@ -692,7 +692,8 @@ Calc(select=[b])
    :        :     +- Exchange(distribution=[single])
    :        :        +- GroupAggregate(select=[COUNT(*) AS c])
    :        :           +- Exchange(distribution=[single])
-   :        :              +- TableSourceScan(table=[[default_catalog, 
default_database, t]], fields=[i, j, k])(reuse_id=[1])
+   :        :              +- Calc(select=[0 AS $f0])
+   :        :                 +- TableSourceScan(table=[[default_catalog, 
default_database, t]], fields=[i, j, k])(reuse_id=[1])
    :        +- Exchange(distribution=[single])
    :           +- Calc(select=[true AS i])
    :              +- GroupAggregate(groupBy=[EXPR$0], select=[EXPR$0])
@@ -1234,7 +1235,8 @@ Calc(select=[b])
    :        :     +- Exchange(distribution=[single])
    :        :        +- GroupAggregate(select=[COUNT(*) AS c])
    :        :           +- Exchange(distribution=[single])
-   :        :              +- TableSourceScan(table=[[default_catalog, 
default_database, t]], fields=[i, j, k])(reuse_id=[1])
+   :        :              +- Calc(select=[0 AS $f0])
+   :        :                 +- TableSourceScan(table=[[default_catalog, 
default_database, t]], fields=[i, j, k])(reuse_id=[1])
    :        +- Exchange(distribution=[single])
    :           +- Calc(select=[true AS i])
    :              +- GroupAggregate(groupBy=[EXPR$0], select=[EXPR$0])
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateTestBase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateTestBase.scala
index d34a7c36b84..9f8d430a618 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateTestBase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateTestBase.scala
@@ -62,6 +62,17 @@ abstract class AggregateTestBase extends TableTestBase {
   )
   util.addTableSource[(Int, Long, String)]("MyTable1", 'a, 'b, 'c)
 
+  // the test values table source supports projection push down by default
+  util.tableEnv.executeSql("""
+                             |CREATE TABLE src (
+                             | id VARCHAR,
+                             | cnt BIGINT
+                             |) WITH (
+                             | 'connector' = 'values'
+                             | ,'bounded' = 'true'
+                             |)
+                             |""".stripMargin)
+
   @TestTemplate
   def testAvg(): Unit = {
     util.verifyRelPlanWithType("""
@@ -119,19 +130,18 @@ abstract class AggregateTestBase extends TableTestBase {
 
   @TestTemplate
   def testCountStartWithProjectPushDown(): Unit = {
-    // the test values table source supports projection push down by default
-    util.tableEnv.executeSql("""
-                               |CREATE TABLE src (
-                               | id VARCHAR,
-                               | cnt BIGINT
-                               |) WITH (
-                               | 'connector' = 'values'
-                               | ,'bounded' = 'true'
-                               |)
-                               |""".stripMargin)
     util.verifyRelPlanWithType("SELECT COUNT(*) FROM src")
   }
 
+  @TestTemplate
+  def testCountStarWithHavingAndProjectPushDown(): Unit = {
+    val sql =
+      """
+        |SELECT COUNT(*) FROM src HAVING COUNT(*) > 1
+      """.stripMargin
+    util.verifyRelPlanWithType(sql)
+  }
+
   @TestTemplate
   def testCannotCountOnMultiFields(): Unit = {
     val sql = "SELECT b, COUNT(a, c) FROM MyTable1 GROUP BY b"
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
index 59632a99ff7..f3a92c91600 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
@@ -485,4 +485,17 @@ class AggregateTest extends TableTestBase {
                                |""".stripMargin)
     util.verifyExecPlan("SELECT COUNT(*) FROM src")
   }
+
+  @Test
+  def testCountStartWithHaving(): Unit = {
+    util.tableEnv.executeSql("""
+                               |CREATE TABLE src (
+                               | id VARCHAR,
+                               | cnt BIGINT
+                               |) WITH (
+                               | 'connector' = 'values'
+                               |)
+                               |""".stripMargin)
+    util.verifyExecPlan("SELECT COUNT(*) FROM src HAVING COUNT(*) > 1")
+  }
 }

Reply via email to