This is an automated email from the ASF dual-hosted git repository.
fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new abf53fb7a7b [FLINK-39221][table] Add AGGREGATE_VALUES rule for
constant folding of global aggregates
abf53fb7a7b is described below
commit abf53fb7a7b65c227416c0401ac3ca54c47bc64d
Author: Fabian Paul <[email protected]>
AuthorDate: Fri Mar 6 11:25:59 2026 +0100
[FLINK-39221][table] Add AGGREGATE_VALUES rule for constant folding of
global aggregates
Add CoreRules.AGGREGATE_VALUES to PRUNE_EMPTY_RULES (batch and stream) so the
planner can replace a global aggregate (one without GROUP BY keys) over
statically empty input (e.g. WHERE 1=0) with a single row of literal default
Values at plan time (COUNT(*)=0, SUM=null, etc.), eliminating the aggregate
node (GroupAggregate in streaming, HashAggregate/SortAggregate in batch)
entirely.
Co-Authored-By: Claude Opus 4.6 <[email protected]>
---
.../planner/plan/rules/FlinkBatchRuleSets.scala | 6 ++-
.../planner/plan/rules/FlinkStreamRuleSets.scala | 6 ++-
.../plan/batch/sql/agg/HashAggregateTest.xml | 57 ++++++++++++++++++++++
.../plan/batch/sql/agg/SortAggregateTest.xml | 57 ++++++++++++++++++++++
.../planner/plan/stream/sql/agg/AggregateTest.xml | 19 ++++++++
.../plan/batch/sql/agg/AggregateTestBase.scala | 8 +++
.../plan/stream/sql/agg/AggregateTest.scala | 8 +++
7 files changed, 159 insertions(+), 2 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
index ab401981684..6445f67f32f 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
@@ -189,7 +189,11 @@ object FlinkBatchRuleSets {
PruneEmptyRules.SORT_INSTANCE,
PruneEmptyRules.AGGREGATE_INSTANCE,
PruneEmptyRules.JOIN_LEFT_INSTANCE,
- PruneEmptyRules.JOIN_RIGHT_INSTANCE
+ PruneEmptyRules.JOIN_RIGHT_INSTANCE,
+ // Replaces global Aggregate over empty Values with default literal values
+ // (e.g. COUNT(*)=0). Handles the plan-time case where the planner can
+ // statically determine the input is empty.
+ CoreRules.AGGREGATE_VALUES
)
/** RuleSet about project */
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
index d8307013965..6e56aadb0d9 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
@@ -193,7 +193,11 @@ object FlinkStreamRuleSets {
PruneEmptyRules.SORT_INSTANCE,
PruneEmptyRules.AGGREGATE_INSTANCE,
PruneEmptyRules.JOIN_LEFT_INSTANCE,
- PruneEmptyRules.JOIN_RIGHT_INSTANCE
+ PruneEmptyRules.JOIN_RIGHT_INSTANCE,
+ // Replaces global Aggregate over empty Values with default literal values
+ // (e.g. COUNT(*)=0). Handles the plan-time case where the planner can
+ // statically determine the input is empty.
+ CoreRules.AGGREGATE_VALUES
)
/** RuleSet about project */
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml
index 0c35071320c..38ca2c88f5d 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/HashAggregateTest.xml
@@ -695,6 +695,63 @@ HashAggregate(isMerge=[true], select=[Final_COUNT(count$0)
AS EXPR$0, Final_COUN
+- Exchange(distribution=[single]), rowType=[RecordType(BIGINT count$0, BIGINT
count$1, BIGINT count$2, BIGINT count$3, BIGINT count$4, BIGINT count$5, BIGINT
count$6, BIGINT count$7, BIGINT count$8, BIGINT count$9, BIGINT count$10,
BIGINT count$11, BIGINT count$12)]
+- LocalHashAggregate(select=[Partial_COUNT(byte) AS count$0,
Partial_COUNT(short) AS count$1, Partial_COUNT(int) AS count$2,
Partial_COUNT(long) AS count$3, Partial_COUNT(float) AS count$4,
Partial_COUNT(double) AS count$5, Partial_COUNT(decimal3020) AS count$6,
Partial_COUNT(decimal105) AS count$7, Partial_COUNT(boolean) AS count$8,
Partial_COUNT(date) AS count$9, Partial_COUNT(time) AS count$10,
Partial_COUNT(timestamp) AS count$11, Partial_COUNT(string) AS count$12]),
rowType=[Rec [...]
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[byte, short, int, long, float, double, boolean, string, date, time,
timestamp, decimal3020, decimal105]), rowType=[RecordType(TINYINT byte,
SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN
boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, TIMESTAMP(3)
timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testGlobalAggOverEmptyInputReplacedByValues[aggStrategy=AUTO]">
+ <Resource name="sql">
+ <![CDATA[SELECT COUNT(*), SUM(`int`), AVG(`int`) FROM MyTable WHERE
1=0]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()], EXPR$1=[SUM($0)],
EXPR$2=[AVG($0)])
++- LogicalProject(int=[$2])
+ +- LogicalFilter(condition=[=(1, 0)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[EXPR$0, CASE(($f2 = 0), null:INTEGER, EXPR$1) AS EXPR$1,
CAST(($f3 / $f2) AS INTEGER) AS EXPR$2])
++- Values(tuples=[[{ 0, 0, 0, null }]], values=[EXPR$0, EXPR$1, $f2, $f3])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testGlobalAggOverEmptyInputReplacedByValues[aggStrategy=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[SELECT COUNT(*), SUM(`int`), AVG(`int`) FROM MyTable WHERE
1=0]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()], EXPR$1=[SUM($0)],
EXPR$2=[AVG($0)])
++- LogicalProject(int=[$2])
+ +- LogicalFilter(condition=[=(1, 0)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[EXPR$0, CASE(($f2 = 0), null:INTEGER, EXPR$1) AS EXPR$1,
CAST(($f3 / $f2) AS INTEGER) AS EXPR$2])
++- Values(tuples=[[{ 0, 0, 0, null }]], values=[EXPR$0, EXPR$1, $f2, $f3])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testGlobalAggOverEmptyInputReplacedByValues[aggStrategy=TWO_PHASE]">
+ <Resource name="sql">
+ <![CDATA[SELECT COUNT(*), SUM(`int`), AVG(`int`) FROM MyTable WHERE
1=0]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()], EXPR$1=[SUM($0)],
EXPR$2=[AVG($0)])
++- LogicalProject(int=[$2])
+ +- LogicalFilter(condition=[=(1, 0)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[EXPR$0, CASE(($f2 = 0), null:INTEGER, EXPR$1) AS EXPR$1,
CAST(($f3 / $f2) AS INTEGER) AS EXPR$2])
++- Values(tuples=[[{ 0, 0, 0, null }]], values=[EXPR$0, EXPR$1, $f2, $f3])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml
index 7638f23d247..2c2fe00b538 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/SortAggregateTest.xml
@@ -818,6 +818,63 @@ SortAggregate(isMerge=[true], select=[Final_COUNT(count$0)
AS EXPR$0, Final_COUN
+- Exchange(distribution=[single]), rowType=[RecordType(BIGINT count$0, BIGINT
count$1, BIGINT count$2, BIGINT count$3, BIGINT count$4, BIGINT count$5, BIGINT
count$6, BIGINT count$7, BIGINT count$8, BIGINT count$9, BIGINT count$10,
BIGINT count$11, BIGINT count$12)]
+- LocalSortAggregate(select=[Partial_COUNT(byte) AS count$0,
Partial_COUNT(short) AS count$1, Partial_COUNT(int) AS count$2,
Partial_COUNT(long) AS count$3, Partial_COUNT(float) AS count$4,
Partial_COUNT(double) AS count$5, Partial_COUNT(decimal3020) AS count$6,
Partial_COUNT(decimal105) AS count$7, Partial_COUNT(boolean) AS count$8,
Partial_COUNT(date) AS count$9, Partial_COUNT(time) AS count$10,
Partial_COUNT(timestamp) AS count$11, Partial_COUNT(string) AS count$12]),
rowType=[Rec [...]
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[byte, short, int, long, float, double, boolean, string, date, time,
timestamp, decimal3020, decimal105]), rowType=[RecordType(TINYINT byte,
SMALLINT short, INTEGER int, BIGINT long, FLOAT float, DOUBLE double, BOOLEAN
boolean, VARCHAR(2147483647) string, DATE date, TIME(0) time, TIMESTAMP(3)
timestamp, DECIMAL(30, 20) decimal3020, DECIMAL(10, 5) decimal105)]
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testGlobalAggOverEmptyInputReplacedByValues[aggStrategy=AUTO]">
+ <Resource name="sql">
+ <![CDATA[SELECT COUNT(*), SUM(`int`), AVG(`int`) FROM MyTable WHERE
1=0]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()], EXPR$1=[SUM($0)],
EXPR$2=[AVG($0)])
++- LogicalProject(int=[$2])
+ +- LogicalFilter(condition=[=(1, 0)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[EXPR$0, CASE(($f2 = 0), null:INTEGER, EXPR$1) AS EXPR$1,
CAST(($f3 / $f2) AS INTEGER) AS EXPR$2])
++- Values(tuples=[[{ 0, 0, 0, null }]], values=[EXPR$0, EXPR$1, $f2, $f3])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testGlobalAggOverEmptyInputReplacedByValues[aggStrategy=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[SELECT COUNT(*), SUM(`int`), AVG(`int`) FROM MyTable WHERE
1=0]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()], EXPR$1=[SUM($0)],
EXPR$2=[AVG($0)])
++- LogicalProject(int=[$2])
+ +- LogicalFilter(condition=[=(1, 0)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[EXPR$0, CASE(($f2 = 0), null:INTEGER, EXPR$1) AS EXPR$1,
CAST(($f3 / $f2) AS INTEGER) AS EXPR$2])
++- Values(tuples=[[{ 0, 0, 0, null }]], values=[EXPR$0, EXPR$1, $f2, $f3])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testGlobalAggOverEmptyInputReplacedByValues[aggStrategy=TWO_PHASE]">
+ <Resource name="sql">
+ <![CDATA[SELECT COUNT(*), SUM(`int`), AVG(`int`) FROM MyTable WHERE
1=0]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()], EXPR$1=[SUM($0)],
EXPR$2=[AVG($0)])
++- LogicalProject(int=[$2])
+ +- LogicalFilter(condition=[=(1, 0)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[EXPR$0, CASE(($f2 = 0), null:INTEGER, EXPR$1) AS EXPR$1,
CAST(($f3 / $f2) AS INTEGER) AS EXPR$2])
++- Values(tuples=[[{ 0, 0, 0, null }]], values=[EXPR$0, EXPR$1, $f2, $f3])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
index 12627a088e2..92fccedf388 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
@@ -380,6 +380,25 @@ GroupAggregate(select=[SUM_RETRACT(uv) FILTER $f1 AS
all_uv])
+- Exchange(distribution=[hash[c]])
+- Calc(select=[c])
+- TableSourceScan(table=[[default_catalog, default_database,
T]], fields=[a, b, c, d])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testGlobalAggOverEmptyInputReplacedByValues">
+ <Resource name="sql">
+ <![CDATA[SELECT COUNT(*), SUM(a), AVG(a) FROM T WHERE 1=0]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()], EXPR$1=[SUM($0)],
EXPR$2=[AVG($0)])
++- LogicalProject(a=[$0])
+ +- LogicalFilter(condition=[=(1, 0)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[EXPR$0, CASE(($f2 = 0), null:INTEGER, EXPR$1) AS EXPR$1,
CAST(($f3 / $f2) AS INTEGER) AS EXPR$2])
++- Values(tuples=[[{ 0, 0, 0, null }]])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateTestBase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateTestBase.scala
index 9f8d430a618..a060a273e87 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateTestBase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateTestBase.scala
@@ -251,5 +251,13 @@ abstract class AggregateTestBase extends TableTestBase {
util.verifyExecPlan(sql)
}
+ @TestTemplate
+ def testGlobalAggOverEmptyInputReplacedByValues(): Unit = {
+ // When the planner can statically determine the input is empty (WHERE 1=0),
+ // the AGGREGATE_VALUES rule should replace the global aggregate (no GROUP BY)
+ // with literal defaults (e.g. COUNT(*)=0, SUM=null) and remove the aggregate
+ // node entirely. As a template test this runs once per aggStrategy
+ // (AUTO, ONE_PHASE, TWO_PHASE); all variants expect the same Calc-over-Values plan.
+ util.verifyExecPlan("SELECT COUNT(*), SUM(`int`), AVG(`int`) FROM MyTable
WHERE 1=0")
+ }
+
// TODO supports group sets
}
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
index f3a92c91600..7ee7a72223c 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
@@ -486,6 +486,14 @@ class AggregateTest extends TableTestBase {
util.verifyExecPlan("SELECT COUNT(*) FROM src")
}
+ @Test
+ def testGlobalAggOverEmptyInputReplacedByValues(): Unit = {
+ // When the planner can statically determine the input is empty (WHERE 1=0),
+ // the AGGREGATE_VALUES rule should replace the global aggregate (no GROUP BY)
+ // with literal defaults (e.g. COUNT(*)=0, SUM=null) and remove the aggregate
+ // node entirely: the expected streaming plan is a Calc over a single-row
+ // Values, with no GroupAggregate.
+ util.verifyExecPlan("SELECT COUNT(*), SUM(a), AVG(a) FROM T WHERE 1=0")
+ }
+
@Test
def testCountStartWithHaving(): Unit = {
util.tableEnv.executeSql("""