This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new abf53fb7a7b [FLINK-39221][table] Add AGGREGATE_VALUES rule for constant folding of global aggregates
abf53fb7a7b is described below

commit abf53fb7a7b65c227416c0401ac3ca54c47bc64d
Author: Fabian Paul <[email protected]>
AuthorDate: Fri Mar 6 11:25:59 2026 +0100

    [FLINK-39221][table] Add AGGREGATE_VALUES rule for constant folding of global aggregates
    
    Add CoreRules.AGGREGATE_VALUES to PRUNE_EMPTY_RULES so the planner can
    replace a global aggregate over statically empty input (e.g. WHERE 1=0)
    with literal default Values at plan time (COUNT(*)=0, SUM=null, etc.),
    eliminating the GroupAggregate node entirely.
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
---
 .../planner/plan/rules/FlinkBatchRuleSets.scala    |  6 ++-
 .../planner/plan/rules/FlinkStreamRuleSets.scala   |  6 ++-
 .../plan/batch/sql/agg/HashAggregateTest.xml       | 57 ++++++++++++++++++++++
 .../plan/batch/sql/agg/SortAggregateTest.xml       | 57 ++++++++++++++++++++++
 .../planner/plan/stream/sql/agg/AggregateTest.xml  | 19 ++++++++
 .../plan/batch/sql/agg/AggregateTestBase.scala     |  8 +++
 .../plan/stream/sql/agg/AggregateTest.scala        |  8 +++
 7 files changed, 159 insertions(+), 2 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
