This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a4c3e3cd5 [core] Optimize IncrementalStartingScanner to plan with thread pool (#4206)
a4c3e3cd5 is described below

commit a4c3e3cd5f69128f060ed8784a6c438f82703138
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Sep 19 15:34:39 2024 +0800

    [core] Optimize IncrementalStartingScanner to plan with thread pool (#4206)
---
 .../java/org/apache/paimon/AbstractFileStore.java  |   5 +
 .../org/apache/paimon/AppendOnlyFileStore.java     |   2 +-
 .../java/org/apache/paimon/KeyValueFileStore.java  |   2 +-
 .../org/apache/paimon/manifest/ManifestEntry.java  |  14 ++
 .../apache/paimon/manifest/ManifestFileMeta.java   |  14 ++
 .../paimon/operation/AbstractFileStoreScan.java    | 193 ++++++++-------------
 .../paimon/operation/AppendOnlyFileStoreScan.java  |   5 +-
 .../org/apache/paimon/operation/FileStoreScan.java |   7 +
 .../paimon/operation/KeyValueFileStoreScan.java    |   5 +-
 .../apache/paimon/operation/ManifestsReader.java   | 148 ++++++++++++++++
 .../snapshot/IncrementalStartingScanner.java       | 178 ++++++++-----------
 .../table/source/snapshot/SnapshotReader.java      |  12 ++
 .../table/source/snapshot/SnapshotReaderImpl.java  |  22 +++
 .../apache/paimon/table/system/AuditLogTable.java  |  22 +++
 14 files changed, 387 insertions(+), 242 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index be4b504ec..53bd1291f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -31,6 +31,7 @@ import org.apache.paimon.metastore.AddPartitionTagCallback;
 import org.apache.paimon.metastore.MetastoreClient;
 import org.apache.paimon.operation.ChangelogDeletion;
 import org.apache.paimon.operation.FileStoreCommitImpl;
+import org.apache.paimon.operation.ManifestsReader;
 import org.apache.paimon.operation.PartitionExpire;
 import org.apache.paimon.operation.SnapshotDeletion;
 import org.apache.paimon.operation.TagDeletion;
@@ -176,6 +177,10 @@ abstract class AbstractFileStore<T> implements 
FileStore<T> {
                 new StatsFile(fileIO, pathFactory().statsFileFactory()));
     }
 
+    protected ManifestsReader newManifestsReader(boolean forWrite) {
+        return new ManifestsReader(partitionType, snapshotManager(), 
manifestListFactory(forWrite));
+    }
+
     @Override
     public RowType partitionType() {
         return partitionType;
diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index ebd4d342f..289f0bde7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -136,13 +136,13 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
                 };
 
         return new AppendOnlyFileStoreScan(
+                newManifestsReader(forWrite),
                 partitionType,
                 bucketFilter,
                 snapshotManager(),
                 schemaManager,
                 schema,
                 manifestFileFactory(forWrite),
-                manifestListFactory(forWrite),
                 options.bucket(),
                 forWrite,
                 options.scanManifestParallelism(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 1e950d747..d2d1d9972 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -229,6 +229,7 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
                     }
                 };
         return new KeyValueFileStoreScan(
+                newManifestsReader(forWrite),
                 partitionType,
                 bucketFilter,
                 snapshotManager(),
@@ -236,7 +237,6 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
                 schema,
                 keyValueFieldsExtractor,
                 manifestFileFactory(forWrite),
-                manifestListFactory(forWrite),
                 options.bucket(),
                 forWrite,
                 options.scanManifestParallelism(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
index 49cfe47c1..f7c5c4639 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
@@ -26,6 +26,7 @@ import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.TinyIntType;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
@@ -160,4 +161,17 @@ public class ManifestEntry implements FileEntry {
                 .mapToLong(manifest -> manifest.file().rowCount())
                 .sum();
     }
+
+    // ----------------------- Serialization -----------------------------
+
+    private static final ThreadLocal<ManifestEntrySerializer> 
SERIALIZER_THREAD_LOCAL =
+            ThreadLocal.withInitial(ManifestEntrySerializer::new);
+
+    public byte[] toBytes() throws IOException {
+        return SERIALIZER_THREAD_LOCAL.get().serializeToBytes(this);
+    }
+
+    public ManifestEntry fromBytes(byte[] bytes) throws IOException {
+        return SERIALIZER_THREAD_LOCAL.get().deserializeFromBytes(bytes);
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
index a4f900631..242abb189 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
@@ -25,6 +25,7 @@ import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.VarCharType;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Objects;
 
@@ -120,4 +121,17 @@ public class ManifestFileMeta {
                 "{%s, %d, %d, %d, %s, %d}",
                 fileName, fileSize, numAddedFiles, numDeletedFiles, 
partitionStats, schemaId);
     }
+
+    // ----------------------- Serialization -----------------------------
+
+    private static final ThreadLocal<ManifestFileMetaSerializer> 
SERIALIZER_THREAD_LOCAL =
+            ThreadLocal.withInitial(ManifestFileMetaSerializer::new);
+
+    public byte[] toBytes() throws IOException {
+        return SERIALIZER_THREAD_LOCAL.get().serializeToBytes(this);
+    }
+
+    public ManifestFileMeta fromBytes(byte[] bytes) throws IOException {
+        return SERIALIZER_THREAD_LOCAL.get().deserializeFromBytes(bytes);
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 824f90797..d04393281 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -29,7 +29,6 @@ import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestEntrySerializer;
 import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestFileMeta;
-import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.manifest.SimpleFileEntry;
 import org.apache.paimon.operation.metrics.ScanMetrics;
@@ -38,7 +37,6 @@ import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.stats.SimpleStats;
 import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
@@ -69,20 +67,19 @@ import static 
org.apache.paimon.utils.ThreadPoolUtils.randomlyOnlyExecute;
 /** Default implementation of {@link FileStoreScan}. */
 public abstract class AbstractFileStoreScan implements FileStoreScan {
 
+    private final ManifestsReader manifestsReader;
     private final RowType partitionType;
     private final SnapshotManager snapshotManager;
     private final ManifestFile.Factory manifestFileFactory;
-    private final ManifestList manifestList;
     private final int numOfBuckets;
     private final boolean checkNumOfBuckets;
-    private final Integer scanManifestParallelism;
+    private final Integer parallelism;
 
     private final ConcurrentMap<Long, TableSchema> tableSchemas;
     private final SchemaManager schemaManager;
     private final TableSchema schema;
     protected final ScanBucketFilter bucketKeyFilter;
 
-    private PartitionPredicate partitionFilter;
     private Snapshot specifiedSnapshot = null;
     private Filter<Integer> bucketFilter = null;
     private List<ManifestFileMeta> specifiedManifests = null;
@@ -95,44 +92,44 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     private ScanMetrics scanMetrics = null;
 
     public AbstractFileStoreScan(
+            ManifestsReader manifestsReader,
             RowType partitionType,
             ScanBucketFilter bucketKeyFilter,
             SnapshotManager snapshotManager,
             SchemaManager schemaManager,
             TableSchema schema,
             ManifestFile.Factory manifestFileFactory,
-            ManifestList.Factory manifestListFactory,
             int numOfBuckets,
             boolean checkNumOfBuckets,
-            Integer scanManifestParallelism) {
+            @Nullable Integer parallelism) {
+        this.manifestsReader = manifestsReader;
         this.partitionType = partitionType;
         this.bucketKeyFilter = bucketKeyFilter;
         this.snapshotManager = snapshotManager;
         this.schemaManager = schemaManager;
         this.schema = schema;
         this.manifestFileFactory = manifestFileFactory;
-        this.manifestList = manifestListFactory.create();
         this.numOfBuckets = numOfBuckets;
         this.checkNumOfBuckets = checkNumOfBuckets;
         this.tableSchemas = new ConcurrentHashMap<>();
-        this.scanManifestParallelism = scanManifestParallelism;
+        this.parallelism = parallelism;
     }
 
     @Override
     public FileStoreScan withPartitionFilter(Predicate predicate) {
-        this.partitionFilter = PartitionPredicate.fromPredicate(partitionType, 
predicate);
+        manifestsReader.withPartitionFilter(predicate);
         return this;
     }
 
     @Override
     public FileStoreScan withPartitionFilter(List<BinaryRow> partitions) {
-        this.partitionFilter = PartitionPredicate.fromMultiple(partitionType, 
partitions);
+        manifestsReader.withPartitionFilter(partitions);
         return this;
     }
 
     @Override
     public FileStoreScan withPartitionFilter(PartitionPredicate predicate) {
-        this.partitionFilter = predicate;
+        manifestsReader.withPartitionFilter(predicate);
         return this;
     }
 
@@ -219,6 +216,17 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
         return this;
     }
 
+    @Nullable
+    @Override
+    public Integer parallelism() {
+        return parallelism;
+    }
+
+    @Override
+    public ManifestsReader manifestsReader() {
+        return manifestsReader;
+    }
+
     @Override
     public Plan plan() {
 
@@ -249,21 +257,19 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
 
     @Override
     public List<SimpleFileEntry> readSimpleEntries() {
-        List<ManifestFileMeta> manifests = readManifests().getRight();
+        List<ManifestFileMeta> manifests = readManifests().filteredManifests;
         Collection<SimpleFileEntry> mergedEntries =
-                readAndMergeFileEntries(manifests, this::readSimpleEntries, 
Filter.alwaysTrue());
+                readAndMergeFileEntries(manifests, this::readSimpleEntries);
         return new ArrayList<>(mergedEntries);
     }
 
     @Override
     public List<PartitionEntry> readPartitionEntries() {
-        List<ManifestFileMeta> manifests = readManifests().getRight();
+        List<ManifestFileMeta> manifests = readManifests().filteredManifests;
         Map<BinaryRow, PartitionEntry> partitions = new ConcurrentHashMap<>();
         Consumer<ManifestFileMeta> processor =
-                m ->
-                        PartitionEntry.merge(
-                                PartitionEntry.merge(readManifestFileMeta(m)), 
partitions);
-        randomlyOnlyExecute(getExecutorService(scanManifestParallelism), 
processor, manifests);
+                m -> 
PartitionEntry.merge(PartitionEntry.merge(readManifest(m)), partitions);
+        randomlyOnlyExecute(getExecutorService(parallelism), processor, 
manifests);
         return partitions.values().stream()
                 .filter(p -> p.fileCount() > 0)
                 .collect(Collectors.toList());
@@ -271,11 +277,11 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
 
     @Override
     public List<BucketEntry> readBucketEntries() {
-        List<ManifestFileMeta> manifests = readManifests().getRight();
+        List<ManifestFileMeta> manifests = readManifests().filteredManifests;
         Map<Pair<BinaryRow, Integer>, BucketEntry> buckets = new 
ConcurrentHashMap<>();
         Consumer<ManifestFileMeta> processor =
-                m -> 
BucketEntry.merge(BucketEntry.merge(readManifestFileMeta(m)), buckets);
-        randomlyOnlyExecute(getExecutorService(scanManifestParallelism), 
processor, manifests);
+                m -> BucketEntry.merge(BucketEntry.merge(readManifest(m)), 
buckets);
+        randomlyOnlyExecute(getExecutorService(parallelism), processor, 
manifests);
         return buckets.values().stream()
                 .filter(p -> p.fileCount() > 0)
                 .collect(Collectors.toList());
@@ -283,16 +289,17 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
 
     private Pair<Snapshot, List<ManifestEntry>> doPlan() {
         long started = System.nanoTime();
-        Pair<Snapshot, List<ManifestFileMeta>> snapshotListPair = 
readManifests();
-        Snapshot snapshot = snapshotListPair.getLeft();
-        List<ManifestFileMeta> manifests = snapshotListPair.getRight();
+        ManifestsReader.Result manifestsResult = readManifests();
+        Snapshot snapshot = manifestsResult.snapshot;
+        List<ManifestFileMeta> manifests = manifestsResult.filteredManifests;
 
         long startDataFiles =
-                manifests.stream().mapToLong(f -> f.numAddedFiles() - 
f.numDeletedFiles()).sum();
+                manifestsResult.allManifests.stream()
+                        .mapToLong(f -> f.numAddedFiles() - 
f.numDeletedFiles())
+                        .sum();
 
         Collection<ManifestEntry> mergedEntries =
-                readAndMergeFileEntries(
-                        manifests, this::readManifestFileMeta, 
this::filterUnmergedManifestEntry);
+                readAndMergeFileEntries(manifests, this::readManifest);
 
         List<ManifestEntry> files = new ArrayList<>();
         long skippedByPartitionAndStats = startDataFiles - 
mergedEntries.size();
@@ -367,69 +374,17 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     }
 
     public <T extends FileEntry> Collection<T> readAndMergeFileEntries(
-            List<ManifestFileMeta> manifests,
-            Function<ManifestFileMeta, List<T>> manifestReader,
-            @Nullable Filter<T> filterUnmergedEntry) {
-        // in memory filter, do it first
-        manifests =
-                manifests.stream()
-                        .filter(this::filterManifestFileMeta)
-                        .collect(Collectors.toList());
-        Function<ManifestFileMeta, List<T>> reader =
-                file -> {
-                    List<T> entries = manifestReader.apply(file);
-                    if (filterUnmergedEntry != null) {
-                        entries =
-                                entries.stream()
-                                        .filter(filterUnmergedEntry::test)
-                                        .collect(Collectors.toList());
-                    }
-                    return entries;
-                };
+            List<ManifestFileMeta> manifests, Function<ManifestFileMeta, 
List<T>> manifestReader) {
         return FileEntry.mergeEntries(
-                sequentialBatchedExecute(reader, manifests, 
scanManifestParallelism));
+                sequentialBatchedExecute(manifestReader, manifests, 
parallelism));
     }
 
-    private Pair<Snapshot, List<ManifestFileMeta>> readManifests() {
-        List<ManifestFileMeta> manifests = specifiedManifests;
-        Snapshot snapshot = null;
-        if (manifests == null) {
-            snapshot =
-                    specifiedSnapshot == null
-                            ? snapshotManager.latestSnapshot()
-                            : specifiedSnapshot;
-            if (snapshot == null) {
-                manifests = Collections.emptyList();
-            } else {
-                manifests = readManifests(snapshot);
-            }
+    private ManifestsReader.Result readManifests() {
+        if (specifiedManifests != null) {
+            return new ManifestsReader.Result(null, specifiedManifests, 
specifiedManifests);
         }
-        return Pair.of(snapshot, manifests);
-    }
 
-    private List<ManifestFileMeta> readManifests(Snapshot snapshot) {
-        switch (scanMode) {
-            case ALL:
-                return manifestList.readDataManifests(snapshot);
-            case DELTA:
-                return manifestList.readDeltaManifests(snapshot);
-            case CHANGELOG:
-                if (snapshot.version() > Snapshot.TABLE_STORE_02_VERSION) {
-                    return manifestList.readChangelogManifests(snapshot);
-                }
-
-                // compatible with Paimon 0.2, we'll read extraFiles in 
DataFileMeta
-                // see comments on DataFileMeta#extraFiles
-                if (snapshot.commitKind() == Snapshot.CommitKind.APPEND) {
-                    return manifestList.readDeltaManifests(snapshot);
-                }
-                throw new IllegalStateException(
-                        String.format(
-                                "Incremental scan does not accept %s snapshot",
-                                snapshot.commitKind()));
-            default:
-                throw new UnsupportedOperationException("Unknown scan kind " + 
scanMode.name());
-        }
+        return manifestsReader.read(specifiedSnapshot, scanMode);
     }
 
     // ------------------------------------------------------------------------
@@ -443,30 +398,6 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
                 id, key -> key == schema.id() ? schema : 
schemaManager.schema(id));
     }
 
-    /** Note: Keep this thread-safe. */
-    private boolean filterManifestFileMeta(ManifestFileMeta manifest) {
-        if (partitionFilter == null) {
-            return true;
-        }
-
-        SimpleStats stats = manifest.partitionStats();
-        return partitionFilter == null
-                || partitionFilter.test(
-                        manifest.numAddedFiles() + manifest.numDeletedFiles(),
-                        stats.minValues(),
-                        stats.maxValues(),
-                        stats.nullCounts());
-    }
-
-    /** Note: Keep this thread-safe. */
-    private boolean filterUnmergedManifestEntry(ManifestEntry entry) {
-        if (manifestEntryFilter != null && !manifestEntryFilter.test(entry)) {
-            return false;
-        }
-
-        return filterByStats(entry);
-    }
-
     /** Note: Keep this thread-safe. */
     protected abstract boolean filterByStats(ManifestEntry entry);
 
@@ -481,15 +412,28 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     protected abstract List<ManifestEntry> 
filterWholeBucketByStats(List<ManifestEntry> entries);
 
     /** Note: Keep this thread-safe. */
-    private List<ManifestEntry> readManifestFileMeta(ManifestFileMeta 
manifest) {
-        return manifestFileFactory
-                .create()
-                .read(
-                        manifest.fileName(),
-                        manifest.fileSize(),
-                        createCacheRowFilter(manifestCacheFilter, 
numOfBuckets),
-                        createEntryRowFilter(
-                                partitionFilter, bucketFilter, fileNameFilter, 
numOfBuckets));
+    @Override
+    public List<ManifestEntry> readManifest(ManifestFileMeta manifest) {
+        List<ManifestEntry> entries =
+                manifestFileFactory
+                        .create()
+                        .read(
+                                manifest.fileName(),
+                                manifest.fileSize(),
+                                createCacheRowFilter(manifestCacheFilter, 
numOfBuckets),
+                                createEntryRowFilter(
+                                        manifestsReader.partitionFilter(),
+                                        bucketFilter,
+                                        fileNameFilter,
+                                        numOfBuckets));
+        List<ManifestEntry> filteredEntries = new ArrayList<>(entries.size());
+        for (ManifestEntry entry : entries) {
+            if ((manifestEntryFilter == null || 
manifestEntryFilter.test(entry))
+                    && filterByStats(entry)) {
+                filteredEntries.add(entry);
+            }
+        }
+        return filteredEntries;
     }
 
     /** Note: Keep this thread-safe. */
@@ -504,7 +448,10 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
                         // see SimpleFileEntrySerializer
                         createCacheRowFilter(manifestCacheFilter, 
numOfBuckets),
                         createEntryRowFilter(
-                                partitionFilter, bucketFilter, fileNameFilter, 
numOfBuckets));
+                                manifestsReader.partitionFilter(),
+                                bucketFilter,
+                                fileNameFilter,
+                                numOfBuckets));
     }
 
     /**
@@ -560,11 +507,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
                 return false;
             }
 
-            if (fileNameFilter != null && 
!fileNameFilter.test((fileNameGetter.apply(row)))) {
-                return false;
-            }
-
-            return true;
+            return fileNameFilter == null || 
fileNameFilter.test((fileNameGetter.apply(row)));
         };
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
index 056c82e82..b911d3884 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
@@ -22,7 +22,6 @@ import org.apache.paimon.AppendOnlyFileStore;
 import org.apache.paimon.fileindex.FileIndexPredicate;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFile;
-import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
@@ -52,25 +51,25 @@ public class AppendOnlyFileStoreScan extends 
AbstractFileStoreScan {
     private final Map<Long, Predicate> dataFilterMapping = new HashMap<>();
 
     public AppendOnlyFileStoreScan(
+            ManifestsReader manifestsReader,
             RowType partitionType,
             ScanBucketFilter bucketFilter,
             SnapshotManager snapshotManager,
             SchemaManager schemaManager,
             TableSchema schema,
             ManifestFile.Factory manifestFileFactory,
-            ManifestList.Factory manifestListFactory,
             int numOfBuckets,
             boolean checkNumOfBuckets,
             Integer scanManifestParallelism,
             boolean fileIndexReadEnabled) {
         super(
+                manifestsReader,
                 partitionType,
                 bucketFilter,
                 snapshotManager,
                 schemaManager,
                 schema,
                 manifestFileFactory,
-                manifestListFactory,
                 numOfBuckets,
                 checkNumOfBuckets,
                 scanManifestParallelism);
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index b1e0a2bd9..197b4dff4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -77,6 +77,13 @@ public interface FileStoreScan {
 
     FileStoreScan withMetrics(ScanMetrics metrics);
 
+    @Nullable
+    Integer parallelism();
+
+    ManifestsReader manifestsReader();
+
+    List<ManifestEntry> readManifest(ManifestFileMeta manifest);
+
     /** Produce a {@link Plan}. */
     Plan plan();
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index 3e4eef985..5b067de08 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -23,7 +23,6 @@ import org.apache.paimon.CoreOptions.MergeEngine;
 import org.apache.paimon.KeyValueFileStore;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFile;
-import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
 import org.apache.paimon.schema.SchemaManager;
@@ -56,6 +55,7 @@ public class KeyValueFileStoreScan extends 
AbstractFileStoreScan {
     private final ChangelogProducer changelogProducer;
 
     public KeyValueFileStoreScan(
+            ManifestsReader manifestsReader,
             RowType partitionType,
             ScanBucketFilter bucketFilter,
             SnapshotManager snapshotManager,
@@ -63,7 +63,6 @@ public class KeyValueFileStoreScan extends 
AbstractFileStoreScan {
             TableSchema schema,
             KeyValueFieldsExtractor keyValueFieldsExtractor,
             ManifestFile.Factory manifestFileFactory,
-            ManifestList.Factory manifestListFactory,
             int numOfBuckets,
             boolean checkNumOfBuckets,
             Integer scanManifestParallelism,
@@ -71,13 +70,13 @@ public class KeyValueFileStoreScan extends 
AbstractFileStoreScan {
             MergeEngine mergeEngine,
             ChangelogProducer changelogProducer) {
         super(
+                manifestsReader,
                 partitionType,
                 bucketFilter,
                 snapshotManager,
                 schemaManager,
                 schema,
                 manifestFileFactory,
-                manifestListFactory,
                 numOfBuckets,
                 checkNumOfBuckets,
                 scanManifestParallelism);
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
new file mode 100644
index 000000000..5ee468af2
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.SnapshotManager;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** A util class to read manifest files. */
+@ThreadSafe
+public class ManifestsReader {
+
+    private final RowType partitionType;
+    private final SnapshotManager snapshotManager;
+    private final ManifestList.Factory manifestListFactory;
+
+    @Nullable private PartitionPredicate partitionFilter = null;
+
+    public ManifestsReader(
+            RowType partitionType,
+            SnapshotManager snapshotManager,
+            ManifestList.Factory manifestListFactory) {
+        this.partitionType = partitionType;
+        this.snapshotManager = snapshotManager;
+        this.manifestListFactory = manifestListFactory;
+    }
+
+    public ManifestsReader withPartitionFilter(Predicate predicate) {
+        this.partitionFilter = PartitionPredicate.fromPredicate(partitionType, 
predicate);
+        return this;
+    }
+
+    public ManifestsReader withPartitionFilter(List<BinaryRow> partitions) {
+        this.partitionFilter = PartitionPredicate.fromMultiple(partitionType, 
partitions);
+        return this;
+    }
+
+    public ManifestsReader withPartitionFilter(PartitionPredicate predicate) {
+        this.partitionFilter = predicate;
+        return this;
+    }
+
+    @Nullable
+    public PartitionPredicate partitionFilter() {
+        return partitionFilter;
+    }
+
+    public Result read(@Nullable Snapshot specifiedSnapshot, ScanMode 
scanMode) {
+        List<ManifestFileMeta> manifests;
+        Snapshot snapshot =
+                specifiedSnapshot == null ? snapshotManager.latestSnapshot() : 
specifiedSnapshot;
+        if (snapshot == null) {
+            manifests = Collections.emptyList();
+        } else {
+            manifests = readManifests(snapshot, scanMode);
+        }
+
+        List<ManifestFileMeta> filtered =
+                manifests.stream()
+                        .filter(this::filterManifestFileMeta)
+                        .collect(Collectors.toList());
+        return new Result(snapshot, manifests, filtered);
+    }
+
+    private List<ManifestFileMeta> readManifests(Snapshot snapshot, ScanMode 
scanMode) {
+        ManifestList manifestList = manifestListFactory.create();
+        switch (scanMode) {
+            case ALL:
+                return manifestList.readDataManifests(snapshot);
+            case DELTA:
+                return manifestList.readDeltaManifests(snapshot);
+            case CHANGELOG:
+                if (snapshot.version() <= Snapshot.TABLE_STORE_02_VERSION) {
+                    throw new UnsupportedOperationException(
+                            "Unsupported snapshot version: " + 
snapshot.version());
+                }
+                return manifestList.readChangelogManifests(snapshot);
+            default:
+                throw new UnsupportedOperationException("Unknown scan kind " + 
scanMode.name());
+        }
+    }
+
+    /** Note: Keep this thread-safe. */
+    private boolean filterManifestFileMeta(ManifestFileMeta manifest) {
+        if (partitionFilter == null) {
+            return true;
+        }
+
+        SimpleStats stats = manifest.partitionStats();
+        return partitionFilter == null
+                || partitionFilter.test(
+                        manifest.numAddedFiles() + manifest.numDeletedFiles(),
+                        stats.minValues(),
+                        stats.maxValues(),
+                        stats.nullCounts());
+    }
+
+    /** Result for reading manifest files. */
+    public static final class Result {
+
+        public final Snapshot snapshot;
+        public final List<ManifestFileMeta> allManifests;
+        public final List<ManifestFileMeta> filteredManifests;
+
+        public Result(
+                Snapshot snapshot,
+                List<ManifestFileMeta> allManifests,
+                List<ManifestFileMeta> filteredManifests) {
+            this.snapshot = snapshot;
+            this.allManifests = allManifests;
+            this.filteredManifests = filteredManifests;
+        }
+    }
+
+    public static Result emptyResult() {
+        return new Result(null, Collections.emptyList(), 
Collections.emptyList());
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
index f07c1feeb..6a8f68891 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
@@ -22,27 +22,32 @@ import org.apache.paimon.Snapshot;
 import org.apache.paimon.Snapshot.CommitKind;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.operation.ManifestsReader;
 import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.DeletionFile;
 import org.apache.paimon.table.source.PlanImpl;
 import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.SplitGenerator;
+import org.apache.paimon.utils.ManifestReadThreadPool;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SnapshotManager;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
@@ -51,9 +56,8 @@ public class IncrementalStartingScanner extends 
AbstractStartingScanner {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(IncrementalStartingScanner.class);
 
-    private long endingSnapshotId;
-
-    private ScanMode scanMode;
+    private final long endingSnapshotId;
+    private final ScanMode scanMode;
 
     public IncrementalStartingScanner(
             SnapshotManager snapshotManager, long start, long end, ScanMode 
scanMode) {
@@ -70,31 +74,64 @@ public class IncrementalStartingScanner extends 
AbstractStartingScanner {
         if (checkResult.isPresent()) {
             return checkResult.get();
         }
-        Map<SplitInfo, List<DataFileMeta>> grouped = new HashMap<>();
-        for (long i = startingSnapshotId + 1; i < endingSnapshotId + 1; i++) {
-            List<DataSplit> splits = readSplits(reader, 
snapshotManager.snapshot(i));
-            for (DataSplit split : splits) {
-                grouped.computeIfAbsent(
-                                new SplitInfo(
-                                        split.partition(),
-                                        split.bucket(),
-                                        // take it for false, because multiple 
snapshot read may
-                                        // need merge for primary key table
-                                        false,
-                                        split.bucketPath(),
-                                        split.deletionFiles().orElse(null)),
-                                k -> new ArrayList<>())
-                        .addAll(split.dataFiles());
-            }
+        Map<Pair<BinaryRow, Integer>, List<DataFileMeta>> grouped = new 
ConcurrentHashMap<>();
+        ManifestsReader manifestsReader = reader.manifestsReader();
+
+        List<Long> snapshots =
+                LongStream.range(startingSnapshotId + 1, endingSnapshotId + 1)
+                        .boxed()
+                        .collect(Collectors.toList());
+
+        Iterable<ManifestFileMeta> manifests =
+                ManifestReadThreadPool.sequentialBatchedExecute(
+                        id -> {
+                            Snapshot snapshot = snapshotManager.snapshot(id);
+                            switch (scanMode) {
+                                case DELTA:
+                                    if (snapshot.commitKind() != 
CommitKind.APPEND) {
+                                        // ignore COMPACT and OVERWRITE
+                                        return Collections.emptyList();
+                                    }
+                                    break;
+                                case CHANGELOG:
+                                    if (snapshot.commitKind() == 
CommitKind.OVERWRITE) {
+                                        // ignore OVERWRITE
+                                        return Collections.emptyList();
+                                    }
+                                    break;
+                                default:
+                                    throw new UnsupportedOperationException(
+                                            "Unsupported scan mode: " + 
scanMode);
+                            }
+
+                            return manifestsReader.read(snapshot, 
scanMode).filteredManifests;
+                        },
+                        snapshots,
+                        reader.parallelism());
+
+        Iterable<ManifestEntry> entries =
+                ManifestReadThreadPool.sequentialBatchedExecute(
+                        reader::readManifest, Lists.newArrayList(manifests), 
reader.parallelism());
+
+        for (ManifestEntry entry : entries) {
+            checkArgument(
+                    entry.kind() == FileKind.ADD, "Delta or changelog should 
only have ADD files.");
+            grouped.compute(
+                    Pair.of(entry.partition(), entry.bucket()),
+                    (key, files) -> {
+                        if (files == null) {
+                            files = new ArrayList<>();
+                        }
+                        files.add(entry.file());
+                        return files;
+                    });
         }
 
         List<Split> result = new ArrayList<>();
-        for (Map.Entry<SplitInfo, List<DataFileMeta>> entry : 
grouped.entrySet()) {
-            BinaryRow partition = entry.getKey().partition;
-            int bucket = entry.getKey().bucket;
-            boolean rawConvertible = entry.getKey().rawConvertible;
-            String bucketPath = entry.getKey().bucketPath;
-            List<DeletionFile> deletionFiles = entry.getKey().deletionFiles;
+        for (Map.Entry<Pair<BinaryRow, Integer>, List<DataFileMeta>> entry : 
grouped.entrySet()) {
+            BinaryRow partition = entry.getKey().getLeft();
+            int bucket = entry.getKey().getRight();
+            String bucketPath = reader.pathFactory().bucketPath(partition, 
bucket).toString();
             for (SplitGenerator.SplitGroup splitGroup :
                     reader.splitGenerator().splitForBatch(entry.getValue())) {
                 DataSplit.Builder dataSplitBuilder =
@@ -103,11 +140,8 @@ public class IncrementalStartingScanner extends 
AbstractStartingScanner {
                                 .withPartition(partition)
                                 .withBucket(bucket)
                                 .withDataFiles(splitGroup.files)
-                                .rawConvertible(rawConvertible)
+                                .rawConvertible(splitGroup.rawConvertible)
                                 .withBucketPath(bucketPath);
-                if (deletionFiles != null) {
-                    dataSplitBuilder.withDataDeletionFiles(deletionFiles);
-                }
                 result.add(dataSplitBuilder.build());
             }
         }
@@ -149,78 +183,4 @@ public class IncrementalStartingScanner extends 
AbstractStartingScanner {
 
         return Optional.empty();
     }
-
-    private List<DataSplit> readSplits(SnapshotReader reader, Snapshot s) {
-        switch (scanMode) {
-            case CHANGELOG:
-                return readChangeLogSplits(reader, s);
-            case DELTA:
-                return readDeltaSplits(reader, s);
-            default:
-                throw new UnsupportedOperationException("Unsupported scan 
kind: " + scanMode);
-        }
-    }
-
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    private List<DataSplit> readDeltaSplits(SnapshotReader reader, Snapshot s) 
{
-        if (s.commitKind() != CommitKind.APPEND) {
-            // ignore COMPACT and OVERWRITE
-            return Collections.emptyList();
-        }
-        return (List) 
reader.withSnapshot(s).withMode(ScanMode.DELTA).read().splits();
-    }
-
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    private List<DataSplit> readChangeLogSplits(SnapshotReader reader, 
Snapshot s) {
-        if (s.commitKind() == CommitKind.OVERWRITE) {
-            // ignore OVERWRITE
-            return Collections.emptyList();
-        }
-        return (List) 
reader.withSnapshot(s).withMode(ScanMode.CHANGELOG).read().splits();
-    }
-
-    /** Split information to pass. */
-    private static class SplitInfo {
-
-        private final BinaryRow partition;
-        private final int bucket;
-        private final boolean rawConvertible;
-        private final String bucketPath;
-        @Nullable private final List<DeletionFile> deletionFiles;
-
-        private SplitInfo(
-                BinaryRow partition,
-                int bucket,
-                boolean rawConvertible,
-                String bucketPath,
-                @Nullable List<DeletionFile> deletionFiles) {
-            this.partition = partition;
-            this.bucket = bucket;
-            this.rawConvertible = rawConvertible;
-            this.bucketPath = bucketPath;
-            this.deletionFiles = deletionFiles;
-        }
-
-        @Override
-        public int hashCode() {
-            return Arrays.hashCode(
-                    new Object[] {partition, bucket, rawConvertible, 
bucketPath, deletionFiles});
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-
-            if (!(obj instanceof SplitInfo)) {
-                return false;
-            }
-
-            SplitInfo that = (SplitInfo) obj;
-
-            return Objects.equals(partition, that.partition)
-                    && bucket == that.bucket
-                    && rawConvertible == that.rawConvertible
-                    && Objects.equals(bucketPath, that.bucketPath)
-                    && Objects.equals(deletionFiles, that.deletionFiles);
-        }
-    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index 13a458d4a..bd07f49fd 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -23,14 +23,17 @@ import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.manifest.BucketEntry;
 import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.metrics.MetricRegistry;
+import org.apache.paimon.operation.ManifestsReader;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.SplitGenerator;
 import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.SnapshotManager;
 
@@ -42,12 +45,21 @@ import java.util.Map;
 /** Read splits from specified {@link Snapshot} with given configuration. */
 public interface SnapshotReader {
 
+    @Nullable
+    Integer parallelism();
+
     SnapshotManager snapshotManager();
 
+    ManifestsReader manifestsReader();
+
+    List<ManifestEntry> readManifest(ManifestFileMeta manifest);
+
     ConsumerManager consumerManager();
 
     SplitGenerator splitGenerator();
 
+    FileStorePathFactory pathFactory();
+
     SnapshotReader withSnapshot(long snapshotId);
 
     SnapshotReader withSnapshot(Snapshot snapshot);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 03b6144be..b2fd1c785 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -30,10 +30,12 @@ import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.BucketEntry;
 import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.operation.DefaultValueAssigner;
 import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.operation.ManifestsReader;
 import org.apache.paimon.operation.metrics.ScanMetrics;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
@@ -116,11 +118,26 @@ public class SnapshotReaderImpl implements SnapshotReader 
{
         this.indexFileHandler = indexFileHandler;
     }
 
+    @Override
+    public Integer parallelism() {
+        return scan.parallelism();
+    }
+
     @Override
     public SnapshotManager snapshotManager() {
         return snapshotManager;
     }
 
+    @Override
+    public ManifestsReader manifestsReader() {
+        return scan.manifestsReader();
+    }
+
+    @Override
+    public List<ManifestEntry> readManifest(ManifestFileMeta manifest) {
+        return scan.readManifest(manifest);
+    }
+
     @Override
     public ConsumerManager consumerManager() {
         return consumerManager;
@@ -131,6 +148,11 @@ public class SnapshotReaderImpl implements SnapshotReader {
         return splitGenerator;
     }
 
+    @Override
+    public FileStorePathFactory pathFactory() {
+        return pathFactory;
+    }
+
     @Override
     public SnapshotReader withSnapshot(long snapshotId) {
         scan.withSnapshot(snapshotId);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 38457f29b..d9cf80289 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -33,6 +33,7 @@ import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.metrics.MetricRegistry;
+import org.apache.paimon.operation.ManifestsReader;
 import org.apache.paimon.predicate.LeafPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
@@ -57,6 +58,7 @@ import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.VarCharType;
 import org.apache.paimon.utils.BranchManager;
+import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.ProjectedRow;
 import org.apache.paimon.utils.SimpleFileReader;
@@ -241,11 +243,26 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
             this.snapshotReader = snapshotReader;
         }
 
+        @Override
+        public Integer parallelism() {
+            return snapshotReader.parallelism();
+        }
+
         @Override
         public SnapshotManager snapshotManager() {
             return snapshotReader.snapshotManager();
         }
 
+        @Override
+        public ManifestsReader manifestsReader() {
+            return snapshotReader.manifestsReader();
+        }
+
+        @Override
+        public List<ManifestEntry> readManifest(ManifestFileMeta manifest) {
+            return snapshotReader.readManifest(manifest);
+        }
+
         @Override
         public ConsumerManager consumerManager() {
             return snapshotReader.consumerManager();
@@ -256,6 +273,11 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
             return snapshotReader.splitGenerator();
         }
 
+        @Override
+        public FileStorePathFactory pathFactory() {
+            return snapshotReader.pathFactory();
+        }
+
         public SnapshotReader withSnapshot(long snapshotId) {
             snapshotReader.withSnapshot(snapshotId);
             return this;

Reply via email to