This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d3ce11811 [flink] Support count star push down to source for append 
table (#4236)
d3ce11811 is described below

commit d3ce11811c67e5cbf70afee16884efe3fc2820df
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Sep 24 17:44:35 2024 +0800

    [flink] Support count star push down to source for append table (#4236)
---
 .../org/apache/paimon/manifest/PartitionEntry.java |  37 +-
 .../paimon/table/DelegatedFileStoreTable.java      |   4 +
 .../paimon/table/FallbackReadFileStoreTable.java   |   8 -
 .../paimon/table/source/AbstractDataTableScan.java |   6 -
 .../paimon/table/source/DataTableBatchScan.java    |   9 +
 .../paimon/table/source/DataTableStreamScan.java   |   9 +
 .../source/snapshot/AbstractStartingScanner.java   |  14 +
 .../source/snapshot/CompactedStartingScanner.java  |   9 +-
 .../ContinuousFromSnapshotFullStartingScanner.java |   9 +-
 .../snapshot/FileCreationTimeStartingScanner.java  |  18 +-
 .../snapshot/FullCompactedStartingScanner.java     |   9 +-
 .../table/source/snapshot/FullStartingScanner.java |   9 +-
 ...ngScanner.java => ReadPlanStartingScanner.java} |  42 +-
 .../table/source/snapshot/StartingScanner.java     |   3 +
 .../StaticFromSnapshotStartingScanner.java         |   9 +-
 .../snapshot/StaticFromTagStartingScanner.java     |   7 +-
 .../StaticFromTimestampStartingScanner.java        |   9 +-
 .../StaticFromWatermarkStartingScanner.java        |   7 +-
 .../apache/paimon/table/system/AuditLogTable.java  |  60 +--
 .../paimon/flink/source/DataTableSource.java       |  12 +-
 .../flink/lookup/DynamicPartitionLoader.java       |   8 +-
 .../paimon/flink/lookup/FullCacheLookupTable.java  |   4 +-
 .../paimon/flink/lookup/LookupDataTableScan.java   |   4 +-
 .../paimon/flink/lookup/LookupFileStoreTable.java  |  26 --
 .../paimon/flink/lookup/LookupStreamingReader.java |   5 +-
 .../paimon/flink/source/BaseDataTableSource.java   |  94 ++++-
 .../paimon/flink/source/DataTableSource.java       |  12 +-
 .../paimon/flink/source/FlinkTableSource.java      |  38 +-
 .../flink/source/NumberSequenceRowSource.java      | 436 +++++++++++++++++++++
 .../apache/paimon/flink/BatchFileStoreITCase.java  |  59 +++
 .../paimon/flink/source/IteratorSourcesITCase.java |  98 +++++
 .../flink/source/NumberSequenceRowSourceTest.java  | 195 +++++++++
 .../FileStoreTableStatisticsTestBase.java          |   9 +-
 .../statistics/PrimaryKeyTableStatisticsTest.java  |   3 +-
 .../apache/paimon/flink/util/AbstractTestBase.java |  44 +++
 35 files changed, 1146 insertions(+), 179 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java
index 22b7fc2fe..1aa562444 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/PartitionEntry.java
@@ -20,12 +20,15 @@ package org.apache.paimon.manifest;
 
 import org.apache.paimon.annotation.Public;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.table.source.DataSplit;
 
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
+import static org.apache.paimon.manifest.FileKind.ADD;
 import static org.apache.paimon.manifest.FileKind.DELETE;
 
 /** Entry representing a partition. */
@@ -81,20 +84,21 @@ public class PartitionEntry {
     }
 
     public static PartitionEntry fromManifestEntry(ManifestEntry entry) {
-        long recordCount = entry.file().rowCount();
-        long fileSizeInBytes = entry.file().fileSize();
+        return fromDataFile(entry.partition(), entry.kind(), entry.file());
+    }
+
+    public static PartitionEntry fromDataFile(
+            BinaryRow partition, FileKind kind, DataFileMeta file) {
+        long recordCount = file.rowCount();
+        long fileSizeInBytes = file.fileSize();
         long fileCount = 1;
-        if (entry.kind() == DELETE) {
+        if (kind == DELETE) {
             recordCount = -recordCount;
             fileSizeInBytes = -fileSizeInBytes;
             fileCount = -fileCount;
         }
         return new PartitionEntry(
-                entry.partition(),
-                recordCount,
-                fileSizeInBytes,
-                fileCount,
-                entry.file().creationTimeEpochMillis());
+                partition, recordCount, fileSizeInBytes, fileCount, 
file.creationTimeEpochMillis());
     }
 
     public static Collection<PartitionEntry> merge(Collection<ManifestEntry> 
fileEntries) {
@@ -108,6 +112,23 @@ public class PartitionEntry {
         return partitions.values();
     }
 
+    public static Collection<PartitionEntry> mergeSplits(Collection<DataSplit> 
splits) {
+        Map<BinaryRow, PartitionEntry> partitions = new HashMap<>();
+        for (DataSplit split : splits) {
+            BinaryRow partition = split.partition();
+            for (DataFileMeta file : split.dataFiles()) {
+                PartitionEntry partitionEntry = fromDataFile(partition, ADD, 
file);
+                partitions.compute(
+                        partition,
+                        (part, old) -> old == null ? partitionEntry : 
old.merge(partitionEntry));
+            }
+
+            // Ignore before files, because we don't know how to merge them
+            // Ignore deletion files, because it is costly to read from it
+        }
+        return partitions.values();
+    }
+
     public static void merge(Collection<PartitionEntry> from, Map<BinaryRow, 
PartitionEntry> to) {
         for (PartitionEntry entry : from) {
             to.compute(entry.partition(), (part, old) -> old == null ? entry : 
old.merge(entry));
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
index a60e17496..dc758a1af 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
@@ -58,6 +58,10 @@ public abstract class DelegatedFileStoreTable implements 
FileStoreTable {
         this.wrapped = wrapped;
     }
 
+    public FileStoreTable wrapped() {
+        return wrapped;
+    }
+
     @Override
     public String name() {
         return wrapped.name();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index f8be79cb4..7e0d4ab98 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -46,7 +46,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -304,13 +303,6 @@ public class FallbackReadFileStoreTable extends 
DelegatedFileStoreTable {
             return new DataFilePlan(splits);
         }
 
-        @Override
-        public List<BinaryRow> listPartitions() {
-            Set<BinaryRow> partitions = new 
LinkedHashSet<>(mainScan.listPartitions());
-            partitions.addAll(fallbackScan.listPartitions());
-            return new ArrayList<>(partitions);
-        }
-
         @Override
         public List<PartitionEntry> listPartitionEntries() {
             List<PartitionEntry> partitionEntries = 
mainScan.listPartitionEntries();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index ba1bc6588..6a8aa9265 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -24,7 +24,6 @@ import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.consumer.Consumer;
 import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.table.source.snapshot.CompactedStartingScanner;
@@ -240,9 +239,4 @@ public abstract class AbstractDataTableScan implements 
DataTableScan {
                         "Unknown startup mode " + startupMode.name());
         }
     }
-
-    @Override
-    public List<PartitionEntry> listPartitionEntries() {
-        return snapshotReader.partitionEntries();
-    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
index 8cf842aee..93ab5ba16 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.table.source;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.operation.DefaultValueAssigner;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
@@ -81,6 +82,14 @@ public class DataTableBatchScan extends 
AbstractDataTableScan {
         }
     }
 
+    @Override
+    public List<PartitionEntry> listPartitionEntries() {
+        if (startingScanner == null) {
+            startingScanner = createStartingScanner(false);
+        }
+        return startingScanner.scanPartitions(snapshotReader);
+    }
+
     private StartingScanner.Result applyPushDownLimit(StartingScanner.Result 
result) {
         if (pushDownLimit != null && result instanceof ScannedResult) {
             long scannedRowCount = 0;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
index 4cd221996..a68c7b1cb 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.consumer.Consumer;
 import org.apache.paimon.lookup.LookupStrategy;
+import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.operation.DefaultValueAssigner;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.source.snapshot.AllDeltaFollowUpScanner;
@@ -44,6 +45,8 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.util.List;
+
 import static org.apache.paimon.CoreOptions.ChangelogProducer.FULL_COMPACTION;
 
 /** {@link StreamTableScan} implementation for streaming planning. */
@@ -110,6 +113,12 @@ public class DataTableStreamScan extends 
AbstractDataTableScan implements Stream
         }
     }
 
+    @Override
+    public List<PartitionEntry> listPartitionEntries() {
+        throw new UnsupportedOperationException(
+                "List Partition Entries is not supported in Stream Scan.");
+    }
+
     private void initScanner() {
         if (startingScanner == null) {
             startingScanner = createStartingScanner(true);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/AbstractStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/AbstractStartingScanner.java
index d711781d7..c1b2f985a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/AbstractStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/AbstractStartingScanner.java
@@ -18,9 +18,14 @@
 
 package org.apache.paimon.table.source.snapshot;
 
+import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.utils.SnapshotManager;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
 /** The abstract class for StartingScanner. */
 public abstract class AbstractStartingScanner implements StartingScanner {
 
@@ -44,4 +49,13 @@ public abstract class AbstractStartingScanner implements 
StartingScanner {
             return new StartingContext(startingSnapshotId, startingScanMode() 
== ScanMode.ALL);
         }
     }
+
+    @Override
+    public List<PartitionEntry> scanPartitions(SnapshotReader snapshotReader) {
+        Result result = scan(snapshotReader);
+        if (result instanceof ScannedResult) {
+            return new ArrayList<>(PartitionEntry.mergeSplits(((ScannedResult) 
result).splits()));
+        }
+        return Collections.emptyList();
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
index a15954627..353321a87 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/CompactedStartingScanner.java
@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 /** {@link StartingScanner} for the {@link 
CoreOptions.StartupMode#COMPACTED_FULL} startup mode. */
-public class CompactedStartingScanner extends AbstractStartingScanner {
+public class CompactedStartingScanner extends ReadPlanStartingScanner {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(CompactedStartingScanner.class);
 
@@ -44,13 +44,13 @@ public class CompactedStartingScanner extends 
AbstractStartingScanner {
     }
 
     @Override
-    public Result scan(SnapshotReader snapshotReader) {
+    public SnapshotReader configure(SnapshotReader snapshotReader) {
         Long startingSnapshotId = pick();
         if (startingSnapshotId == null) {
             startingSnapshotId = snapshotManager.latestSnapshotId();
             if (startingSnapshotId == null) {
                 LOG.debug("There is currently no snapshot. Wait for the 
snapshot generation.");
-                return new NoSnapshot();
+                return null;
             } else {
                 LOG.debug(
                         "No compact snapshot found, reading from the latest 
snapshot {}.",
@@ -58,8 +58,7 @@ public class CompactedStartingScanner extends 
AbstractStartingScanner {
             }
         }
 
-        return StartingScanner.fromPlan(
-                
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId).read());
+        return 
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId);
     }
 
     @Nullable
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotFullStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotFullStartingScanner.java
index 3ffd421d0..d7b4b1eeb 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotFullStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotFullStartingScanner.java
@@ -26,7 +26,7 @@ import org.apache.paimon.utils.SnapshotManager;
  * {@link StartingScanner} for the {@link 
CoreOptions.StartupMode#FROM_SNAPSHOT_FULL} startup mode
  * of a batch read.
  */
-public class ContinuousFromSnapshotFullStartingScanner extends 
AbstractStartingScanner {
+public class ContinuousFromSnapshotFullStartingScanner extends 
ReadPlanStartingScanner {
 
     public ContinuousFromSnapshotFullStartingScanner(
             SnapshotManager snapshotManager, long snapshotId) {
@@ -40,13 +40,12 @@ public class ContinuousFromSnapshotFullStartingScanner 
extends AbstractStartingS
     }
 
     @Override
-    public Result scan(SnapshotReader snapshotReader) {
+    public SnapshotReader configure(SnapshotReader snapshotReader) {
         Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
         if (earliestSnapshotId == null) {
-            return new NoSnapshot();
+            return null;
         }
         long ceiledSnapshotId = Math.max(startingSnapshotId, 
earliestSnapshotId);
-        return StartingScanner.fromPlan(
-                
snapshotReader.withMode(ScanMode.ALL).withSnapshot(ceiledSnapshotId).read());
+        return 
snapshotReader.withMode(ScanMode.ALL).withSnapshot(ceiledSnapshotId);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FileCreationTimeStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FileCreationTimeStartingScanner.java
index 0ff55152a..ad557b22a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FileCreationTimeStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FileCreationTimeStartingScanner.java
@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory;
  * {@link StartingScanner} for the {@link 
CoreOptions.StartupMode#FROM_FILE_CREATION_TIME} startup
  * mode.
  */
-public class FileCreationTimeStartingScanner extends AbstractStartingScanner {
+public class FileCreationTimeStartingScanner extends ReadPlanStartingScanner {
 
     private static final Logger LOG =
             LoggerFactory.getLogger(FileCreationTimeStartingScanner.class);
@@ -47,18 +47,16 @@ public class FileCreationTimeStartingScanner extends 
AbstractStartingScanner {
     }
 
     @Override
-    public Result scan(SnapshotReader snapshotReader) {
+    public SnapshotReader configure(SnapshotReader snapshotReader) {
         Long startingSnapshotId = snapshotManager.latestSnapshotId();
         if (startingSnapshotId == null) {
             LOG.debug("There is currently no snapshot. Waiting for snapshot 
generation.");
-            return new NoSnapshot();
+            return null;
         }
-        return StartingScanner.fromPlan(
-                snapshotReader
-                        .withMode(ScanMode.ALL)
-                        .withSnapshot(startingSnapshotId)
-                        .withManifestEntryFilter(
-                                entry -> 
entry.file().creationTimeEpochMillis() >= startupMillis)
-                        .read());
+        return snapshotReader
+                .withMode(ScanMode.ALL)
+                .withSnapshot(startingSnapshotId)
+                .withManifestEntryFilter(
+                        entry -> entry.file().creationTimeEpochMillis() >= 
startupMillis);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScanner.java
index 5015d56c7..7fa334d8c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullCompactedStartingScanner.java
@@ -33,7 +33,7 @@ import javax.annotation.Nullable;
  * {@link StartingScanner} for the {@link StartupMode#COMPACTED_FULL} startup 
mode with
  * 'full-compaction.delta-commits'.
  */
-public class FullCompactedStartingScanner extends AbstractStartingScanner {
+public class FullCompactedStartingScanner extends ReadPlanStartingScanner {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(FullCompactedStartingScanner.class);
 
@@ -62,13 +62,13 @@ public class FullCompactedStartingScanner extends 
AbstractStartingScanner {
     }
 
     @Override
-    public Result scan(SnapshotReader snapshotReader) {
+    public SnapshotReader configure(SnapshotReader snapshotReader) {
         Long startingSnapshotId = pick();
         if (startingSnapshotId == null) {
             startingSnapshotId = snapshotManager.latestSnapshotId();
             if (startingSnapshotId == null) {
                 LOG.debug("There is currently no snapshot. Wait for the 
snapshot generation.");
-                return new NoSnapshot();
+                return null;
             } else {
                 LOG.debug(
                         "No compact snapshot found, reading from the latest 
snapshot {}.",
@@ -76,8 +76,7 @@ public class FullCompactedStartingScanner extends 
AbstractStartingScanner {
             }
         }
 
-        return StartingScanner.fromPlan(
-                
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId).read());
+        return 
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId);
     }
 
     public static boolean isFullCompactedIdentifier(long identifier, int 
deltaCommits) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
index fc9b49d2d..1189bc68e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java
@@ -26,7 +26,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /** {@link StartingScanner} for the {@link 
CoreOptions.StartupMode#LATEST_FULL} startup mode. */
-public class FullStartingScanner extends AbstractStartingScanner {
+public class FullStartingScanner extends ReadPlanStartingScanner {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(FullStartingScanner.class);
 
@@ -41,16 +41,15 @@ public class FullStartingScanner extends 
AbstractStartingScanner {
     }
 
     @Override
-    public Result scan(SnapshotReader snapshotReader) {
+    public SnapshotReader configure(SnapshotReader snapshotReader) {
         if (startingSnapshotId == null) {
             // try to get first snapshot again
             startingSnapshotId = snapshotManager.latestSnapshotId();
         }
         if (startingSnapshotId == null) {
             LOG.debug("There is currently no snapshot. Waiting for snapshot 
generation.");
-            return new NoSnapshot();
+            return null;
         }
-        return StartingScanner.fromPlan(
-                
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId).read());
+        return 
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotFullStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ReadPlanStartingScanner.java
similarity index 55%
copy from 
paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotFullStartingScanner.java
copy to 
paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ReadPlanStartingScanner.java
index 3ffd421d0..56b7680eb 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotFullStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ReadPlanStartingScanner.java
@@ -18,35 +18,39 @@
 
 package org.apache.paimon.table.source.snapshot;
 
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.utils.SnapshotManager;
 
-/**
- * {@link StartingScanner} for the {@link 
CoreOptions.StartupMode#FROM_SNAPSHOT_FULL} startup mode
- * of a batch read.
- */
-public class ContinuousFromSnapshotFullStartingScanner extends 
AbstractStartingScanner {
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+
+/** An {@link AbstractStartingScanner} to return plan. */
+public abstract class ReadPlanStartingScanner extends AbstractStartingScanner {
 
-    public ContinuousFromSnapshotFullStartingScanner(
-            SnapshotManager snapshotManager, long snapshotId) {
+    ReadPlanStartingScanner(SnapshotManager snapshotManager) {
         super(snapshotManager);
-        this.startingSnapshotId = snapshotId;
     }
 
-    @Override
-    public ScanMode startingScanMode() {
-        return ScanMode.ALL;
-    }
+    @Nullable
+    protected abstract SnapshotReader configure(SnapshotReader snapshotReader);
 
     @Override
     public Result scan(SnapshotReader snapshotReader) {
-        Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
-        if (earliestSnapshotId == null) {
+        SnapshotReader configured = configure(snapshotReader);
+        if (configured == null) {
             return new NoSnapshot();
         }
-        long ceiledSnapshotId = Math.max(startingSnapshotId, 
earliestSnapshotId);
-        return StartingScanner.fromPlan(
-                
snapshotReader.withMode(ScanMode.ALL).withSnapshot(ceiledSnapshotId).read());
+        return StartingScanner.fromPlan(configured.read());
+    }
+
+    @Override
+    public List<PartitionEntry> scanPartitions(SnapshotReader snapshotReader) {
+        SnapshotReader configured = configure(snapshotReader);
+        if (configured == null) {
+            return Collections.emptyList();
+        }
+        return configured.partitionEntries();
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
index 98dcaf669..6d9daa70a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.table.source.snapshot;
 
+import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.TableScan;
 
@@ -32,6 +33,8 @@ public interface StartingScanner {
 
     Result scan(SnapshotReader snapshotReader);
 
+    List<PartitionEntry> scanPartitions(SnapshotReader snapshotReader);
+
     /** Scan result of {@link #scan}. */
     interface Result {}
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
index ca0f0a91a..19b9da244 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromSnapshotStartingScanner.java
@@ -31,7 +31,7 @@ import static 
org.apache.paimon.utils.Preconditions.checkArgument;
  * {@link StartingScanner} for the {@link 
CoreOptions.StartupMode#FROM_SNAPSHOT} or {@link
  * CoreOptions.StartupMode#FROM_SNAPSHOT_FULL} startup mode of a batch read.
  */
-public class StaticFromSnapshotStartingScanner extends AbstractStartingScanner 
{
+public class StaticFromSnapshotStartingScanner extends ReadPlanStartingScanner 
{
 
     private static final Logger LOG =
             LoggerFactory.getLogger(StaticFromSnapshotStartingScanner.class);
@@ -47,13 +47,13 @@ public class StaticFromSnapshotStartingScanner extends 
AbstractStartingScanner {
     }
 
     @Override
-    public Result scan(SnapshotReader snapshotReader) {
+    public SnapshotReader configure(SnapshotReader snapshotReader) {
         Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
         Long latestSnapshotId = snapshotManager.latestSnapshotId();
 
         if (earliestSnapshotId == null || latestSnapshotId == null) {
             LOG.warn("There is currently no snapshot. Waiting for snapshot 
generation.");
-            return new NoSnapshot();
+            return null;
         }
 
         // Checks earlier whether the specified scan snapshot id is valid and 
throws the correct
@@ -65,7 +65,6 @@ public class StaticFromSnapshotStartingScanner extends 
AbstractStartingScanner {
                 earliestSnapshotId,
                 latestSnapshotId);
 
-        return StartingScanner.fromPlan(
-                
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId).read());
+        return 
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java
index 3850f41a8..4fa070299 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java
@@ -25,7 +25,7 @@ import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
 
 /** {@link StartingScanner} for the {@link CoreOptions#SCAN_TAG_NAME} of a 
batch read. */
-public class StaticFromTagStartingScanner extends AbstractStartingScanner {
+public class StaticFromTagStartingScanner extends ReadPlanStartingScanner {
 
     private final String tagName;
 
@@ -40,11 +40,10 @@ public class StaticFromTagStartingScanner extends 
AbstractStartingScanner {
     }
 
     @Override
-    public Result scan(SnapshotReader snapshotReader) {
+    public SnapshotReader configure(SnapshotReader snapshotReader) {
         TagManager tagManager =
                 new TagManager(snapshotManager.fileIO(), 
snapshotManager.tablePath());
         Snapshot snapshot = tagManager.taggedSnapshot(tagName);
-        return StartingScanner.fromPlan(
-                
snapshotReader.withMode(ScanMode.ALL).withSnapshot(snapshot).read());
+        return snapshotReader.withMode(ScanMode.ALL).withSnapshot(snapshot);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
index 1c039c0c7..a5087890a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTimestampStartingScanner.java
@@ -32,7 +32,7 @@ import javax.annotation.Nullable;
  * {@link StartingScanner} for the {@link 
CoreOptions.StartupMode#FROM_TIMESTAMP} startup mode of a
  * batch read.
  */
-public class StaticFromTimestampStartingScanner extends 
AbstractStartingScanner {
+public class StaticFromTimestampStartingScanner extends 
ReadPlanStartingScanner {
 
     private static final Logger LOG =
             LoggerFactory.getLogger(StaticFromTimestampStartingScanner.class);
@@ -49,15 +49,14 @@ public class StaticFromTimestampStartingScanner extends 
AbstractStartingScanner
     }
 
     @Override
-    public Result scan(SnapshotReader snapshotReader) {
+    public SnapshotReader configure(SnapshotReader snapshotReader) {
         if (startingSnapshotId == null) {
             LOG.debug(
                     "There is currently no snapshot earlier than or equal to 
timestamp[{}]",
                     startupMillis);
-            return new NoSnapshot();
+            return null;
         }
-        return StartingScanner.fromPlan(
-                
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId).read());
+        return 
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId);
     }
 
     @Nullable
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromWatermarkStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromWatermarkStartingScanner.java
index 022fdf37f..72fba5533 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromWatermarkStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromWatermarkStartingScanner.java
@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 /** {@link StartingScanner} for the {@link CoreOptions#SCAN_WATERMARK} of a 
batch read. */
-public class StaticFromWatermarkStartingScanner extends 
AbstractStartingScanner {
+public class StaticFromWatermarkStartingScanner extends 
ReadPlanStartingScanner {
 
     private static final Logger LOG =
             LoggerFactory.getLogger(StaticFromWatermarkStartingScanner.class);
@@ -51,7 +51,7 @@ public class StaticFromWatermarkStartingScanner extends 
AbstractStartingScanner
     }
 
     @Override
-    public Result scan(SnapshotReader snapshotReader) {
+    public SnapshotReader configure(SnapshotReader snapshotReader) {
         if (startingSnapshotId == null) {
             LOG.warn(
                     "There is currently no snapshot later than or equal to 
watermark[{}]",
@@ -62,8 +62,7 @@ public class StaticFromWatermarkStartingScanner extends 
AbstractStartingScanner
                                     + "watermark[%d]",
                             watermark));
         }
-        return StartingScanner.fromPlan(
-                
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId).read());
+        return 
snapshotReader.withMode(ScanMode.ALL).withSnapshot(startingSnapshotId);
     }
 
     @Nullable
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index d9cf80289..84c5fbd42 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -237,155 +237,155 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
 
     private class AuditLogDataReader implements SnapshotReader {
 
-        private final SnapshotReader snapshotReader;
+        private final SnapshotReader wrapped;
 
-        private AuditLogDataReader(SnapshotReader snapshotReader) {
-            this.snapshotReader = snapshotReader;
+        private AuditLogDataReader(SnapshotReader wrapped) {
+            this.wrapped = wrapped;
         }
 
         @Override
         public Integer parallelism() {
-            return snapshotReader.parallelism();
+            return wrapped.parallelism();
         }
 
         @Override
         public SnapshotManager snapshotManager() {
-            return snapshotReader.snapshotManager();
+            return wrapped.snapshotManager();
         }
 
         @Override
         public ManifestsReader manifestsReader() {
-            return snapshotReader.manifestsReader();
+            return wrapped.manifestsReader();
         }
 
         @Override
         public List<ManifestEntry> readManifest(ManifestFileMeta manifest) {
-            return snapshotReader.readManifest(manifest);
+            return wrapped.readManifest(manifest);
         }
 
         @Override
         public ConsumerManager consumerManager() {
-            return snapshotReader.consumerManager();
+            return wrapped.consumerManager();
         }
 
         @Override
         public SplitGenerator splitGenerator() {
-            return snapshotReader.splitGenerator();
+            return wrapped.splitGenerator();
         }
 
         @Override
         public FileStorePathFactory pathFactory() {
-            return snapshotReader.pathFactory();
+            return wrapped.pathFactory();
         }
 
         public SnapshotReader withSnapshot(long snapshotId) {
-            snapshotReader.withSnapshot(snapshotId);
+            wrapped.withSnapshot(snapshotId);
             return this;
         }
 
         public SnapshotReader withSnapshot(Snapshot snapshot) {
-            snapshotReader.withSnapshot(snapshot);
+            wrapped.withSnapshot(snapshot);
             return this;
         }
 
         public SnapshotReader withFilter(Predicate predicate) {
-            convert(predicate).ifPresent(snapshotReader::withFilter);
+            convert(predicate).ifPresent(wrapped::withFilter);
             return this;
         }
 
         @Override
         public SnapshotReader withPartitionFilter(Map<String, String> 
partitionSpec) {
-            snapshotReader.withPartitionFilter(partitionSpec);
+            wrapped.withPartitionFilter(partitionSpec);
             return this;
         }
 
         @Override
         public SnapshotReader withPartitionFilter(Predicate predicate) {
-            snapshotReader.withPartitionFilter(predicate);
+            wrapped.withPartitionFilter(predicate);
             return this;
         }
 
         @Override
         public SnapshotReader withPartitionFilter(List<BinaryRow> partitions) {
-            snapshotReader.withPartitionFilter(partitions);
+            wrapped.withPartitionFilter(partitions);
             return this;
         }
 
         @Override
         public SnapshotReader withMode(ScanMode scanMode) {
-            snapshotReader.withMode(scanMode);
+            wrapped.withMode(scanMode);
             return this;
         }
 
         @Override
         public SnapshotReader withLevelFilter(Filter<Integer> levelFilter) {
-            snapshotReader.withLevelFilter(levelFilter);
+            wrapped.withLevelFilter(levelFilter);
             return this;
         }
 
         @Override
         public SnapshotReader withManifestEntryFilter(Filter<ManifestEntry> 
filter) {
-            snapshotReader.withManifestEntryFilter(filter);
+            wrapped.withManifestEntryFilter(filter);
             return this;
         }
 
         public SnapshotReader withBucket(int bucket) {
-            snapshotReader.withBucket(bucket);
+            wrapped.withBucket(bucket);
             return this;
         }
 
         @Override
         public SnapshotReader withBucketFilter(Filter<Integer> bucketFilter) {
-            snapshotReader.withBucketFilter(bucketFilter);
+            wrapped.withBucketFilter(bucketFilter);
             return this;
         }
 
         @Override
         public SnapshotReader withDataFileNameFilter(Filter<String> 
fileNameFilter) {
-            snapshotReader.withDataFileNameFilter(fileNameFilter);
+            wrapped.withDataFileNameFilter(fileNameFilter);
             return this;
         }
 
         @Override
         public SnapshotReader withShard(int indexOfThisSubtask, int 
numberOfParallelSubtasks) {
-            snapshotReader.withShard(indexOfThisSubtask, 
numberOfParallelSubtasks);
+            wrapped.withShard(indexOfThisSubtask, numberOfParallelSubtasks);
             return this;
         }
 
         @Override
         public SnapshotReader withMetricRegistry(MetricRegistry registry) {
-            snapshotReader.withMetricRegistry(registry);
+            wrapped.withMetricRegistry(registry);
             return this;
         }
 
         @Override
         public Plan read() {
-            return snapshotReader.read();
+            return wrapped.read();
         }
 
         @Override
         public Plan readChanges() {
-            return snapshotReader.readChanges();
+            return wrapped.readChanges();
         }
 
         @Override
         public Plan readIncrementalDiff(Snapshot before) {
-            return snapshotReader.readIncrementalDiff(before);
+            return wrapped.readIncrementalDiff(before);
         }
 
         @Override
         public List<BinaryRow> partitions() {
-            return snapshotReader.partitions();
+            return wrapped.partitions();
         }
 
         @Override
         public List<PartitionEntry> partitionEntries() {
-            return snapshotReader.partitionEntries();
+            return wrapped.partitionEntries();
         }
 
         @Override
         public List<BucketEntry> bucketEntries() {
-            return snapshotReader.bucketEntries();
+            return wrapped.bucketEntries();
         }
     }
 
diff --git 
a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
 
b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index f870d8370..ee00d4183 100644
--- 
a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++ 
b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -50,7 +50,8 @@ public class DataTableSource extends BaseDataTableSource {
                 null,
                 null,
                 null,
-                null);
+                null,
+                false);
     }
 
     public DataTableSource(
@@ -62,7 +63,8 @@ public class DataTableSource extends BaseDataTableSource {
             @Nullable Predicate predicate,
             @Nullable int[][] projectFields,
             @Nullable Long limit,
-            @Nullable WatermarkStrategy<RowData> watermarkStrategy) {
+            @Nullable WatermarkStrategy<RowData> watermarkStrategy,
+            boolean isBatchCountStar) {
         super(
                 tableIdentifier,
                 table,
@@ -72,7 +74,8 @@ public class DataTableSource extends BaseDataTableSource {
                 predicate,
                 projectFields,
                 limit,
-                watermarkStrategy);
+                watermarkStrategy,
+                isBatchCountStar);
     }
 
     @Override
@@ -86,7 +89,8 @@ public class DataTableSource extends BaseDataTableSource {
                 predicate,
                 projectFields,
                 limit,
-                watermarkStrategy);
+                watermarkStrategy,
+                isBatchCountStar);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
index a8660ee8c..37a504c58 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
@@ -24,7 +24,6 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.types.RowType;
 
 import javax.annotation.Nullable;
@@ -49,7 +48,6 @@ public class DynamicPartitionLoader implements Serializable {
     private final Table table;
     private final Duration refreshInterval;
 
-    private TableScan scan;
     private Comparator<InternalRow> comparator;
 
     private LocalDateTime lastRefresh;
@@ -61,7 +59,6 @@ public class DynamicPartitionLoader implements Serializable {
     }
 
     public void open() {
-        this.scan = table.newReadBuilder().newScan();
         RowType partitionType = table.rowType().project(table.partitionKeys());
         this.comparator = 
CodeGenUtils.newRecordComparator(partitionType.getFieldTypes());
     }
@@ -87,7 +84,10 @@ public class DynamicPartitionLoader implements Serializable {
         }
 
         BinaryRow previous = this.partition;
-        partition = 
scan.listPartitions().stream().max(comparator).orElse(null);
+        partition =
+                table.newReadBuilder().newScan().listPartitions().stream()
+                        .max(comparator)
+                        .orElse(null);
         lastRefresh = LocalDateTime.now();
 
         return !Objects.equals(previous, partition);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
index 15b82fbe4..e9389f1f2 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
@@ -334,7 +334,7 @@ public abstract class FullCacheLookupTable implements 
LookupTable {
     /** Context for {@link LookupTable}. */
     public static class Context {
 
-        public final FileStoreTable table;
+        public final LookupFileStoreTable table;
         public final int[] projection;
         @Nullable public final Predicate tablePredicate;
         @Nullable public final Predicate projectedPredicate;
@@ -361,7 +361,7 @@ public abstract class FullCacheLookupTable implements 
LookupTable {
 
         public Context copy(int[] newProjection) {
             return new Context(
-                    table,
+                    table.wrapped(),
                     newProjection,
                     tablePredicate,
                     projectedPredicate,
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
index fe064b80e..908884a57 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupDataTableScan.java
@@ -38,8 +38,8 @@ import static 
org.apache.paimon.flink.lookup.LookupFileStoreTable.LookupStreamSc
  */
 public class LookupDataTableScan extends DataTableStreamScan {
 
-    private StartupMode startupMode;
-    private LookupStreamScanMode lookupScanMode;
+    private final StartupMode startupMode;
+    private final LookupStreamScanMode lookupScanMode;
 
     public LookupDataTableScan(
             CoreOptions options,
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupFileStoreTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupFileStoreTable.java
index bb5274bc6..090399706 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupFileStoreTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupFileStoreTable.java
@@ -22,9 +22,6 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValueFileStore;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.utils.TableScanUtils;
-import org.apache.paimon.manifest.IndexManifestEntry;
-import org.apache.paimon.manifest.ManifestEntry;
-import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.operation.DefaultValueAssigner;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.options.description.DescribedEnum;
@@ -33,10 +30,7 @@ import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.DelegatedFileStoreTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.InnerTableRead;
-import org.apache.paimon.table.source.ReadBuilder;
-import org.apache.paimon.table.source.ReadBuilderImpl;
 import org.apache.paimon.table.source.StreamDataTableScan;
-import org.apache.paimon.utils.SimpleFileReader;
 
 import java.util.HashSet;
 import java.util.List;
@@ -63,11 +57,6 @@ public class LookupFileStoreTable extends 
DelegatedFileStoreTable {
         this.lookupScanMode = lookupScanMode;
     }
 
-    @Override
-    public ReadBuilder newReadBuilder() {
-        return new ReadBuilderImpl(this);
-    }
-
     @Override
     public InnerTableRead newRead() {
         switch (lookupScanMode) {
@@ -94,21 +83,6 @@ public class LookupFileStoreTable extends 
DelegatedFileStoreTable {
                 lookupScanMode);
     }
 
-    @Override
-    public SimpleFileReader<ManifestFileMeta> manifestListReader() {
-        return wrapped.manifestListReader();
-    }
-
-    @Override
-    public SimpleFileReader<ManifestEntry> manifestFileReader() {
-        return wrapped.manifestFileReader();
-    }
-
-    @Override
-    public SimpleFileReader<IndexManifestEntry> indexManifestFileReader() {
-        return wrapped.indexManifestFileReader();
-    }
-
     @Override
     public FileStoreTable copy(Map<String, String> dynamicOptions) {
         return new LookupFileStoreTable(wrapped.copy(dynamicOptions), 
lookupScanMode);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
index fa9b7672d..e6dfd41f8 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java
@@ -26,7 +26,6 @@ import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.ReaderSupplier;
 import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.StreamTableScan;
@@ -50,14 +49,14 @@ import static 
org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping
 /** A streaming reader to load data into {@link LookupTable}. */
 public class LookupStreamingReader {
 
-    private final Table table;
+    private final LookupFileStoreTable table;
     private final int[] projection;
     private final ReadBuilder readBuilder;
     @Nullable private final Predicate projectedPredicate;
     private final StreamTableScan scan;
 
     public LookupStreamingReader(
-            Table table,
+            LookupFileStoreTable table,
             int[] projection,
             @Nullable Predicate predicate,
             Set<Integer> requireCachedBucketIds) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
index 8775ab8f5..9458f7817 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
@@ -28,12 +28,16 @@ import org.apache.paimon.flink.log.LogSourceProvider;
 import org.apache.paimon.flink.log.LogStoreTableFactory;
 import org.apache.paimon.flink.lookup.FileStoreLookupFunction;
 import org.apache.paimon.flink.lookup.LookupRuntimeProviderFactory;
+import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.utils.Projection;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.LookupTableSource;
@@ -41,19 +45,25 @@ import 
org.apache.flink.table.connector.source.LookupTableSource.LookupContext;
 import 
org.apache.flink.table.connector.source.LookupTableSource.LookupRuntimeProvider;
 import org.apache.flink.table.connector.source.ScanTableSource.ScanContext;
 import 
org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider;
+import org.apache.flink.table.connector.source.SourceProvider;
+import 
org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown;
 import 
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.expressions.AggregateExpression;
 import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.types.DataType;
 
 import javax.annotation.Nullable;
 
 import java.time.Duration;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.IntStream;
 
 import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
 import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
 import static org.apache.paimon.CoreOptions.LOG_CONSISTENCY;
+import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
 import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_ASYNC;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_ASYNC_THREAD_NUMBER;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_REMOVE_NORMALIZE;
@@ -68,7 +78,7 @@ import static 
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_IDLE_
  * batch mode or streaming mode.
  */
 public abstract class BaseDataTableSource extends FlinkTableSource
-        implements LookupTableSource, SupportsWatermarkPushDown {
+        implements LookupTableSource, SupportsWatermarkPushDown, 
SupportsAggregatePushDown {
 
     protected final ObjectIdentifier tableIdentifier;
     protected final boolean streaming;
@@ -76,6 +86,7 @@ public abstract class BaseDataTableSource extends 
FlinkTableSource
     @Nullable protected final LogStoreTableFactory logStoreTableFactory;
 
     @Nullable protected WatermarkStrategy<RowData> watermarkStrategy;
+    protected boolean isBatchCountStar;
 
     public BaseDataTableSource(
             ObjectIdentifier tableIdentifier,
@@ -86,7 +97,8 @@ public abstract class BaseDataTableSource extends 
FlinkTableSource
             @Nullable Predicate predicate,
             @Nullable int[][] projectFields,
             @Nullable Long limit,
-            @Nullable WatermarkStrategy<RowData> watermarkStrategy) {
+            @Nullable WatermarkStrategy<RowData> watermarkStrategy,
+            boolean isBatchCountStar) {
         super(table, predicate, projectFields, limit);
         this.tableIdentifier = tableIdentifier;
         this.streaming = streaming;
@@ -96,6 +108,7 @@ public abstract class BaseDataTableSource extends 
FlinkTableSource
         this.projectFields = projectFields;
         this.limit = limit;
         this.watermarkStrategy = watermarkStrategy;
+        this.isBatchCountStar = isBatchCountStar;
     }
 
     @Override
@@ -110,7 +123,7 @@ public abstract class BaseDataTableSource extends 
FlinkTableSource
         } else {
             Options options = Options.fromMap(table.options());
 
-            if (new CoreOptions(options).mergeEngine() == 
CoreOptions.MergeEngine.FIRST_ROW) {
+            if (new CoreOptions(options).mergeEngine() == FIRST_ROW) {
                 return ChangelogMode.insertOnly();
             }
 
@@ -134,6 +147,10 @@ public abstract class BaseDataTableSource extends 
FlinkTableSource
 
     @Override
     public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) 
{
+        if (isBatchCountStar) {
+            return createCountStarScan();
+        }
+
         LogSourceProvider logSourceProvider = null;
         if (logStoreTableFactory != null) {
             logSourceProvider =
@@ -182,6 +199,29 @@ public abstract class BaseDataTableSource extends 
FlinkTableSource
                                 .build());
     }
 
+    private ScanRuntimeProvider createCountStarScan() {
+        TableScan scan = 
table.newReadBuilder().withFilter(predicate).newScan();
+        List<PartitionEntry> partitionEntries = scan.listPartitionEntries();
+        long rowCount = 
partitionEntries.stream().mapToLong(PartitionEntry::recordCount).sum();
+        NumberSequenceRowSource source = new NumberSequenceRowSource(rowCount, 
rowCount);
+        return new SourceProvider() {
+            @Override
+            public Source<RowData, ?, ?> createSource() {
+                return source;
+            }
+
+            @Override
+            public boolean isBounded() {
+                return true;
+            }
+
+            @Override
+            public Optional<Integer> getParallelism() {
+                return Optional.of(1);
+            }
+        };
+    }
+
     protected abstract List<String> dynamicPartitionFilteringFields();
 
     @Override
@@ -209,6 +249,54 @@ public abstract class BaseDataTableSource extends 
FlinkTableSource
                 asyncThreadNumber);
     }
 
+    @Override
+    public boolean applyAggregates(
+            List<int[]> groupingSets,
+            List<AggregateExpression> aggregateExpressions,
+            DataType producedDataType) {
+        if (isStreaming()) {
+            return false;
+        }
+
+        if (!(table instanceof DataTable)) {
+            return false;
+        }
+
+        if (!table.primaryKeys().isEmpty()) {
+            return false;
+        }
+
+        CoreOptions options = ((DataTable) table).coreOptions();
+        if (options.deletionVectorsEnabled()) {
+            return false;
+        }
+
+        if (groupingSets.size() != 1) {
+            return false;
+        }
+
+        if (groupingSets.get(0).length != 0) {
+            return false;
+        }
+
+        if (aggregateExpressions.size() != 1) {
+            return false;
+        }
+
+        if (!aggregateExpressions
+                .get(0)
+                .getFunctionDefinition()
+                .getClass()
+                .getName()
+                .equals(
+                        
"org.apache.flink.table.planner.functions.aggfunctions.Count1AggFunction")) {
+            return false;
+        }
+
+        isBatchCountStar = true;
+        return true;
+    }
+
     @Override
     public String asSummaryString() {
         return "Paimon-DataSource";
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index 200550c88..ad5123205 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -62,7 +62,8 @@ public class DataTableSource extends BaseDataTableSource
                 null,
                 null,
                 null,
-                null);
+                null,
+                false);
     }
 
     public DataTableSource(
@@ -75,7 +76,8 @@ public class DataTableSource extends BaseDataTableSource
             @Nullable int[][] projectFields,
             @Nullable Long limit,
             @Nullable WatermarkStrategy<RowData> watermarkStrategy,
-            @Nullable List<String> dynamicPartitionFilteringFields) {
+            @Nullable List<String> dynamicPartitionFilteringFields,
+            boolean isBatchCountStar) {
         super(
                 tableIdentifier,
                 table,
@@ -85,7 +87,8 @@ public class DataTableSource extends BaseDataTableSource
                 predicate,
                 projectFields,
                 limit,
-                watermarkStrategy);
+                watermarkStrategy,
+                isBatchCountStar);
         this.dynamicPartitionFilteringFields = dynamicPartitionFilteringFields;
     }
 
@@ -101,7 +104,8 @@ public class DataTableSource extends BaseDataTableSource
                 projectFields,
                 limit,
                 watermarkStrategy,
-                dynamicPartitionFilteringFields);
+                dynamicPartitionFilteringFields,
+                isBatchCountStar);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
index 920a1ba14..2be0248f3 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
@@ -22,11 +22,13 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.LogicalTypeConversion;
 import org.apache.paimon.flink.PredicateConverter;
+import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.PartitionPredicateVisitor;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.predicate.PredicateVisitor;
+import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.Split;
 
@@ -94,7 +96,8 @@ public abstract class FlinkTableSource
         List<ResolvedExpression> unConsumedFilters = new ArrayList<>();
         List<ResolvedExpression> consumedFilters = new ArrayList<>();
         List<Predicate> converted = new ArrayList<>();
-        PredicateVisitor<Boolean> visitor = new 
PartitionPredicateVisitor(partitionKeys);
+        PredicateVisitor<Boolean> onlyPartFieldsVisitor =
+                new PartitionPredicateVisitor(partitionKeys);
 
         for (ResolvedExpression filter : filters) {
             Optional<Predicate> predicateOptional = 
PredicateConverter.convert(rowType, filter);
@@ -103,7 +106,7 @@ public abstract class FlinkTableSource
                 unConsumedFilters.add(filter);
             } else {
                 Predicate p = predicateOptional.get();
-                if (isStreaming() || !p.visit(visitor)) {
+                if (isStreaming() || !p.visit(onlyPartFieldsVisitor)) {
                     unConsumedFilters.add(filter);
                 } else {
                     consumedFilters.add(filter);
@@ -168,9 +171,28 @@ public abstract class FlinkTableSource
 
     protected void scanSplitsForInference() {
         if (splitStatistics == null) {
-            List<Split> splits =
-                    
table.newReadBuilder().withFilter(predicate).newScan().plan().splits();
-            splitStatistics = new SplitStatistics(splits);
+            if (table instanceof DataTable) {
+                List<PartitionEntry> partitionEntries =
+                        table.newReadBuilder()
+                                .withFilter(predicate)
+                                .newScan()
+                                .listPartitionEntries();
+                long totalSize = 0;
+                long rowCount = 0;
+                for (PartitionEntry entry : partitionEntries) {
+                    totalSize += entry.fileSizeInBytes();
+                    rowCount += entry.recordCount();
+                }
+                long splitTargetSize = ((DataTable) 
table).coreOptions().splitTargetSize();
+                splitStatistics =
+                        new SplitStatistics((int) (totalSize / splitTargetSize 
+ 1), rowCount);
+            } else {
+                List<Split> splits =
+                        
table.newReadBuilder().withFilter(predicate).newScan().plan().splits();
+                splitStatistics =
+                        new SplitStatistics(
+                                splits.size(), 
splits.stream().mapToLong(Split::rowCount).sum());
+            }
         }
     }
 
@@ -180,9 +202,9 @@ public abstract class FlinkTableSource
         private final int splitNumber;
         private final long totalRowCount;
 
-        protected SplitStatistics(List<Split> splits) {
-            this.splitNumber = splits.size();
-            this.totalRowCount = 
splits.stream().mapToLong(Split::rowCount).sum();
+        protected SplitStatistics(int splitNumber, long totalRowCount) {
+            this.splitNumber = splitNumber;
+            this.totalRowCount = totalRowCount;
         }
 
         public int splitNumber() {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/NumberSequenceRowSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/NumberSequenceRowSource.java
new file mode 100644
index 000000000..e467e37a8
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/NumberSequenceRowSource.java
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.NumberSequenceIterator;
+import org.apache.flink.util.SplittableIterator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A data source that produces a sequence of numbers (longs) to {@link 
RowData}. */
+public class NumberSequenceRowSource
+        implements Source<
+                        RowData,
+                        NumberSequenceRowSource.NumberSequenceSplit,
+                        
Collection<NumberSequenceRowSource.NumberSequenceSplit>>,
+                ResultTypeQueryable<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** The starting number in the sequence, inclusive. */
+    private final long from;
+
+    /** The end number in the sequence, inclusive. */
+    private final long to;
+
+    /**
+     * Creates a new {@code NumberSequenceSource} that produces parallel 
sequences covering the
+     * range {@code from} to {@code to} (both boundaries are inclusive).
+     */
+    public NumberSequenceRowSource(long from, long to) {
+        checkArgument(from <= to, "'from' must be <= 'to'");
+        this.from = from;
+        this.to = to;
+    }
+
+    public long getFrom() {
+        return from;
+    }
+
+    public long getTo() {
+        return to;
+    }
+
+    // ------------------------------------------------------------------------
+    //  source methods
+    // ------------------------------------------------------------------------
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return InternalTypeInfo.of(RowType.of(new BigIntType(false)));
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SourceReader<RowData, NumberSequenceSplit> createReader(
+            SourceReaderContext readerContext) {
+        return new IteratorSourceReader<>(readerContext);
+    }
+
+    @Override
+    public SplitEnumerator<NumberSequenceSplit, 
Collection<NumberSequenceSplit>> createEnumerator(
+            final SplitEnumeratorContext<NumberSequenceSplit> enumContext) {
+
+        final List<NumberSequenceSplit> splits =
+                splitNumberRange(from, to, enumContext.currentParallelism());
+        return new IteratorSourceEnumerator<>(enumContext, splits);
+    }
+
+    @Override
+    public SplitEnumerator<NumberSequenceSplit, 
Collection<NumberSequenceSplit>> restoreEnumerator(
+            final SplitEnumeratorContext<NumberSequenceSplit> enumContext,
+            Collection<NumberSequenceSplit> checkpoint) {
+        return new IteratorSourceEnumerator<>(enumContext, checkpoint);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<NumberSequenceSplit> getSplitSerializer() 
{
+        return new SplitSerializer();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<Collection<NumberSequenceSplit>>
+            getEnumeratorCheckpointSerializer() {
+        return new CheckpointSerializer();
+    }
+
+    protected List<NumberSequenceSplit> splitNumberRange(long from, long to, 
int numSplits) {
+        final NumberSequenceIterator[] subSequences =
+                new NumberSequenceIterator(from, to).split(numSplits);
+        final ArrayList<NumberSequenceSplit> splits = new 
ArrayList<>(subSequences.length);
+
+        int splitId = 1;
+        for (NumberSequenceIterator seq : subSequences) {
+            if (seq.hasNext()) {
+                splits.add(
+                        new NumberSequenceSplit(
+                                String.valueOf(splitId++), seq.getCurrent(), 
seq.getTo()));
+            }
+        }
+
+        return splits;
+    }
+
+    // ------------------------------------------------------------------------
+    //  splits & checkpoint
+    // ------------------------------------------------------------------------
+
+    /** A split of the source, representing a number sub-sequence. */
+    public static class NumberSequenceSplit
+            implements IteratorSourceSplit<
+                    RowData, NumberSequenceRowSource.NumberSequenceIterator> {
+
+        private final String splitId;
+        private final long from;
+        private final long to;
+
+        public NumberSequenceSplit(String splitId, long from, long to) {
+            checkArgument(from <= to, "'from' must be <= 'to'");
+            this.splitId = checkNotNull(splitId);
+            this.from = from;
+            this.to = to;
+        }
+
+        @Override
+        public String splitId() {
+            return splitId;
+        }
+
+        public long from() {
+            return from;
+        }
+
+        public long to() {
+            return to;
+        }
+
+        @SuppressWarnings("ClassEscapesDefinedScope")
+        @Override
+        public NumberSequenceRowSource.NumberSequenceIterator getIterator() {
+            return new NumberSequenceRowSource.NumberSequenceIterator(from, 
to);
+        }
+
+        @SuppressWarnings("ClassEscapesDefinedScope")
+        @Override
+        public IteratorSourceSplit<RowData, 
NumberSequenceRowSource.NumberSequenceIterator>
+                getUpdatedSplitForIterator(
+                        final NumberSequenceRowSource.NumberSequenceIterator 
iterator) {
+            return new NumberSequenceSplit(splitId, iterator.getCurrent(), 
iterator.getTo());
+        }
+
+        @Override
+        public String toString() {
+            return String.format("NumberSequenceSplit [%d, %d] (%s)", from, 
to, splitId);
+        }
+    }
+
+    private static final class SplitSerializer
+            implements SimpleVersionedSerializer<NumberSequenceSplit> {
+
+        private static final int CURRENT_VERSION = 1;
+
+        @Override
+        public int getVersion() {
+            return CURRENT_VERSION;
+        }
+
+        @Override
+        public byte[] serialize(NumberSequenceSplit split) throws IOException {
+            checkArgument(
+                    split.getClass() == NumberSequenceSplit.class, "cannot 
serialize subclasses");
+
+            // We will serialize 2 longs (16 bytes) plus the UFT 
representation of the string (2 +
+            // length)
+            final DataOutputSerializer out =
+                    new DataOutputSerializer(split.splitId().length() + 18);
+            serializeV1(out, split);
+            return out.getCopyOfBuffer();
+        }
+
+        @Override
+        public NumberSequenceSplit deserialize(int version, byte[] serialized) 
throws IOException {
+            if (version != CURRENT_VERSION) {
+                throw new IOException("Unrecognized version: " + version);
+            }
+            final DataInputDeserializer in = new 
DataInputDeserializer(serialized);
+            return deserializeV1(in);
+        }
+
+        static void serializeV1(DataOutputView out, NumberSequenceSplit split) 
throws IOException {
+            out.writeUTF(split.splitId());
+            out.writeLong(split.from());
+            out.writeLong(split.to());
+        }
+
+        static NumberSequenceSplit deserializeV1(DataInputView in) throws 
IOException {
+            return new NumberSequenceSplit(in.readUTF(), in.readLong(), 
in.readLong());
+        }
+    }
+
+    private static final class CheckpointSerializer
+            implements 
SimpleVersionedSerializer<Collection<NumberSequenceSplit>> {
+
+        private static final int CURRENT_VERSION = 1;
+
+        @Override
+        public int getVersion() {
+            return CURRENT_VERSION;
+        }
+
+        @Override
+        public byte[] serialize(Collection<NumberSequenceSplit> checkpoint) 
throws IOException {
+            // Each split needs 2 longs (16 bytes) plus the UFT representation 
of the string (2 +
+            // length)
+            // Assuming at most 4 digit split IDs, 22 bytes per split avoids 
any intermediate array
+            // resizing.
+            // plus four bytes for the length field
+            final DataOutputSerializer out = new 
DataOutputSerializer(checkpoint.size() * 22 + 4);
+            out.writeInt(checkpoint.size());
+            for (NumberSequenceSplit split : checkpoint) {
+                SplitSerializer.serializeV1(out, split);
+            }
+            return out.getCopyOfBuffer();
+        }
+
+        @Override
+        public Collection<NumberSequenceSplit> deserialize(int version, byte[] 
serialized)
+                throws IOException {
+            if (version != CURRENT_VERSION) {
+                throw new IOException("Unrecognized version: " + version);
+            }
+            final DataInputDeserializer in = new 
DataInputDeserializer(serialized);
+            final int num = in.readInt();
+            final ArrayList<NumberSequenceSplit> result = new ArrayList<>(num);
+            for (int remaining = num; remaining > 0; remaining--) {
+                result.add(SplitSerializer.deserializeV1(in));
+            }
+            return result;
+        }
+    }
+
+    private static class NumberSequenceIterator extends 
SplittableIterator<RowData> {
+
+        private static final long serialVersionUID = 1L;
+
+        /** The last number returned by the iterator. */
+        private final long to;
+
+        /** The next number to be returned. */
+        private long current;
+
+        /**
+         * Creates a new splittable iterator, returning the range [from, to]. 
Both boundaries of the
+         * interval are inclusive.
+         *
+         * @param from The first number returned by the iterator.
+         * @param to The last number returned by the iterator.
+         */
+        public NumberSequenceIterator(long from, long to) {
+            if (from > to) {
+                throw new IllegalArgumentException(
+                        "The 'to' value must not be smaller than the 'from' 
value.");
+            }
+
+            this.current = from;
+            this.to = to;
+        }
+
+        /**
+         * Internal constructor to allow for empty iterators.
+         *
+         * @param from The first number returned by the iterator.
+         * @param to The last number returned by the iterator.
+         * @param unused A dummy parameter to disambiguate the constructor.
+         */
+        @SuppressWarnings("unused")
+        private NumberSequenceIterator(long from, long to, boolean unused) {
+            this.current = from;
+            this.to = to;
+        }
+
+        public long getCurrent() {
+            return this.current;
+        }
+
+        public long getTo() {
+            return this.to;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return current <= to;
+        }
+
+        @Override
+        public RowData next() {
+            if (current <= to) {
+                return GenericRowData.of(current++);
+            } else {
+                throw new NoSuchElementException();
+            }
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public NumberSequenceIterator[] split(int numPartitions) {
+            if (numPartitions < 1) {
+                throw new IllegalArgumentException("The number of partitions 
must be at least 1.");
+            }
+
+            if (numPartitions == 1) {
+                return new NumberSequenceIterator[] {new 
NumberSequenceIterator(current, to)};
+            }
+
+            // here, numPartitions >= 2 !!!
+
+            long elementsPerSplit;
+
+            if (to - current + 1 >= 0) {
+                elementsPerSplit = (to - current + 1) / numPartitions;
+            } else {
+                // long overflow of the range.
+                // we compute based on half the distance, to prevent the 
overflow.
+                // in most cases it holds that: current < 0 and to > 0, except 
for: to == 0 and
+                // current
+                // == Long.MIN_VALUE
+                // the later needs a special case
+                final long halfDiff; // must be positive
+
+                if (current == Long.MIN_VALUE) {
+                    // this means to >= 0
+                    halfDiff = (Long.MAX_VALUE / 2 + 1) + to / 2;
+                } else {
+                    long posFrom = -current;
+                    if (posFrom > to) {
+                        halfDiff = to + ((posFrom - to) / 2);
+                    } else {
+                        halfDiff = posFrom + ((to - posFrom) / 2);
+                    }
+                }
+                elementsPerSplit = halfDiff / numPartitions * 2;
+            }
+
+            // figure out how many get one in addition
+            long numWithExtra = -(elementsPerSplit * numPartitions) + to - 
current + 1;
+
+            // based on rounding errors, we may have lost one
+            if (numWithExtra > numPartitions) {
+                elementsPerSplit++;
+                numWithExtra -= numPartitions;
+
+                if (numWithExtra > numPartitions) {
+                    throw new RuntimeException("Bug in splitting logic. Too 
much rounding loss.");
+                }
+            }
+
+            NumberSequenceIterator[] iters = new 
NumberSequenceIterator[numPartitions];
+            long curr = current;
+            int i = 0;
+            for (; i < numWithExtra; i++) {
+                long next = curr + elementsPerSplit + 1;
+                iters[i] = new NumberSequenceIterator(curr, next - 1);
+                curr = next;
+            }
+            for (; i < numPartitions; i++) {
+                long next = curr + elementsPerSplit;
+                iters[i] = new NumberSequenceIterator(curr, next - 1, true);
+                curr = next;
+            }
+
+            return iters;
+        }
+
+        @Override
+        public int getMaximumNumberOfSplits() {
+            if (to >= Integer.MAX_VALUE
+                    || current <= Integer.MIN_VALUE
+                    || to - current + 1 >= Integer.MAX_VALUE) {
+                return Integer.MAX_VALUE;
+            } else {
+                return (int) (to - current + 1);
+            }
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index c60443a12..f03a16368 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -19,10 +19,12 @@
 package org.apache.paimon.flink;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.flink.util.AbstractTestBase;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.utils.BlockingIterator;
 import org.apache.paimon.utils.DateTimeUtils;
 
+import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CloseableIterator;
@@ -528,4 +530,61 @@ public class BatchFileStoreITCase extends 
CatalogITCaseBase {
                                         DateTimeUtils.toInternal(timestamp, 
0), 0)))
                 .containsExactlyInAnyOrder(Row.of(1, "a"), Row.of(2, "b"));
     }
+
+    @Test
+    public void testCountStarAppend() {
+        sql("CREATE TABLE count_append (f0 INT, f1 STRING)");
+        sql("INSERT INTO count_append VALUES (1, 'a'), (2, 'b')");
+
+        String sql = "SELECT COUNT(*) FROM count_append";
+        assertThat(sql(sql)).containsOnly(Row.of(2L));
+        validateCount1PushDown(sql);
+    }
+
+    @Test
+    public void testCountStarPartAppend() {
+        sql("CREATE TABLE count_part_append (f0 INT, f1 STRING, dt STRING) 
PARTITIONED BY (dt)");
+        sql("INSERT INTO count_part_append VALUES (1, 'a', '1'), (1, 'a', 
'1'), (2, 'b', '2')");
+        String sql = "SELECT COUNT(*) FROM count_part_append WHERE dt = '1'";
+
+        assertThat(sql(sql)).containsOnly(Row.of(2L));
+        validateCount1PushDown(sql);
+    }
+
+    @Test
+    public void testCountStarAppendWithDv() {
+        sql(
+                "CREATE TABLE count_append_dv (f0 INT, f1 STRING) WITH 
('deletion-vectors.enabled' = 'true')");
+        sql("INSERT INTO count_append_dv VALUES (1, 'a'), (2, 'b')");
+
+        String sql = "SELECT COUNT(*) FROM count_append_dv";
+        assertThat(sql(sql)).containsOnly(Row.of(2L));
+        validateCount1NotPushDown(sql);
+    }
+
+    @Test
+    public void testCountStarPK() {
+        sql("CREATE TABLE count_pk (f0 INT PRIMARY KEY NOT ENFORCED, f1 
STRING)");
+        sql("INSERT INTO count_pk VALUES (1, 'a'), (2, 'b')");
+
+        String sql = "SELECT COUNT(*) FROM count_pk";
+        assertThat(sql(sql)).containsOnly(Row.of(2L));
+        validateCount1NotPushDown(sql);
+    }
+
+    private void validateCount1PushDown(String sql) {
+        Transformation<?> transformation = AbstractTestBase.translate(tEnv, 
sql);
+        while (!transformation.getInputs().isEmpty()) {
+            transformation = transformation.getInputs().get(0);
+        }
+        
assertThat(transformation.getDescription()).contains("Count1AggFunction");
+    }
+
+    private void validateCount1NotPushDown(String sql) {
+        Transformation<?> transformation = AbstractTestBase.translate(tEnv, 
sql);
+        while (!transformation.getInputs().isEmpty()) {
+            transformation = transformation.getInputs().get(0);
+        }
+        
assertThat(transformation.getDescription()).doesNotContain("Count1AggFunction");
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/IteratorSourcesITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/IteratorSourcesITCase.java
new file mode 100644
index 000000000..8404d994f
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/IteratorSourcesITCase.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.fail;
+
+/**
+ * An integration test for the sources based on iterators.
+ *
+ * <p>This test uses the {@link NumberSequenceRowSource} as a concrete 
iterator source
+ * implementation, but covers all runtime-related aspects for all the 
iterator-based sources
+ * together.
+ */
+public class IteratorSourcesITCase extends TestLogger {
+
+    private static final int PARALLELISM = 4;
+
+    @ClassRule
+    public static final MiniClusterWithClientResource MINI_CLUSTER =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(PARALLELISM)
+                            .build());
+
+    // ------------------------------------------------------------------------
+
+    @Test
+    public void testParallelSourceExecution() throws Exception {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+
+        final DataStream<RowData> stream =
+                env.fromSource(
+                        new NumberSequenceRowSource(1L, 1_000L),
+                        WatermarkStrategy.noWatermarks(),
+                        "iterator source");
+
+        final List<RowData> result =
+                DataStreamUtils.collectBoundedStream(stream, "Iterator Source 
Test");
+
+        verifySequence(result, 1L, 1_000L);
+    }
+
+    // ------------------------------------------------------------------------
+    //  test utils
+    // ------------------------------------------------------------------------
+
+    private static void verifySequence(
+            final List<RowData> sequence, final long from, final long to) {
+        if (sequence.size() != to - from + 1) {
+            fail(String.format("Expected: Sequence [%d, %d]. Found: %s", from, 
to, sequence));
+        }
+
+        final List<Long> list =
+                sequence.stream()
+                        .map(r -> r.getLong(0))
+                        .sorted(Long::compareTo)
+                        .collect(Collectors.toList());
+
+        int pos = 0;
+        for (long value = from; value <= to; value++, pos++) {
+            if (value != list.get(pos)) {
+                fail(String.format("Expected: Sequence [%d, %d]. Found: %s", 
from, to, list));
+            }
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/NumberSequenceRowSourceTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/NumberSequenceRowSourceTest.java
new file mode 100644
index 000000000..5f17b842f
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/NumberSequenceRowSourceTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.SimpleUserCodeClassLoader;
+import org.apache.flink.util.UserCodeClassLoader;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.fail;
+
+/** Tests for the {@link NumberSequenceRowSource}. */
+class NumberSequenceRowSourceTest {
+
+    @Test
+    void testReaderCheckpoints() throws Exception {
+        final long from = 177;
+        final long mid = 333;
+        final long to = 563;
+        final long elementsPerCycle = (to - from) / 3;
+
+        final TestingReaderOutput<RowData> out = new TestingReaderOutput<>();
+
+        SourceReader<RowData, NumberSequenceRowSource.NumberSequenceSplit> 
reader = createReader();
+        reader.addSplits(
+                Arrays.asList(
+                        new 
NumberSequenceRowSource.NumberSequenceSplit("split-1", from, mid),
+                        new 
NumberSequenceRowSource.NumberSequenceSplit("split-2", mid + 1, to)));
+
+        long remainingInCycle = elementsPerCycle;
+        while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
+            if (--remainingInCycle <= 0) {
+                remainingInCycle = elementsPerCycle;
+                // checkpoint
+                List<NumberSequenceRowSource.NumberSequenceSplit> splits = 
reader.snapshotState(1L);
+
+                // re-create and restore
+                reader = createReader();
+                if (splits.isEmpty()) {
+                    reader.notifyNoMoreSplits();
+                } else {
+                    reader.addSplits(splits);
+                }
+            }
+        }
+
+        final List<RowData> result = out.getEmittedRecords();
+        validateSequence(result, from, to);
+    }
+
+    private static void validateSequence(
+            final List<RowData> sequence, final long from, final long to) {
+        if (sequence.size() != to - from + 1) {
+            failSequence(sequence, from, to);
+        }
+
+        long nextExpected = from;
+        for (RowData next : sequence) {
+            if (next.getLong(0) != nextExpected++) {
+                failSequence(sequence, from, to);
+            }
+        }
+    }
+
+    private static void failSequence(final List<RowData> sequence, final long 
from, final long to) {
+        fail(
+                String.format(
+                        "Expected: A sequence [%d, %d], but found: sequence 
(size %d) : %s",
+                        from, to, sequence.size(), sequence));
+    }
+
+    private static SourceReader<RowData, 
NumberSequenceRowSource.NumberSequenceSplit>
+            createReader() {
+        // the arguments passed in the source constructor matter only to the 
enumerator
+        return new NumberSequenceRowSource(0L, 0L).createReader(new 
DummyReaderContext());
+    }
+
+    // ------------------------------------------------------------------------
+    //  test utils / mocks
+    //
+    //  the "flink-connector-test-utils module has proper mocks and utils,
+    //  but cannot be used here, because it would create a cyclic dependency.
+    // ------------------------------------------------------------------------
+
+    private static final class DummyReaderContext implements 
SourceReaderContext {
+
+        @Override
+        public SourceReaderMetricGroup metricGroup() {
+            return UnregisteredMetricsGroup.createSourceReaderMetricGroup();
+        }
+
+        @Override
+        public Configuration getConfiguration() {
+            return new Configuration();
+        }
+
+        @Override
+        public String getLocalHostName() {
+            return "localhost";
+        }
+
+        @Override
+        public int getIndexOfSubtask() {
+            return 0;
+        }
+
+        @Override
+        public void sendSplitRequest() {}
+
+        @Override
+        public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {}
+
+        @Override
+        public UserCodeClassLoader getUserCodeClassLoader() {
+            return 
SimpleUserCodeClassLoader.create(getClass().getClassLoader());
+        }
+
+        @Override
+        public int currentParallelism() {
+            return 1;
+        }
+    }
+
+    private static final class TestingReaderOutput<E> implements 
ReaderOutput<E> {
+
+        private final ArrayList<E> emittedRecords = new ArrayList<>();
+
+        @Override
+        public void collect(E record) {
+            emittedRecords.add(record);
+        }
+
+        @Override
+        public void collect(E record, long timestamp) {
+            collect(record);
+        }
+
+        @Override
+        public void emitWatermark(Watermark watermark) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void markIdle() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void markActive() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public SourceOutput<E> createOutputForSplit(String splitId) {
+            return this;
+        }
+
+        @Override
+        public void releaseOutputForSplit(String splitId) {}
+
+        public ArrayList<E> getEmittedRecords() {
+            return emittedRecords;
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java
index 8b69ef285..f8aadb8bd 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java
@@ -87,7 +87,8 @@ public abstract class FileStoreTableStatisticsTestBase {
                         null,
                         null,
                         null,
-                        null);
+                        null,
+                        false);
         
Assertions.assertThat(partitionFilterSource.reportStatistics().getRowCount()).isEqualTo(5L);
         // TODO validate column statistics
     }
@@ -107,7 +108,8 @@ public abstract class FileStoreTableStatisticsTestBase {
                         null,
                         null,
                         null,
-                        null);
+                        null,
+                        false);
         
Assertions.assertThat(keyFilterSource.reportStatistics().getRowCount()).isEqualTo(2L);
         // TODO validate column statistics
     }
@@ -127,7 +129,8 @@ public abstract class FileStoreTableStatisticsTestBase {
                         null,
                         null,
                         null,
-                        null);
+                        null,
+                        false);
         
Assertions.assertThat(keyFilterSource.reportStatistics().getRowCount()).isEqualTo(4L);
         // TODO validate column statistics
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/PrimaryKeyTableStatisticsTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/PrimaryKeyTableStatisticsTest.java
index 8e4643ef5..f5d412167 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/PrimaryKeyTableStatisticsTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/PrimaryKeyTableStatisticsTest.java
@@ -51,7 +51,8 @@ public class PrimaryKeyTableStatisticsTest extends 
FileStoreTableStatisticsTestB
                         null,
                         null,
                         null,
-                        null);
+                        null,
+                        false);
         
Assertions.assertThat(keyFilterSource.reportStatistics().getRowCount()).isEqualTo(9L);
         // TODO validate column statistics
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
index f09ab0924..ce0017eb1 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.util;
 import org.apache.paimon.utils.FileIOUtils;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
@@ -33,14 +34,22 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.operations.CollectModifyOperation;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.QueryOperation;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.nio.file.Path;
 import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
 import java.util.UUID;
 
 /** Similar to Flink's AbstractTestBase but using Junit5. */
@@ -303,4 +312,39 @@ public class AbstractTestBase {
             return env;
         }
     }
+
+    public static Transformation<?> translate(TableEnvironment env, String 
statement) {
+        TableEnvironmentImpl envImpl = (TableEnvironmentImpl) env;
+        List<Operation> operations = envImpl.getParser().parse(statement);
+
+        if (operations.size() != 1) {
+            throw new RuntimeException("No operation after parsing for " + 
statement);
+        }
+
+        Operation operation = operations.get(0);
+        if (operation instanceof QueryOperation) {
+            QueryOperation queryOperation = (QueryOperation) operation;
+            CollectModifyOperation sinkOperation = new 
CollectModifyOperation(queryOperation);
+            List<Transformation<?>> transformations;
+            try {
+                Method translate =
+                        
TableEnvironmentImpl.class.getDeclaredMethod("translate", List.class);
+                translate.setAccessible(true);
+                //noinspection unchecked
+                transformations =
+                        (List<Transformation<?>>)
+                                translate.invoke(envImpl, 
Collections.singletonList(sinkOperation));
+            } catch (NoSuchMethodException | InvocationTargetException | 
IllegalAccessException e) {
+                throw new RuntimeException(e);
+            }
+
+            if (transformations.size() != 1) {
+                throw new RuntimeException("No transformation after 
translating for " + statement);
+            }
+
+            return transformations.get(0);
+        } else {
+            throw new RuntimeException();
+        }
+    }
 }

Reply via email to