index ab401981684..6445f67f32f 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
@@ -189,7 +189,11 @@ object FlinkBatchRuleSets {
     PruneEmptyRules.SORT_INSTANCE,
     PruneEmptyRules.AGGREGATE_INSTANCE,
     PruneEmptyRules.JOIN_LEFT_INSTANCE,
-    PruneEmptyRules.JOIN_RIGHT_INSTANCE
+    PruneEmptyRules.JOIN_RIGHT_INSTANCE,
+    // Replaces global Aggregate over empty Values with default literal values
+    // (e.g. COUNT(*)=0). Handles the plan-time case where the planner can
+    // statically determine the input is empty.
+    CoreRules.AGGREGATE_VALUES
   )
 
   /** RuleSet about project */
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
index d8307013965..6e56aadb0d9 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
@@ -193,7 +193,11 @@ object FlinkStreamRuleSets {
     PruneEmptyRules.SORT_INSTANCE,
     PruneEmptyRules.AGGREGATE_INSTANCE,
     PruneEmptyRules.JOIN_LEFT_INSTANCE,
-    PruneEmptyRules.JOIN_RIGHT_INSTANCE
+    PruneEmptyRules.JOIN_RIGHT_INSTANCE,
+    // Replaces global Aggregate over empty Values with default literal values
+    // (e.g. COUNT(*)=0). Handles the plan-time case where the planner can
+    // statically determine the input is empty.
+    CoreRules.AGGREGATE_VALUES
   )
 
   /** RuleSet about project */
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml
index 0c35071320c..38ca2c88f5d 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml
@@ -695,6 +695,63 @@ HashAggregate(isMerge=[true], select=[Final_COUNT(count$0) 
AS EXPR$0, Final_COUN
 +- Exchange(distribution=[single]), rowType=[RecordType(BIGINT count$0, BIGINT 
count$1, BIGINT count$2, BIGINT count$3, BIGINT count$4, BIGINT count$5, BIGINT 
count$6, BIGINT count$7, BIGINT count$8, BIGINT count$9, BIGINT count$10, 
BIGINT count$11, BIGINT count$12)]
    +- LocalHashAggregate(select=[Partial_COUNT(byte) AS count$0, 
Partial_COUNT(short) AS count$1, Partial_COUNT(int) AS count$2, 
Partial_COUNT(long) AS count$3, Partial_COUNT(float) AS count$4, 
Partial_COUNT(double) AS count$5, Partial_COUNT(decimal3020) AS count$6, 
Partial_COUNT(decimal105) AS count$7, Partial_COUNT(boolean) AS count$8, 
Partial_COUNT(date) AS count$9, Partial_COUNT(time) AS count$10, 
Partial_COUNT(timestamp) AS count$11, Partial_COUNT(string) AS count$12]), 
rowType=[Rec [...]
       +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[byte, short, int, long, float, double, boolean, string, date, time, 
timestamp, decimal3020, decimal105]), rowType=[RecordType(TINYINT byte, 
SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN 
boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, TIMESTAMP(3) 
timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testGlobalAggOverEmptyInputReplacedByValues[aggStrategy=AUTO]">
+    <Resource name="sql">
+      <![CDATA[SELECT COUNT(*), SUM(`int`), AVG(`int`) FROM MyTable WHERE 
1=0]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()], EXPR$1=[SUM($0)], 
EXPR$2=[AVG($0)])
++- LogicalProject(int=[$2])
+   +- LogicalFilter(condition=[=(1, 0)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[EXPR$0, CASE(($f2 = 0), null:INTEGER, EXPR$1) AS EXPR$1, 
CAST(($f3 / $f2) AS INTEGER) AS EXPR$2])
++- Values(tuples=[[{ 0, 0, 0, null }]], values=[EXPR$0, EXPR$1, $f2, $f3])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testGlobalAggOverEmptyInputReplacedByValues[aggStrategy=ONE_PHASE]">
+    <Resource name="sql">
+      <![CDATA[SELECT COUNT(*), SUM(`int`), AVG(`int`) FROM MyTable WHERE 
1=0]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()], EXPR$1=[SUM($0)], 
EXPR$2=[AVG($0)])
++- LogicalProject(int=[$2])
+   +- LogicalFilter(condition=[=(1, 0)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[EXPR$0, CASE(($f2 = 0), null:INTEGER, EXPR$1) AS EXPR$1, 
CAST(($f3 / $f2) AS INTEGER) AS EXPR$2])
++- Values(tuples=[[{ 0, 0, 0, null }]], values=[EXPR$0, EXPR$1, $f2, $f3])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testGlobalAggOverEmptyInputReplacedByValues[aggStrategy=TWO_PHASE]">
+    <Resource name="sql">
+      <![CDATA[SELECT COUNT(*), SUM(`int`), AVG(`int`) FROM MyTable WHERE 
1=0]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()], EXPR$1=[SUM($0)], 
EXPR$2=[AVG($0)])
++- LogicalProject(int=[$2])
+   +- LogicalFilter(condition=[=(1, 0)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[EXPR$0, CASE(($f2 = 0), null:INTEGER, EXPR$1) AS EXPR$1, 
CAST(($f3 / $f2) AS INTEGER) AS EXPR$2])
++- Values(tuples=[[{ 0, 0, 0, null }]], values=[EXPR$0, EXPR$1, $f2, $f3])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml
index 7638f23d247..2c2fe00b538 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml
@@ -818,6 +818,63 @@ SortAggregate(isMerge=[true], select=[Final_COUNT(count$0) 
AS EXPR$0, Final_COUN
 +- Exchange(distribution=[single]), rowType=[RecordType(BIGINT count$0, BIGINT 
count$1, BIGINT count$2, BIGINT count$3, BIGINT count$4, BIGINT count$5, BIGINT 
count$6, BIGINT count$7, BIGINT count$8, BIGINT count$9, BIGINT count$10, 
BIGINT count$11, BIGINT count$12)]
    +- LocalSortAggregate(select=[Partial_COUNT(byte) AS count$0, 
Partial_COUNT(short) AS count$1, Partial_COUNT(int) AS count$2, 
Partial_COUNT(long) AS count$3, Partial_COUNT(float) AS count$4, 
Partial_COUNT(double) AS count$5, Partial_COUNT(decimal3020) AS count$6, 
Partial_COUNT(decimal105) AS count$7, Partial_COUNT(boolean) AS count$8, 
Partial_COUNT(date) AS count$9, Partial_COUNT(time) AS count$10, 
Partial_COUNT(timestamp) AS count$11, Partial_COUNT(string) AS count$12]), 
rowType=[Rec [...]
       +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[byte, short, int, long, float, double, boolean, string, date, time, 
timestamp, decimal3020, decimal105]), rowType=[RecordType(TINYINT byte, 
SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN 
boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, TIMESTAMP(3) 
timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testGlobalAggOverEmptyInputReplacedByValues[aggStrategy=AUTO]">
+    <Resource name="sql">
+      <![CDATA[SELECT COUNT(*), SUM(`int`), AVG(`int`) FROM MyTable WHERE 
1=0]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()], EXPR$1=[SUM($0)], 
EXPR$2=[AVG($0)])
++- LogicalProject(int=[$2])
+   +- LogicalFilter(condition=[=(1, 0)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[EXPR$0, CASE(($f2 = 0), null:INTEGER, EXPR$1) AS EXPR$1, 
CAST(($f3 / $f2) AS INTEGER) AS EXPR$2])
++- Values(tuples=[[{ 0, 0, 0, null }]], values=[EXPR$0, EXPR$1, $f2, $f3])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testGlobalAggOverEmptyInputReplacedByValues[aggStrategy=ONE_PHASE]">
+    <Resource name="sql">
+      <![CDATA[SELECT COUNT(*), SUM(`int`), AVG(`int`) FROM MyTable WHERE 
1=0]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()], EXPR$1=[SUM($0)], 
EXPR$2=[AVG($0)])
++- LogicalProject(int=[$2])
+   +- LogicalFilter(condition=[=(1, 0)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[EXPR$0, CASE(($f2 = 0), null:INTEGER, EXPR$1) AS EXPR$1, 
CAST(($f3 / $f2) AS INTEGER) AS EXPR$2])
++- Values(tuples=[[{ 0, 0, 0, null }]], values=[EXPR$0, EXPR$1, $f2, $f3])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testGlobalAggOverEmptyInputReplacedByValues[aggStrategy=TWO_PHASE]">
+    <Resource name="sql">
+      <![CDATA[SELECT COUNT(*), SUM(`int`), AVG(`int`) FROM MyTable WHERE 
1=0]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()], EXPR$1=[SUM($0)], 
EXPR$2=[AVG($0)])
++- LogicalProject(int=[$2])
+   +- LogicalFilter(condition=[=(1, 0)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[EXPR$0, CASE(($f2 = 0), null:INTEGER, EXPR$1) AS EXPR$1, 
CAST(($f3 / $f2) AS INTEGER) AS EXPR$2])
++- Values(tuples=[[{ 0, 0, 0, null }]], values=[EXPR$0, EXPR$1, $f2, $f3])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
index 12627a088e2..92fccedf388 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
@@ -380,6 +380,25 @@ GroupAggregate(select=[SUM_RETRACT(uv) FILTER $f1 AS 
all_uv])
          +- Exchange(distribution=[hash[c]])
             +- Calc(select=[c])
                +- TableSourceScan(table=[[default_catalog, default_database, 
T]], fields=[a, b, c, d])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testGlobalAggOverEmptyInputReplacedByValues">
+    <Resource name="sql">
+      <![CDATA[SELECT COUNT(*), SUM(a), AVG(a) FROM T WHERE 1=0]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()], EXPR$1=[SUM($0)], 
EXPR$2=[AVG($0)])
++- LogicalProject(a=[$0])
+   +- LogicalFilter(condition=[=(1, 0)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[EXPR$0, CASE(($f2 = 0), null:INTEGER, EXPR$1) AS EXPR$1, 
CAST(($f3 / $f2) AS INTEGER) AS EXPR$2])
++- Values(tuples=[[{ 0, 0, 0, null }]])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateTestBase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateTestBase.scala
index 9f8d430a618..a060a273e87 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateTestBase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateTestBase.scala
@@ -251,5 +251,13 @@ abstract class AggregateTestBase extends TableTestBase {
     util.verifyExecPlan(sql)
   }
 
+  @TestTemplate
+  def testGlobalAggOverEmptyInputReplacedByValues(): Unit = {
+    // When the planner can statically determine the input is empty (WHERE 1=0),
+    // AGGREGATE_VALUES rule should replace the global aggregate with literal defaults
+    // (e.g. COUNT(*)=0, SUM=null) and remove the aggregate node entirely.
+    util.verifyExecPlan("SELECT COUNT(*), SUM(`int`), AVG(`int`) FROM MyTable WHERE 1=0")
+  }
+
   // TODO supports group sets
 }
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
index f3a92c91600..7ee7a72223c 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
@@ -486,6 +486,14 @@ class AggregateTest extends TableTestBase {
     util.verifyExecPlan("SELECT COUNT(*) FROM src")
   }
 
+  @Test
+  def testGlobalAggOverEmptyInputReplacedByValues(): Unit = {
+    // When the planner can statically determine the input is empty (WHERE 1=0),
+    // AGGREGATE_VALUES rule should replace the global aggregate with literal defaults
+    // (e.g. COUNT(*)=0, SUM=null) and remove the aggregate node entirely.
+    util.verifyExecPlan("SELECT COUNT(*), SUM(a), AVG(a) FROM T WHERE 1=0")
+  }
+
   @Test
   def testCountStartWithHaving(): Unit = {
     util.tableEnv.executeSql("""

Reply via email to