This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ba33bb539 [flink] supports agg pushdown with partitions groupby in 
flink (#7828)
2ba33bb539 is described below

commit 2ba33bb5391d8a88415586fe15d42a4092883be4
Author: Faiz <[email protected]>
AuthorDate: Thu May 14 14:15:19 2026 +0800

    [flink] supports agg pushdown with partitions groupby in flink (#7828)
    
    Mirrors the Spark implementation.
    This PR originates from https://github.com/apache/paimon/pull/7779. In
    our internal use case, it is hard to quickly get the real record
    count for data evolution tables, especially per partition.
    Currently both the snapshots system table and the partitions system
    table only show the unmerged record count. Accurate values can be
    obtained via count(*) aggregate pushdown (possibly with a GROUP BY
    clause) through Flink OLAP queries.
---
 .../paimon/flink/source/BaseDataTableSource.java   |  78 ++---
 .../paimon/flink/source/DataTableSource.java       |  32 +-
 .../paimon/flink/source/StaticRowDataSource.java   | 311 +++++++++++++++++++
 .../source/aggregate/AggregatePushDownUtils.java   | 304 +++++++++++++++++++
 .../flink/source/aggregate/LocalAggregator.java    | 328 +++++++++++++++++++++
 .../source/aggregate/PushedAggregateResult.java    |  47 +++
 .../apache/paimon/flink/BatchFileStoreITCase.java  | 188 +++++++++++-
 .../flink/source/StaticRowDataSourceTest.java      | 248 ++++++++++++++++
 .../FileStoreTableStatisticsTestBase.java          |   3 -
 .../statistics/PrimaryKeyTableStatisticsTest.java  |   1 -
 10 files changed, 1476 insertions(+), 64 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
index 943c4b582c..a49f87c63f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
@@ -28,6 +28,8 @@ import 
org.apache.paimon.flink.lookup.partitioner.BucketShufflePartitioner;
 import org.apache.paimon.flink.lookup.partitioner.BucketShuffleStrategy;
 import org.apache.paimon.flink.lookup.partitioner.ShuffleStrategy;
 import org.apache.paimon.flink.sink.AdaptiveParallelism;
+import org.apache.paimon.flink.source.aggregate.AggregatePushDownUtils;
+import org.apache.paimon.flink.source.aggregate.PushedAggregateResult;
 import org.apache.paimon.flink.utils.RuntimeContextUtils;
 import org.apache.paimon.options.ConfigOption;
 import org.apache.paimon.options.Options;
@@ -35,10 +37,8 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.BucketSpec;
-import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.Split;
 import org.apache.paimon.utils.Projection;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
@@ -67,7 +67,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.OptionalLong;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -111,7 +110,7 @@ public abstract class BaseDataTableSource extends 
FlinkTableSource
     protected final DynamicTableFactory.Context context;
     @Nullable private BucketShufflePartitioner bucketShufflePartitioner;
     @Nullable protected WatermarkStrategy<RowData> watermarkStrategy;
-    @Nullable protected Long countPushed;
+    @Nullable protected PushedAggregateResult pushedAggregateResult;
 
     public BaseDataTableSource(
             ObjectIdentifier tableIdentifier,
@@ -122,7 +121,7 @@ public abstract class BaseDataTableSource extends 
FlinkTableSource
             @Nullable int[][] projectFields,
             @Nullable Long limit,
             @Nullable WatermarkStrategy<RowData> watermarkStrategy,
-            @Nullable Long countPushed) {
+            @Nullable PushedAggregateResult pushedAggregateResult) {
         super(table, predicate, projectFields, limit);
 
         this.tableIdentifier = tableIdentifier;
@@ -130,7 +129,7 @@ public abstract class BaseDataTableSource extends 
FlinkTableSource
         this.context = context;
 
         this.watermarkStrategy = watermarkStrategy;
-        this.countPushed = countPushed;
+        this.pushedAggregateResult = pushedAggregateResult;
     }
 
     @Override
@@ -163,8 +162,8 @@ public abstract class BaseDataTableSource extends 
FlinkTableSource
 
     @Override
     public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) 
{
-        if (countPushed != null) {
-            return createCountStarScan();
+        if (pushedAggregateResult != null) {
+            return createPushedAggregateScan();
         }
 
         WatermarkStrategy<RowData> watermarkStrategy = this.watermarkStrategy;
@@ -210,9 +209,11 @@ public abstract class BaseDataTableSource extends 
FlinkTableSource
                 table);
     }
 
-    private ScanRuntimeProvider createCountStarScan() {
-        checkNotNull(countPushed);
-        NumberSequenceRowSource source = new 
NumberSequenceRowSource(countPushed, countPushed);
+    private ScanRuntimeProvider createPushedAggregateScan() {
+        checkNotNull(pushedAggregateResult);
+        StaticRowDataSource source =
+                new StaticRowDataSource(
+                        pushedAggregateResult.rows(), 
pushedAggregateResult.paimonRowType());
         return new SourceProvider() {
             @Override
             public Source<RowData, ?, ?> createSource() {
@@ -333,52 +334,21 @@ public abstract class BaseDataTableSource extends 
FlinkTableSource
             return false;
         }
 
-        if (!(table instanceof DataTable)) {
-            return false;
-        }
-
-        if (groupingSets.size() != 1) {
-            return false;
-        }
-
-        if (groupingSets.get(0).length != 0) {
-            return false;
-        }
-
-        if (aggregateExpressions.size() != 1) {
-            return false;
-        }
-
-        if (!aggregateExpressions
-                .get(0)
-                .getFunctionDefinition()
-                .getClass()
-                .getName()
-                .equals(
-                        
"org.apache.flink.table.planner.functions.aggfunctions.Count1AggFunction")) {
+        if (!(table instanceof FileStoreTable)) {
             return false;
         }
 
-        List<Split> splits =
-                table.newReadBuilder()
-                        .dropStats()
-                        .withProjection(new int[0])
-                        .withFilter(predicate)
-                        .withPartitionFilter(partitionPredicate)
-                        .newScan()
-                        .plan()
-                        .splits();
-        long countPushed = 0;
-        for (Split s : splits) {
-            OptionalLong mergedRowCount = s.mergedRowCount();
-            if (!mergedRowCount.isPresent()) {
-                return false;
-            }
-            countPushed += mergedRowCount.getAsLong();
-        }
-
-        this.countPushed = countPushed;
-        return true;
+        Optional<PushedAggregateResult> result =
+                AggregatePushDownUtils.tryPushdownAggregation(
+                        (FileStoreTable) table,
+                        predicate,
+                        partitionPredicate,
+                        projectFields,
+                        groupingSets,
+                        aggregateExpressions,
+                        producedDataType);
+        result.ifPresent(r -> this.pushedAggregateResult = r);
+        return result.isPresent();
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index a19e04d6e5..9271f2f5f9 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.source;
 
+import org.apache.paimon.flink.source.aggregate.PushedAggregateResult;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.stats.ColStats;
 import org.apache.paimon.stats.Statistics;
@@ -57,7 +58,30 @@ public class DataTableSource extends BaseDataTableSource
             Table table,
             boolean unbounded,
             DynamicTableFactory.Context context) {
-        this(tableIdentifier, table, unbounded, context, null, null, null, 
null, null, null);
+        this(tableIdentifier, table, unbounded, context, null, null, null, 
null, null);
+    }
+
+    public DataTableSource(
+            ObjectIdentifier tableIdentifier,
+            Table table,
+            boolean unbounded,
+            DynamicTableFactory.Context context,
+            @Nullable Predicate predicate,
+            @Nullable int[][] projectFields,
+            @Nullable Long limit,
+            @Nullable WatermarkStrategy<RowData> watermarkStrategy,
+            @Nullable List<String> dynamicPartitionFilteringFields) {
+        this(
+                tableIdentifier,
+                table,
+                unbounded,
+                context,
+                predicate,
+                projectFields,
+                limit,
+                watermarkStrategy,
+                dynamicPartitionFilteringFields,
+                null);
     }
 
     public DataTableSource(
@@ -70,7 +94,7 @@ public class DataTableSource extends BaseDataTableSource
             @Nullable Long limit,
             @Nullable WatermarkStrategy<RowData> watermarkStrategy,
             @Nullable List<String> dynamicPartitionFilteringFields,
-            @Nullable Long countPushed) {
+            @Nullable PushedAggregateResult pushedAggregateResult) {
         super(
                 tableIdentifier,
                 table,
@@ -80,7 +104,7 @@ public class DataTableSource extends BaseDataTableSource
                 projectFields,
                 limit,
                 watermarkStrategy,
-                countPushed);
+                pushedAggregateResult);
         this.dynamicPartitionFilteringFields = dynamicPartitionFilteringFields;
     }
 
@@ -96,7 +120,7 @@ public class DataTableSource extends BaseDataTableSource
                 limit,
                 watermarkStrategy,
                 dynamicPartitionFilteringFields,
-                countPushed);
+                pushedAggregateResult);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticRowDataSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticRowDataSource.java
new file mode 100644
index 0000000000..38506f3eb4
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticRowDataSource.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.flink.FlinkRowData;
+import org.apache.paimon.flink.LogicalTypeConversion;
+import org.apache.paimon.io.DataInputViewStreamWrapper;
+import org.apache.paimon.io.DataOutputViewStreamWrapper;
+import org.apache.paimon.types.RowType;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.util.SplittableIterator;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/** A bounded source that returns a precomputed list of rows. */
+public class StaticRowDataSource
+        implements Source<
+                        RowData,
+                        StaticRowDataSource.StaticRowsSplit,
+                        Collection<StaticRowDataSource.StaticRowsSplit>>,
+                ResultTypeQueryable<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final List<InternalRow> rows;
+    private final RowType paimonRowType;
+
+    public StaticRowDataSource(List<InternalRow> rows, RowType paimonRowType) {
+        this.rows = new ArrayList<>(rows);
+        this.paimonRowType = paimonRowType;
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return 
InternalTypeInfo.of(LogicalTypeConversion.toLogicalType(paimonRowType));
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SourceReader<RowData, StaticRowsSplit> 
createReader(SourceReaderContext readerContext) {
+        return new IteratorSourceReader<>(readerContext);
+    }
+
+    @Override
+    public SplitEnumerator<StaticRowsSplit, Collection<StaticRowsSplit>> 
createEnumerator(
+            SplitEnumeratorContext<StaticRowsSplit> enumContext) {
+        List<StaticRowsSplit> splits = splitRows(rows, 
enumContext.currentParallelism());
+        return new IteratorSourceEnumerator<>(enumContext, splits);
+    }
+
+    @Override
+    public SplitEnumerator<StaticRowsSplit, Collection<StaticRowsSplit>> 
restoreEnumerator(
+            SplitEnumeratorContext<StaticRowsSplit> enumContext,
+            Collection<StaticRowsSplit> checkpoint) {
+        return new IteratorSourceEnumerator<>(enumContext, checkpoint);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<StaticRowsSplit> getSplitSerializer() {
+        return new SplitSerializer(paimonRowType);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<Collection<StaticRowsSplit>>
+            getEnumeratorCheckpointSerializer() {
+        return new CheckpointSerializer(paimonRowType);
+    }
+
+    private List<StaticRowsSplit> splitRows(List<InternalRow> rows, int 
numSplits) {
+        StaticRowsIterator[] iterators = new StaticRowsIterator(rows, 
0).split(numSplits);
+        List<StaticRowsSplit> splits = new ArrayList<>(iterators.length);
+
+        int splitId = 1;
+        for (StaticRowsIterator iterator : iterators) {
+            if (iterator.hasNext()) {
+                splits.add(new StaticRowsSplit(String.valueOf(splitId++), 
iterator.rows, 0));
+            }
+        }
+        return splits;
+    }
+
+    /** A SourceSplit with pre-computed static rows. */
+    public static class StaticRowsSplit
+            implements IteratorSourceSplit<RowData, StaticRowsIterator> {
+        private final String splitId;
+        private final List<InternalRow> rows;
+        private final int nextIndex;
+
+        public StaticRowsSplit(String splitId, List<InternalRow> rows, int 
nextIndex) {
+            this.splitId = splitId;
+            this.rows = new ArrayList<>(rows);
+            this.nextIndex = nextIndex;
+        }
+
+        @Override
+        public String splitId() {
+            return splitId;
+        }
+
+        @Override
+        public StaticRowsIterator getIterator() {
+            return new StaticRowsIterator(rows, nextIndex);
+        }
+
+        @Override
+        public IteratorSourceSplit<RowData, StaticRowsIterator> 
getUpdatedSplitForIterator(
+                StaticRowsIterator iterator) {
+            return new StaticRowsSplit(splitId, rows, iterator.nextIndex());
+        }
+    }
+
+    /** Iterator for static rows. */
+    private static class StaticRowsIterator extends 
SplittableIterator<RowData> {
+
+        private static final long serialVersionUID = 1L;
+
+        private final List<InternalRow> rows;
+        private int nextIndex;
+
+        StaticRowsIterator(List<InternalRow> rows, int nextIndex) {
+            this.rows = rows;
+            this.nextIndex = nextIndex;
+        }
+
+        int nextIndex() {
+            return nextIndex;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return nextIndex < rows.size();
+        }
+
+        @Override
+        public RowData next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            return new FlinkRowData(rows.get(nextIndex++));
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public StaticRowsIterator[] split(int numPartitions) {
+            if (numPartitions < 1) {
+                throw new IllegalArgumentException("The number of partitions 
must be at least 1.");
+            }
+
+            int remaining = rows.size() - nextIndex;
+            StaticRowsIterator[] iterators = new 
StaticRowsIterator[numPartitions];
+
+            int start = nextIndex;
+            int baseSize = remaining / numPartitions;
+            int numWithExtra = remaining % numPartitions;
+            for (int i = 0; i < numPartitions; i++) {
+                int splitSize = baseSize + (i < numWithExtra ? 1 : 0);
+                int end = start + splitSize;
+                iterators[i] = new StaticRowsIterator(new 
ArrayList<>(rows.subList(start, end)), 0);
+                start = end;
+            }
+            return iterators;
+        }
+
+        @Override
+        public int getMaximumNumberOfSplits() {
+            return rows.size() - nextIndex;
+        }
+    }
+
+    /** Serializer for StaticRowsSplit. */
+    private static class SplitSerializer implements 
SimpleVersionedSerializer<StaticRowsSplit> {
+
+        private static final int CURRENT_VERSION = 1;
+
+        private final InternalRowSerializer rowSerializer;
+
+        private SplitSerializer(RowType rowType) {
+            this.rowSerializer = new InternalRowSerializer(rowType);
+        }
+
+        @Override
+        public int getVersion() {
+            return CURRENT_VERSION;
+        }
+
+        @Override
+        public byte[] serialize(StaticRowsSplit split) throws IOException {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(baos);
+            out.writeUTF(split.splitId);
+            out.writeInt(split.nextIndex);
+            out.writeInt(split.rows.size());
+            for (InternalRow row : split.rows) {
+                rowSerializer.serialize(row, out);
+            }
+            return baos.toByteArray();
+        }
+
+        @Override
+        public StaticRowsSplit deserialize(int version, byte[] serialized) 
throws IOException {
+            if (version != CURRENT_VERSION) {
+                throw new IOException("Unrecognized version: " + version);
+            }
+            DataInputViewStreamWrapper in =
+                    new DataInputViewStreamWrapper(new 
ByteArrayInputStream(serialized));
+            String splitId = in.readUTF();
+            int nextIndex = in.readInt();
+            int numRows = in.readInt();
+            List<InternalRow> rows = new ArrayList<>(numRows);
+            for (int i = 0; i < numRows; i++) {
+                rows.add(rowSerializer.deserialize(in));
+            }
+            return new StaticRowsSplit(splitId, rows, nextIndex);
+        }
+    }
+
+    /** Serializer for multiple splits. */
+    private static class CheckpointSerializer
+            implements SimpleVersionedSerializer<Collection<StaticRowsSplit>> {
+
+        private static final int CURRENT_VERSION = 1;
+
+        private final SplitSerializer splitSerializer;
+
+        private CheckpointSerializer(RowType rowType) {
+            this.splitSerializer = new SplitSerializer(rowType);
+        }
+
+        @Override
+        public int getVersion() {
+            return CURRENT_VERSION;
+        }
+
+        @Override
+        public byte[] serialize(Collection<StaticRowsSplit> checkpoint) throws 
IOException {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(baos);
+            out.writeInt(checkpoint.size());
+            for (StaticRowsSplit split : checkpoint) {
+                byte[] bytes = splitSerializer.serialize(split);
+                out.writeInt(bytes.length);
+                out.write(bytes);
+            }
+            return baos.toByteArray();
+        }
+
+        @Override
+        public Collection<StaticRowsSplit> deserialize(int version, byte[] 
serialized)
+                throws IOException {
+            if (version != CURRENT_VERSION) {
+                throw new IOException("Unrecognized version: " + version);
+            }
+            DataInputViewStreamWrapper in =
+                    new DataInputViewStreamWrapper(new 
ByteArrayInputStream(serialized));
+            int numSplits = in.readInt();
+            List<StaticRowsSplit> splits = new ArrayList<>(numSplits);
+            for (int i = 0; i < numSplits; i++) {
+                byte[] bytes = new byte[in.readInt()];
+                in.readFully(bytes);
+                splits.add(splitSerializer.deserialize(CURRENT_VERSION, 
bytes));
+            }
+            return splits;
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/aggregate/AggregatePushDownUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/aggregate/AggregatePushDownUtils.java
new file mode 100644
index 0000000000..e2862729d1
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/aggregate/AggregatePushDownUtils.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source.aggregate;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.flink.LogicalTypeConversion;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Projection;
+
+import org.apache.flink.table.expressions.AggregateExpression;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.table.source.PushDownUtils.minmaxAvailable;
+
+/**
+ * Utilities for aggregate push down.
+ *
+ * <p>This class references Spark's implementation: {@code
+ * org.apache.paimon.spark.aggregate.AggregatePushDownUtils} and related 
classes.
+ */
+public class AggregatePushDownUtils {
+
+    public static Optional<PushedAggregateResult> tryPushdownAggregation(
+            FileStoreTable table,
+            @Nullable Predicate predicate,
+            @Nullable PartitionPredicate partitionPredicate,
+            @Nullable int[][] projectFields,
+            List<int[]> groupingSets,
+            List<AggregateExpression> aggregateExpressions,
+            org.apache.flink.table.types.DataType producedDataType) {
+        if (groupingSets.size() != 1) {
+            return Optional.empty();
+        }
+
+        // reject nested projection
+        int[] fieldIndexMapping = createFieldIndexMapping(table, 
projectFields);
+        if (fieldIndexMapping == null) {
+            return Optional.empty();
+        }
+
+        // groupingSet contains the index within the projected fields,
+        // so we have to translate grouping fields index to original field 
index
+        int[] originalGrouping = translateFieldIndexes(groupingSets.get(0), 
fieldIndexMapping);
+        if (originalGrouping == null) {
+            return Optional.empty();
+        }
+
+        if (originalGrouping.length > 0
+                && !groupingFieldsArePartitionFields(table, originalGrouping)) 
{
+            return Optional.empty();
+        }
+
+        List<LocalAggregator.Aggregate> aggregates =
+                extractAggregates(table, fieldIndexMapping, 
aggregateExpressions);
+        if (aggregates == null) {
+            return Optional.empty();
+        }
+
+        List<DataSplit> dataSplits = planSplits(table, predicate, 
partitionPredicate, aggregates);
+        if (dataSplits == null) {
+            return Optional.empty();
+        }
+
+        LocalAggregator aggregator = new LocalAggregator(table, 
originalGrouping, aggregates);
+        for (DataSplit dataSplit : dataSplits) {
+            aggregator.update(dataSplit);
+        }
+
+        // we should check the result row type equals to producedDataType
+        RowType producedRowType = toPaimonRowType(producedDataType);
+        if (producedRowType == null || 
!isCompatible(aggregator.resultRowType(), producedRowType)) {
+            return Optional.empty();
+        }
+        return Optional.of(new PushedAggregateResult(aggregator.result(), 
producedRowType));
+    }
+
+    @Nullable
+    private static RowType 
toPaimonRowType(org.apache.flink.table.types.DataType producedDataType) {
+        if (!(producedDataType.getLogicalType()
+                instanceof org.apache.flink.table.types.logical.RowType)) {
+            return null;
+        }
+        return LogicalTypeConversion.toDataType(
+                (org.apache.flink.table.types.logical.RowType) 
producedDataType.getLogicalType());
+    }
+
+    private static boolean isCompatible(RowType actualRowType, RowType 
producedRowType) {
+        if (actualRowType.getFieldCount() != producedRowType.getFieldCount()) {
+            return false;
+        }
+
+        for (int i = 0; i < actualRowType.getFieldCount(); i++) {
+            DataType actualType = actualRowType.getTypeAt(i);
+            DataType producedType = producedRowType.getTypeAt(i);
+            if (!actualType.equalsIgnoreNullable(producedType)
+                    || (actualType.isNullable() && 
!producedType.isNullable())) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Nullable
+    private static List<DataSplit> planSplits(
+            FileStoreTable table,
+            @Nullable Predicate predicate,
+            @Nullable PartitionPredicate partitionPredicate,
+            List<LocalAggregator.Aggregate> aggregates) {
+        Set<String> minMaxColumns =
+                aggregates.stream()
+                        .filter(LocalAggregator.Aggregate::requiresMinMaxStats)
+                        .map(LocalAggregator.Aggregate::fieldName)
+                        .collect(Collectors.toSet());
+        if (!minMaxColumns.isEmpty()
+                && 
(CoreOptions.fromMap(table.options()).deletionVectorsEnabled()
+                        || !table.primaryKeys().isEmpty())) {
+            return null;
+        }
+
+        ReadBuilder readBuilder =
+                table.newReadBuilder()
+                        .withFilter(predicate)
+                        .withPartitionFilter(partitionPredicate);
+        if (minMaxColumns.isEmpty()) {
+            readBuilder.dropStats();
+        }
+
+        List<Split> splits = readBuilder.newScan().plan().splits();
+        List<DataSplit> dataSplits = new ArrayList<>(splits.size());
+        for (Split split : splits) {
+            if (!(split instanceof DataSplit)) {
+                return null;
+            }
+
+            OptionalLong mergedRowCount = split.mergedRowCount();
+            if (!mergedRowCount.isPresent()) {
+                return null;
+            }
+
+            if (!minMaxColumns.isEmpty() && !minmaxAvailable(split, 
minMaxColumns)) {
+                return null;
+            }
+
+            dataSplits.add((DataSplit) split);
+        }
+        return dataSplits;
+    }
+
+    /**
+     * Converts the aggregate expressions into {@link LocalAggregator.Aggregate}s.
+     *
+     * <p>Returns {@code null} as soon as one expression cannot be handled: it is distinct,
+     * approximate or filtered, it is neither the count-star, min nor max function, a min/max does
+     * not have exactly one argument, the argument cannot be mapped to an original field index, or
+     * {@code minmaxAvailable} rejects the field type.
+     *
+     * @param table table whose row type supplies the aggregated fields
+     * @param fieldIndexMapping mapping from argument field index to original table field index
+     * @param aggregateExpressions expressions to convert, in order
+     * @return the converted aggregates in input order, or {@code null} if any is unsupported
+     */
+    @Nullable
+    private static List<LocalAggregator.Aggregate> extractAggregates(
+            FileStoreTable table,
+            int[] fieldIndexMapping,
+            List<AggregateExpression> aggregateExpressions) {
+        List<LocalAggregator.Aggregate> extracted = new ArrayList<>(aggregateExpressions.size());
+        for (AggregateExpression expression : aggregateExpressions) {
+            boolean plain =
+                    !expression.isDistinct()
+                            && !expression.isApproximate()
+                            && !expression.getFilterExpression().isPresent();
+            if (!plain) {
+                return null;
+            }
+
+            String className = expression.getFunctionDefinition().getClass().getName();
+            if (isCountStar(className)) {
+                extracted.add(LocalAggregator.Aggregate.count());
+                continue;
+            }
+
+            boolean min = isMin(className);
+            if (!min && !isMax(className)) {
+                return null;
+            }
+            if (expression.getArgs().size() != 1) {
+                return null;
+            }
+
+            int tableFieldIndex =
+                    toOriginalFieldIndex(
+                            fieldIndexMapping, expression.getArgs().get(0).getFieldIndex());
+            if (tableFieldIndex < 0) {
+                return null;
+            }
+
+            DataField field = table.rowType().getFields().get(tableFieldIndex);
+            if (!minmaxAvailable(field.type())) {
+                return null;
+            }
+
+            if (min) {
+                extracted.add(LocalAggregator.Aggregate.min(tableFieldIndex, field));
+            } else {
+                extracted.add(LocalAggregator.Aggregate.max(tableFieldIndex, field));
+            }
+        }
+        return extracted;
+    }
+
+    /** Returns whether {@code functionName} is exactly the planner's count-star class name. */
+    private static boolean isCountStar(String functionName) {
+        String count1ClassName =
+                "org.apache.flink.table.planner.functions.aggfunctions.Count1AggFunction";
+        return functionName.equals(count1ClassName);
+    }
+
+    /**
+     * Returns whether {@code functionName} starts with the planner's min aggregate class name. A
+     * prefix match is used rather than equality, presumably to cover type-specific nested classes.
+     */
+    private static boolean isMin(String functionName) {
+        String minClassPrefix =
+                "org.apache.flink.table.planner.functions.aggfunctions.MinAggFunction";
+        return functionName.startsWith(minClassPrefix);
+    }
+
+    /**
+     * Returns whether {@code functionName} starts with the planner's max aggregate class name. A
+     * prefix match is used rather than equality, presumably to cover type-specific nested classes.
+     */
+    private static boolean isMax(String functionName) {
+        String maxClassPrefix =
+                "org.apache.flink.table.planner.functions.aggfunctions.MaxAggFunction";
+        return functionName.startsWith(maxClassPrefix);
+    }
+
+    /**
+     * Checks that every grouping index refers to an existing table field that is also one of the
+     * table's partition keys.
+     *
+     * @param table table providing the field names and partition keys
+     * @param originalGrouping grouping field indexes expressed against the table row type
+     * @return {@code false} if any index is out of range or names a non-partition field
+     */
+    private static boolean groupingFieldsArePartitionFields(
+            FileStoreTable table, int[] originalGrouping) {
+        List<String> fieldNames = table.rowType().getFieldNames();
+        List<String> partitionKeys = table.partitionKeys();
+        for (int i = 0; i < originalGrouping.length; i++) {
+            int index = originalGrouping[i];
+            boolean inRange = index >= 0 && index < fieldNames.size();
+            if (!inRange || !partitionKeys.contains(fieldNames.get(index))) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Builds the mapping from projected field position to original table field index.
+     *
+     * <p>Without a projection the mapping is the identity over all table fields. With a
+     * projection it is the projection's top-level indexes.
+     *
+     * @param table table whose field count bounds the valid indexes
+     * @param projectFields pushed-down projection, or {@code null} when none was pushed
+     * @return the mapping, or {@code null} if it cannot be derived or holds an out-of-range index
+     */
+    @Nullable
+    private static int[] createFieldIndexMapping(
+            FileStoreTable table, @Nullable int[][] projectFields) {
+        int totalFields = table.rowType().getFieldCount();
+
+        int[] mapping;
+        if (projectFields == null) {
+            mapping = Projection.range(0, totalFields).toTopLevelIndexes();
+        } else {
+            mapping = toTopLevelIndexes(projectFields);
+        }
+        if (mapping == null) {
+            return null;
+        }
+
+        for (int i = 0; i < mapping.length; i++) {
+            boolean valid = mapping[i] >= 0 && mapping[i] < totalFields;
+            if (!valid) {
+                return null;
+            }
+        }
+        return mapping;
+    }
+
+    /**
+     * Returns the top-level indexes of the given projection, or {@code null} when the projection
+     * is nested.
+     */
+    @Nullable
+    private static int[] toTopLevelIndexes(int[][] projectFields) {
+        Projection projected = Projection.of(projectFields);
+        return projected.isNested() ? null : projected.toTopLevelIndexes();
+    }
+
+    /**
+     * Translates each field index through {@code fieldIndexMapping}.
+     *
+     * @param fieldIndexes indexes to translate
+     * @param fieldIndexMapping mapping applied to every index
+     * @return the translated indexes in the same order, or {@code null} if any index cannot be
+     *     translated to a non-negative value
+     */
+    @Nullable
+    private static int[] translateFieldIndexes(int[] fieldIndexes, int[] fieldIndexMapping) {
+        int[] translated = new int[fieldIndexes.length];
+        int position = 0;
+        for (int fieldIndex : fieldIndexes) {
+            int tableIndex = toOriginalFieldIndex(fieldIndexMapping, fieldIndex);
+            if (tableIndex < 0) {
+                return null;
+            }
+            translated[position++] = tableIndex;
+        }
+        return translated;
+    }
+
+    /**
+     * Looks up {@code fieldIndex} in {@code fieldIndexMapping}.
+     *
+     * @return the mapped value, or {@code -1} when {@code fieldIndex} is outside the mapping
+     */
+    private static int toOriginalFieldIndex(int[] fieldIndexMapping, int fieldIndex) {
+        boolean inBounds = fieldIndex >= 0 && fieldIndex < fieldIndexMapping.length;
+        return inBounds ? fieldIndexMapping[fieldIndex] : -1;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/aggregate/LocalAggregator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/aggregate/LocalAggregator.java
new file mode 100644
index 0000000000..b1a6dc8cf5
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/aggregate/LocalAggregator.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source.aggregate;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.predicate.CompareUtils;
+import org.apache.paimon.stats.SimpleStatsEvolutions;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ProjectedRow;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Local aggregator based on split statistics. */
+class LocalAggregator {
+
+    private final int[] grouping;
+    private final List<Aggregate> aggregates;
+    private final List<DataType> groupFieldTypes;
+    private final List<String> groupFieldNames;
+    private final ProjectedRow projectedPartition;
+    private final InternalRowSerializer groupSerializer;
+
+    private final Map<BinaryRow, List<AggFuncEvaluator>> groupEvaluatorMap;
+
+    private final SimpleStatsEvolutions simpleStatsEvolutions;
+    private final RowType resultRowType;
+    private final InternalRowSerializer resultSerializer;
+
+    LocalAggregator(FileStoreTable table, int[] grouping, List<Aggregate> 
aggregates) {
+        // grouping has been converted to indices of the original table type
+        this.grouping = grouping;
+        this.aggregates = aggregates;
+        this.groupFieldTypes = new ArrayList<>(grouping.length);
+        this.groupFieldNames = new ArrayList<>(grouping.length);
+
+        List<String> tableFieldNames = table.rowType().getFieldNames();
+        List<String> partitionKeys = table.partitionKeys();
+        RowType partitionType = table.rowType().project(partitionKeys);
+        int[] partitionProjection = new int[grouping.length];
+        for (int i = 0; i < grouping.length; i++) {
+            String fieldName = tableFieldNames.get(grouping[i]);
+            int partitionIndex = partitionKeys.indexOf(fieldName);
+            partitionProjection[i] = partitionIndex;
+            groupFieldTypes.add(partitionType.getTypeAt(partitionIndex));
+            groupFieldNames.add(fieldName);
+        }
+
+        this.projectedPartition = ProjectedRow.from(partitionProjection);
+        this.groupSerializer = new 
InternalRowSerializer(groupFieldTypes.toArray(new DataType[0]));
+        this.groupEvaluatorMap = new LinkedHashMap<>();
+        this.simpleStatsEvolutions =
+                new SimpleStatsEvolutions(
+                        schemaId -> 
table.schemaManager().schema(schemaId).fields(),
+                        table.schema().id());
+        this.resultRowType = createResultRowType();
+        this.resultSerializer = new InternalRowSerializer(resultRowType);
+    }
+
+    void update(DataSplit dataSplit) {
+        BinaryRow groupKey = groupKey(dataSplit.partition());
+        List<AggFuncEvaluator> evaluators = groupEvaluatorMap.get(groupKey);
+        if (evaluators == null) {
+            evaluators = createEvaluators();
+            groupEvaluatorMap.put(groupKey, evaluators);
+        }
+        update(evaluators, dataSplit);
+    }
+
+    List<InternalRow> result() {
+        if (groupEvaluatorMap.isEmpty() && grouping.length == 0) {
+            List<InternalRow> rows = new ArrayList<>(1);
+            rows.add(createResultRow(BinaryRow.EMPTY_ROW, createEvaluators()));
+            return rows;
+        }
+
+        List<InternalRow> rows = new ArrayList<>(groupEvaluatorMap.size());
+        for (Map.Entry<BinaryRow, List<AggFuncEvaluator>> entry : 
groupEvaluatorMap.entrySet()) {
+            rows.add(createResultRow(entry.getKey(), entry.getValue()));
+        }
+        return rows;
+    }
+
+    RowType resultRowType() {
+        return resultRowType;
+    }
+
+    private BinaryRow groupKey(BinaryRow partition) {
+        if (grouping.length == 0) {
+            return BinaryRow.EMPTY_ROW;
+        }
+        return 
groupSerializer.toBinaryRow(projectedPartition.replaceRow(partition)).copy();
+    }
+
+    private void update(List<AggFuncEvaluator> evaluators, DataSplit 
dataSplit) {
+        for (AggFuncEvaluator evaluator : evaluators) {
+            evaluator.update(dataSplit);
+        }
+    }
+
+    private List<AggFuncEvaluator> createEvaluators() {
+        List<AggFuncEvaluator> evaluators = new ArrayList<>(aggregates.size());
+        for (Aggregate aggregate : aggregates) {
+            switch (aggregate.kind) {
+                case COUNT:
+                    evaluators.add(new CountStarEvaluator());
+                    break;
+                case MIN:
+                    evaluators.add(new MinEvaluator(aggregate));
+                    break;
+                case MAX:
+                    evaluators.add(new MaxEvaluator(aggregate));
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unsupported aggregate " + aggregate.kind);
+            }
+        }
+        return evaluators;
+    }
+
+    /**
+     * According to {@code SupportsAggregatePushDown}, the produced data type 
will be strictly
+     * organized by grouping keys and aggregate results with the same order as 
input.
+     *
+     * @return the result row
+     */
+    private InternalRow createResultRow(BinaryRow groupKey, 
List<AggFuncEvaluator> evaluators) {
+        GenericRow row = new GenericRow(grouping.length + evaluators.size());
+        for (int i = 0; i < grouping.length; i++) {
+            row.setField(
+                    i,
+                    InternalRow.createFieldGetter(groupFieldTypes.get(i), i)
+                            .getFieldOrNull(groupKey));
+        }
+        for (int i = 0; i < evaluators.size(); i++) {
+            row.setField(grouping.length + i, evaluators.get(i).result());
+        }
+        return resultSerializer.copy(row);
+    }
+
+    private RowType createResultRowType() {
+        List<DataField> fields = new ArrayList<>(groupFieldNames.size() + 
aggregates.size());
+        for (int i = 0; i < groupFieldNames.size(); i++) {
+            fields.add(new DataField(i, groupFieldNames.get(i), 
groupFieldTypes.get(i)));
+        }
+        for (int i = 0; i < aggregates.size(); i++) {
+            Aggregate aggregate = aggregates.get(i);
+            fields.add(
+                    new DataField(
+                            groupFieldNames.size() + i,
+                            aggregate.resultName(),
+                            aggregate.resultType()));
+        }
+        return new RowType(fields);
+    }
+
+    /** Aggregate information. */
+    static class Aggregate {
+
+        private final Kind kind;
+        private final int fieldIndex;
+        private final String fieldName;
+        private final DataField dataField;
+
+        private Aggregate(Kind kind, int fieldIndex, DataField dataField) {
+            this.kind = kind;
+            this.fieldIndex = fieldIndex;
+            this.fieldName = dataField == null ? null : dataField.name();
+            this.dataField = dataField;
+        }
+
+        static Aggregate count() {
+            return new Aggregate(Kind.COUNT, -1, null);
+        }
+
+        static Aggregate min(int fieldIndex, DataField dataField) {
+            return new Aggregate(Kind.MIN, fieldIndex, dataField);
+        }
+
+        static Aggregate max(int fieldIndex, DataField dataField) {
+            return new Aggregate(Kind.MAX, fieldIndex, dataField);
+        }
+
+        boolean requiresMinMaxStats() {
+            return kind == Kind.MIN || kind == Kind.MAX;
+        }
+
+        String fieldName() {
+            return fieldName;
+        }
+
+        private DataType resultType() {
+            switch (kind) {
+                case COUNT:
+                    return DataTypes.BIGINT().notNull();
+                case MIN:
+                case MAX:
+                    return dataField.type();
+                default:
+                    throw new UnsupportedOperationException("Unsupported 
aggregate " + kind);
+            }
+        }
+
+        private String resultName() {
+            return kind.name().toLowerCase();
+        }
+    }
+
+    /** Aggregate Kind. */
+    private enum Kind {
+        COUNT,
+        MIN,
+        MAX
+    }
+
+    /** Evaluator to calculate agg results from file statistics. */
+    private interface AggFuncEvaluator {
+
+        void update(DataSplit dataSplit);
+
+        Object result();
+    }
+
+    /** Evaluator for count star. */
+    private static class CountStarEvaluator implements AggFuncEvaluator {
+
+        private long result;
+
+        @Override
+        public void update(DataSplit dataSplit) {
+            result += dataSplit.mergedRowCount().getAsLong();
+        }
+
+        @Override
+        public Object result() {
+            return result;
+        }
+    }
+
+    /** Evaluator for MIN. */
+    private class MinEvaluator implements AggFuncEvaluator {
+
+        private final Aggregate aggregate;
+        private Object result;
+
+        private MinEvaluator(Aggregate aggregate) {
+            this.aggregate = aggregate;
+        }
+
+        @Override
+        public void update(DataSplit dataSplit) {
+            Object other =
+                    dataSplit.minValue(
+                            aggregate.fieldIndex, aggregate.dataField, 
simpleStatsEvolutions);
+            if (other == null) {
+                return;
+            }
+
+            if (result == null
+                    || CompareUtils.compareLiteral(aggregate.dataField.type(), 
result, other) > 0) {
+                result = other;
+            }
+        }
+
+        @Override
+        public Object result() {
+            return result;
+        }
+    }
+
+    /** Evaluator for MAX. */
+    private class MaxEvaluator implements AggFuncEvaluator {
+
+        private final Aggregate aggregate;
+        private Object result;
+
+        private MaxEvaluator(Aggregate aggregate) {
+            this.aggregate = aggregate;
+        }
+
+        @Override
+        public void update(DataSplit dataSplit) {
+            Object other =
+                    dataSplit.maxValue(
+                            aggregate.fieldIndex, aggregate.dataField, 
simpleStatsEvolutions);
+            if (other == null) {
+                return;
+            }
+
+            if (result == null
+                    || CompareUtils.compareLiteral(aggregate.dataField.type(), 
result, other) < 0) {
+                result = other;
+            }
+        }
+
+        @Override
+        public Object result() {
+            return result;
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/aggregate/PushedAggregateResult.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/aggregate/PushedAggregateResult.java
new file mode 100644
index 0000000000..6056ccf607
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/aggregate/PushedAggregateResult.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source.aggregate;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.RowType;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Result of aggregate push down.
+ *
+ * <p>Holder pairing the produced rows with the Paimon row type passed alongside them.
+ */
+public class PushedAggregateResult implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final List<InternalRow> rows;
+    private final RowType paimonRowType;
+
+    PushedAggregateResult(List<InternalRow> rows, RowType paimonRowType) {
+        this.paimonRowType = paimonRowType;
+        this.rows = rows;
+    }
+
+    /** Returns the row type given at construction. */
+    public RowType paimonRowType() {
+        return this.paimonRowType;
+    }
+
+    /** Returns the rows given at construction (the same list instance, not a copy). */
+    public List<InternalRow> rows() {
+        return this.rows;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 48fcbbda71..e80e45b1f8 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -767,6 +767,178 @@ public class BatchFileStoreITCase extends 
CatalogITCaseBase {
         validateCount1PushDown(sql);
     }
 
+    @Test
+    public void testCountStarGroupByPartition() {
+        // dt is the only partition column and also the grouping key.
+        sql("CREATE TABLE count_group_part (f0 INT, f1 STRING, dt STRING) PARTITIONED BY (dt)");
+        sql("INSERT INTO count_group_part VALUES (1, 'a', '1'), (1, 'a', '1'), (2, 'b', '2')");
+
+        String query = "SELECT dt, COUNT(*) FROM count_group_part GROUP BY dt ORDER BY dt";
+        assertThat(sql(query)).containsExactly(Row.of("1", 2L), Row.of("2", 1L));
+        validateCount1PushDown(query);
+    }
+
+    @Test
+    public void testCountStarGroupByPartitionSubset() {
+        // The table is partitioned by (region, dt) but the query groups by region only.
+        sql(
+                "CREATE TABLE count_group_part_subset (f0 INT, region STRING, dt STRING) "
+                        + "PARTITIONED BY (region, dt)");
+        sql(
+                "INSERT INTO count_group_part_subset VALUES (1, 'cn', '1'), (2, 'cn', '1'), "
+                        + "(3, 'cn', '2'), (4, 'us', '1')");
+
+        String query =
+                "SELECT region, COUNT(*) FROM count_group_part_subset GROUP BY region "
+                        + "ORDER BY region";
+        assertThat(sql(query)).containsExactly(Row.of("cn", 3L), Row.of("us", 1L));
+        validateCount1PushDown(query);
+    }
+
+    @Test
+    public void testCountStarGroupByNonPartition() {
+        // f0 is not a partition column, so the plan check expects no pushed-down count.
+        sql("CREATE TABLE count_group_non_part (f0 INT, f1 STRING, dt STRING) PARTITIONED BY (dt)");
+        sql("INSERT INTO count_group_non_part VALUES (1, 'a', '1'), (1, 'b', '1'), (2, 'c', '2')");
+
+        String query = "SELECT f0, COUNT(*) FROM count_group_non_part GROUP BY f0 ORDER BY f0";
+        assertThat(sql(query)).containsExactly(Row.of(1, 2L), Row.of(2, 1L));
+        validateCount1NotPushDown(query);
+    }
+
+    @Test
+    public void testMinMaxAppend() {
+        // Table without primary key or partitions; MIN, MAX and COUNT(*) in one query.
+        sql("CREATE TABLE min_max_append (f0 INT, f1 STRING)");
+        sql("INSERT INTO min_max_append VALUES (1, 'a'), (7, 'b'), (3, 'c')");
+
+        String query = "SELECT MIN(f0), MAX(f0), COUNT(*) FROM min_max_append";
+        assertThat(sql(query)).containsOnly(Row.of(1, 7, 3L));
+        validateAggregatePushDown(query, "MinAggFunction", "MaxAggFunction", "Count1AggFunction");
+    }
+
+    @Test
+    public void testMinMaxWithNullValues() {
+        sql("CREATE TABLE min_max_with_null_values (f0 INT, f1 STRING)");
+        sql(
+                "INSERT INTO min_max_with_null_values VALUES "
+                        + "(CAST(NULL AS INT), 'a'), (7, 'b'), (3, 'c')");
+
+        String sql = "SELECT MIN(f0), MAX(f0), COUNT(*) FROM 
min_max_with_null_values";
+        assertThat(sql(sql)).containsOnly(Row.of(3, 7, 3L));
+        validateAggregatePushDown(sql, "MinAggFunction", "MaxAggFunction", 
"Count1AggFunction");
+    }
+
+    @Test
+    public void testMinMaxAllNullValues() {
+        sql("CREATE TABLE min_max_all_null_values (f0 INT, f1 STRING)");
+        sql(
+                "INSERT INTO min_max_all_null_values VALUES "
+                        + "(CAST(NULL AS INT), 'a'), (CAST(NULL AS INT), 
'b')");
+
+        String sql = "SELECT MIN(f0), MAX(f0), COUNT(*) FROM 
min_max_all_null_values";
+        assertThat(sql(sql)).containsOnly(Row.of(null, null, 2L));
+        validateAggregatePushDown(sql, "MinAggFunction", "MaxAggFunction", 
"Count1AggFunction");
+    }
+
+    @Test
+    public void testMinMaxEmptyAppend() {
+        sql("CREATE TABLE min_max_empty_append (f0 INT)");
+
+        String sql = "SELECT MIN(f0), MAX(f0), COUNT(*) FROM 
min_max_empty_append";
+        assertThat(sql(sql)).containsOnly(Row.of(null, null, 0L));
+    }
+
+    @Test
+    public void testMinMaxGroupByPartition() {
+        sql("CREATE TABLE min_max_group_part (f0 INT, dt STRING) PARTITIONED 
BY (dt)");
+        sql("INSERT INTO min_max_group_part VALUES " + "(3, '1'), (1, '1'), 
(8, '2'), (5, '2')");
+
+        String sql =
+                "SELECT dt, MIN(f0), MAX(f0), COUNT(*) FROM min_max_group_part 
"
+                        + "GROUP BY dt ORDER BY dt";
+        assertThat(sql(sql)).containsExactly(Row.of("1", 1, 3, 2L), 
Row.of("2", 5, 8, 2L));
+        validateAggregatePushDown(sql, "MinAggFunction", "MaxAggFunction", 
"Count1AggFunction");
+    }
+
+    @Test
+    public void testMinMaxGroupByPartitionWithPartitionFilter() {
+        // Partition dt = '3' exists but is excluded by the WHERE clause.
+        sql("CREATE TABLE min_max_group_part_filter (f0 INT, dt STRING) PARTITIONED BY (dt)");
+        sql(
+                "INSERT INTO min_max_group_part_filter VALUES (3, '1'), (1, '1'), (8, '2'), "
+                        + "(5, '2'), (9, '3')");
+
+        String query =
+                "SELECT dt, MIN(f0), MAX(f0), COUNT(*) FROM min_max_group_part_filter WHERE "
+                        + "dt IN ('1', '2') GROUP BY dt ORDER BY dt";
+        assertThat(sql(query)).containsExactly(Row.of("1", 1, 3, 2L), Row.of("2", 5, 8, 2L));
+        validateAggregatePushDown(query, "MinAggFunction", "MaxAggFunction", "Count1AggFunction");
+    }
+
+    @Test
+    public void testMinMaxWithProjectedFieldMapping() {
+        // The query reads f2, f0 and dt but not f1, and aggregates f2 before f0.
+        sql(
+                "CREATE TABLE min_max_projected_field_mapping (f0 INT, f1 STRING, f2 INT, "
+                        + "dt STRING) PARTITIONED BY (dt)");
+        sql(
+                "INSERT INTO min_max_projected_field_mapping VALUES (3, 'a', 10, '1'), "
+                        + "(1, 'b', 20, '1'), (8, 'c', 5, '2'), (5, 'd', 7, '2')");
+
+        String query =
+                "SELECT dt, MIN(f2), MAX(f0), COUNT(*) FROM min_max_projected_field_mapping "
+                        + "GROUP BY dt ORDER BY dt";
+        assertThat(sql(query)).containsExactly(Row.of("1", 10, 3, 2L), Row.of("2", 5, 8, 2L));
+        validateAggregatePushDown(query, "MinAggFunction", "MaxAggFunction", "Count1AggFunction");
+    }
+
+    @Test
+    public void testMinMaxGroupByNonPartition() {
+        // Grouping key f0 is not a partition column; the plan check expects no push down.
+        sql("CREATE TABLE min_max_group_non_part (f0 INT, f1 INT, dt STRING) PARTITIONED BY (dt)");
+        sql(
+                "INSERT INTO min_max_group_non_part VALUES (1, 3, '1'), (1, 1, '2'), "
+                        + "(2, 8, '1'), (2, 5, '2')");
+
+        String query =
+                "SELECT f0, MIN(f1), MAX(f1), COUNT(*) FROM min_max_group_non_part GROUP BY f0 "
+                        + "ORDER BY f0";
+        assertThat(sql(query)).containsExactly(Row.of(1, 1, 3, 2L), Row.of(2, 5, 8, 2L));
+        validateAggregateNotPushDown(
+                query, "MinAggFunction", "MaxAggFunction", "Count1AggFunction");
+    }
+
+    @Test
+    public void testMinExpressionNotPushDown() {
+        sql("CREATE TABLE min_expression (f0 INT, f1 STRING)");
+        sql("INSERT INTO min_expression VALUES (-3, 'a'), (4, 'b'), (1, 'c')");
+
+        // MIN over an arithmetic expression instead of a plain column.
+        String arithmeticQuery = "SELECT MIN(f0 * 2) FROM min_expression";
+        assertThat(sql(arithmeticQuery)).containsOnly(Row.of(-6));
+        validateAggregateNotPushDown(arithmeticQuery, "MinAggFunction");
+
+        // MIN over a scalar function call instead of a plain column.
+        String functionQuery = "SELECT MIN(ABS(f0)) FROM min_expression";
+        assertThat(sql(functionQuery)).containsOnly(Row.of(1));
+        validateAggregateNotPushDown(functionQuery, "MinAggFunction");
+    }
+
+    @Test
+    public void testMinMaxStringNotPushDown() {
+        // MIN over a STRING column; the plan check expects no pushed-down min.
+        sql("CREATE TABLE min_max_string (f0 INT, f1 STRING)");
+        sql("INSERT INTO min_max_string VALUES (1, 'a'), (2, 'b')");
+
+        String query = "SELECT MIN(f1) FROM min_max_string";
+        assertThat(sql(query)).containsOnly(Row.of("a"));
+        validateAggregateNotPushDown(query, "MinAggFunction");
+    }
+
+    @Test
+    public void testMinMaxPKNotPushDown() {
+        // Primary-key table where key 1 is written twice (f1 = 3, then f1 = 1).
+        sql("CREATE TABLE min_max_pk (f0 INT PRIMARY KEY NOT ENFORCED, f1 INT)");
+        sql("INSERT INTO min_max_pk VALUES (1, 3), (2, 5)");
+        sql("INSERT INTO min_max_pk VALUES (1, 1)");
+
+        String query = "SELECT MIN(f1), MAX(f1) FROM min_max_pk";
+        assertThat(sql(query)).containsOnly(Row.of(1, 5));
+        validateAggregateNotPushDown(query, "MinAggFunction", "MaxAggFunction");
+    }
+
     @Test
     public void testCountStarAppendWithDv() {
         sql(
@@ -811,19 +983,31 @@ public class BatchFileStoreITCase extends 
CatalogITCaseBase {
     }
 
+    /** Delegates to {@code validateAggregatePushDown} with the {@code Count1AggFunction} name. */
     private void validateCount1PushDown(String sql) {
+        validateAggregatePushDown(sql, "Count1AggFunction");
+    }
+
+    /**
+     * Translates {@code sql}, follows the first input of each transformation until one without
+     * inputs is reached, and asserts that its description contains every given function name.
+     */
+    private void validateAggregatePushDown(String sql, String... 
functionNames) {
         Transformation<?> transformation = AbstractTestBase.translate(tEnv, 
sql);
         while (!transformation.getInputs().isEmpty()) {
             transformation = transformation.getInputs().get(0);
         }
-        
assertThat(transformation.getDescription()).contains("Count1AggFunction");
+        for (String functionName : functionNames) {
+            assertThat(transformation.getDescription()).contains(functionName);
+        }
     }
 
+    /** Delegates to {@code validateAggregateNotPushDown} with the {@code Count1AggFunction} name. */
     private void validateCount1NotPushDown(String sql) {
+        validateAggregateNotPushDown(sql, "Count1AggFunction");
+    }
+
+    /**
+     * Translates {@code sql}, follows the first input of each transformation until one without
+     * inputs is reached, and asserts that its description contains none of the given function
+     * names.
+     */
+    private void validateAggregateNotPushDown(String sql, String... 
functionNames) {
         Transformation<?> transformation = AbstractTestBase.translate(tEnv, 
sql);
         while (!transformation.getInputs().isEmpty()) {
             transformation = transformation.getInputs().get(0);
         }
-        
assertThat(transformation.getDescription()).doesNotContain("Count1AggFunction");
+        for (String functionName : functionNames) {
+            
assertThat(transformation.getDescription()).doesNotContain(functionName);
+        }
     }
 
     @Test
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/StaticRowDataSourceTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/StaticRowDataSourceTest.java
new file mode 100644
index 0000000000..b1cc1a19ef
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/StaticRowDataSourceTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.SimpleUserCodeClassLoader;
+import org.apache.flink.util.UserCodeClassLoader;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the {@link StaticRowDataSource}. */
+class StaticRowDataSourceTest {
+
+    private static final RowType ROW_TYPE = RowType.of(DataTypes.INT());
+
+    @Test
+    void testReaderCheckpoints() throws Exception {
+        List<InternalRow> rows = rows(0, 100);
+        TestingReaderOutput<RowData> out = new TestingReaderOutput<>();
+
+        SourceReader<RowData, StaticRowDataSource.StaticRowsSplit> reader = createReader();
+        reader.addSplits(
+                Arrays.asList(
+                        new StaticRowDataSource.StaticRowsSplit("split-1", rows.subList(0, 35), 0),
+                        new StaticRowDataSource.StaticRowsSplit(
+                                "split-2", rows.subList(35, rows.size()), 0)));
+
+        int remainingInCycle = 17;
+        while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
+            if (--remainingInCycle <= 0) {
+                remainingInCycle = 17;
+
+                List<StaticRowDataSource.StaticRowsSplit> splits = reader.snapshotState(1L);
+
+                reader = createReader();
+                if (splits.isEmpty()) {
+                    reader.notifyNoMoreSplits();
+                } else {
+                    reader.addSplits(splits);
+                }
+            }
+        }
+
+        assertThat(values(out.getEmittedRecords())).containsExactlyElementsOf(values(0, 100));
+    }
+
+    @Test
+    void testSplitSerializer() throws Exception {
+        StaticRowDataSource source = new StaticRowDataSource(Collections.emptyList(), ROW_TYPE);
+        SimpleVersionedSerializer<StaticRowDataSource.StaticRowsSplit> serializer =
+                source.getSplitSerializer();
+
+        StaticRowDataSource.StaticRowsSplit split =
+                new StaticRowDataSource.StaticRowsSplit("split-1", rows(0, 10), 3);
+        StaticRowDataSource.StaticRowsSplit restored =
+                serializer.deserialize(serializer.getVersion(), serializer.serialize(split));
+
+        assertThat(read(restored)).containsExactlyElementsOf(values(3, 10));
+    }
+
+    @Test
+    void testEnumeratorCheckpointSerializer() throws Exception {
+        StaticRowDataSource source = new StaticRowDataSource(Collections.emptyList(), ROW_TYPE);
+        SimpleVersionedSerializer<Collection<StaticRowDataSource.StaticRowsSplit>> serializer =
+                source.getEnumeratorCheckpointSerializer();
+
+        Collection<StaticRowDataSource.StaticRowsSplit> checkpoint =
+                Arrays.asList(
+                        new StaticRowDataSource.StaticRowsSplit("split-1", rows(0, 5), 2),
+                        new StaticRowDataSource.StaticRowsSplit("split-2", rows(5, 9), 1));
+        Collection<StaticRowDataSource.StaticRowsSplit> restored =
+                serializer.deserialize(serializer.getVersion(), serializer.serialize(checkpoint));
+
+        TestingReaderOutput<RowData> out = new TestingReaderOutput<>();
+        SourceReader<RowData, StaticRowDataSource.StaticRowsSplit> reader = createReader();
+        reader.addSplits(new ArrayList<>(restored));
+        read(reader, out, 6);
+
+        assertThat(values(out.getEmittedRecords())).containsExactly(2, 3, 4, 6, 7, 8);
+    }
+
+    private static List<Integer> read(StaticRowDataSource.StaticRowsSplit split) throws Exception {
+        TestingReaderOutput<RowData> out = new TestingReaderOutput<>();
+        SourceReader<RowData, StaticRowDataSource.StaticRowsSplit> reader = createReader();
+        reader.addSplits(Collections.singletonList(split));
+        read(reader, out, 7);
+        return values(out.getEmittedRecords());
+    }
+
+    private static void read(
+            SourceReader<RowData, StaticRowDataSource.StaticRowsSplit> reader,
+            TestingReaderOutput<RowData> out,
+            int expectedSize)
+            throws Exception {
+        while (out.getEmittedRecords().size() < expectedSize) {
+            assertThat(reader.pollNext(out)).isNotEqualTo(InputStatus.END_OF_INPUT);
+        }
+    }
+
+    private static List<InternalRow> rows(int from, int to) {
+        List<InternalRow> rows = new ArrayList<>(to - from);
+        for (int i = from; i < to; i++) {
+            rows.add(GenericRow.of(i));
+        }
+        return rows;
+    }
+
+    private static List<Integer> values(int from, int to) {
+        List<Integer> values = new ArrayList<>(to - from);
+        for (int i = from; i < to; i++) {
+            values.add(i);
+        }
+        return values;
+    }
+
+    private static List<Integer> values(List<RowData> rows) {
+        List<Integer> values = new ArrayList<>(rows.size());
+        for (RowData row : rows) {
+            values.add(row.getInt(0));
+        }
+        return values;
+    }
+
+    private static SourceReader<RowData, StaticRowDataSource.StaticRowsSplit> createReader() {
+        return new StaticRowDataSource(Collections.emptyList(), ROW_TYPE)
+                .createReader(new DummyReaderContext());
+    }
+
+    private static final class DummyReaderContext implements SourceReaderContext {
+
+        @Override
+        public SourceReaderMetricGroup metricGroup() {
+            return UnregisteredMetricsGroup.createSourceReaderMetricGroup();
+        }
+
+        @Override
+        public Configuration getConfiguration() {
+            return new Configuration();
+        }
+
+        @Override
+        public String getLocalHostName() {
+            return "localhost";
+        }
+
+        @Override
+        public int getIndexOfSubtask() {
+            return 0;
+        }
+
+        @Override
+        public void sendSplitRequest() {}
+
+        @Override
+        public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {}
+
+        @Override
+        public UserCodeClassLoader getUserCodeClassLoader() {
+            return SimpleUserCodeClassLoader.create(getClass().getClassLoader());
+        }
+
+        @Override
+        public int currentParallelism() {
+            return 1;
+        }
+    }
+
+    private static final class TestingReaderOutput<E> implements ReaderOutput<E> {
+
+        private final ArrayList<E> emittedRecords = new ArrayList<>();
+
+        @Override
+        public void collect(E record) {
+            emittedRecords.add(record);
+        }
+
+        @Override
+        public void collect(E record, long timestamp) {
+            collect(record);
+        }
+
+        @Override
+        public void emitWatermark(Watermark watermark) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void markIdle() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void markActive() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public SourceOutput<E> createOutputForSplit(String splitId) {
+            return this;
+        }
+
+        @Override
+        public void releaseOutputForSplit(String splitId) {}
+
+        private ArrayList<E> getEmittedRecords() {
+            return emittedRecords;
+        }
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java
index 05435c35f8..c5d057abb1 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java
@@ -149,7 +149,6 @@ public abstract class FileStoreTableStatisticsTestBase {
                         null,
                         null,
                         null,
-                        null,
                         null);
         Assertions.assertThat(partitionFilterSource.reportStatistics().getRowCount()).isEqualTo(5L);
         Map<String, ColStats<?>> colStatsMap = new HashMap<>();
@@ -226,7 +225,6 @@ public abstract class FileStoreTableStatisticsTestBase {
                         null,
                         null,
                         null,
-                        null,
                         null);
         Assertions.assertThat(keyFilterSource.reportStatistics().getRowCount()).isEqualTo(2L);
         Map<String, ColStats<?>> colStatsMap = new HashMap<>();
@@ -303,7 +301,6 @@ public abstract class FileStoreTableStatisticsTestBase {
                         null,
                         null,
                         null,
-                        null,
                         null);
         Assertions.assertThat(keyFilterSource.reportStatistics().getRowCount()).isEqualTo(4L);
         Map<String, ColStats<?>> colStatsMap = new HashMap<>();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/PrimaryKeyTableStatisticsTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/PrimaryKeyTableStatisticsTest.java
index cac7a718a9..f8ec891b3e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/PrimaryKeyTableStatisticsTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/PrimaryKeyTableStatisticsTest.java
@@ -50,7 +50,6 @@ public class PrimaryKeyTableStatisticsTest extends FileStoreTableStatisticsTestB
                         null,
                         null,
                         null,
-                        null,
                         null);
         Assertions.assertThat(keyFilterSource.reportStatistics().getRowCount()).isEqualTo(9L);
         // TODO validate column statistics

Reply via email to