This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6eec038e7 [core] Introduce SimpleFileEntry to reduce memory in 
FileStoreCommit (#2943)
6eec038e7 is described below

commit 6eec038e7382fd630cbaadddd5c6e959e593d444
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Mar 5 17:58:55 2024 +0800

    [core] Introduce SimpleFileEntry to reduce memory in FileStoreCommit (#2943)
---
 .../{ManifestEntry.java => FileEntry.java}         | 208 +++++++--------------
 .../org/apache/paimon/manifest/ManifestEntry.java  | 190 +++++++------------
 .../org/apache/paimon/manifest/ManifestFile.java   |  11 ++
 .../apache/paimon/manifest/ManifestFileMeta.java   |  12 +-
 .../apache/paimon/manifest/SimpleFileEntry.java    | 153 +++++++++++++++
 .../paimon/manifest/SimpleFileEntrySerializer.java |  64 +++++++
 .../paimon/operation/AbstractFileStoreScan.java    | 170 +++++++++--------
 .../apache/paimon/operation/FileDeletionBase.java  |   3 +-
 .../paimon/operation/FileStoreCommitImpl.java      |  67 +++----
 .../org/apache/paimon/operation/FileStoreScan.java |   7 +
 .../paimon/partition/PartitionPredicate.java       |  14 +-
 .../java/org/apache/paimon/utils/ObjectsFile.java  |   4 +-
 .../paimon/manifest/ManifestFileMetaTestBase.java  |   2 +-
 .../paimon/partition/PartitionPredicateTest.java   |   7 +-
 14 files changed, 523 insertions(+), 389 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
similarity index 62%
copy from 
paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
copy to paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
index 25b6919f0..1b7b2c8bb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
@@ -19,16 +19,10 @@
 package org.apache.paimon.manifest;
 
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.IntType;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.types.TinyIntType;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.FileUtils;
 import org.apache.paimon.utils.Preconditions;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -38,91 +32,88 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
-import static org.apache.paimon.utils.SerializationUtils.newBytesType;
-
-/** Entry of a manifest file, representing an addition / deletion of a data 
file. */
-public class ManifestEntry {
-
-    private final FileKind kind;
-    // for tables without partition this field should be a row with 0 columns 
(not null)
-    private final BinaryRow partition;
-    private final int bucket;
-    private final int totalBuckets;
-    private final DataFileMeta file;
-
-    public ManifestEntry(
-            FileKind kind, BinaryRow partition, int bucket, int totalBuckets, 
DataFileMeta file) {
-        this.kind = kind;
-        this.partition = partition;
-        this.bucket = bucket;
-        this.totalBuckets = totalBuckets;
-        this.file = file;
-    }
+/** Entry representing a file. */
+public interface FileEntry {
 
-    public FileKind kind() {
-        return kind;
-    }
+    FileKind kind();
 
-    public BinaryRow partition() {
-        return partition;
-    }
+    BinaryRow partition();
 
-    public int bucket() {
-        return bucket;
-    }
+    int bucket();
 
-    public int totalBuckets() {
-        return totalBuckets;
-    }
+    int level();
 
-    public DataFileMeta file() {
-        return file;
-    }
+    String fileName();
 
-    public Identifier identifier() {
-        return new Identifier(partition, bucket, file.level(), 
file.fileName());
-    }
+    Identifier identifier();
 
-    public static RowType schema() {
-        List<DataField> fields = new ArrayList<>();
-        fields.add(new DataField(0, "_KIND", new TinyIntType(false)));
-        fields.add(new DataField(1, "_PARTITION", newBytesType(false)));
-        fields.add(new DataField(2, "_BUCKET", new IntType(false)));
-        fields.add(new DataField(3, "_TOTAL_BUCKETS", new IntType(false)));
-        fields.add(new DataField(4, "_FILE", DataFileMeta.schema()));
-        return new RowType(fields);
-    }
+    BinaryRow minKey();
 
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof ManifestEntry)) {
-            return false;
+    BinaryRow maxKey();
+
+    /**
+     * The same {@link Identifier} indicates that the {@link ManifestEntry} 
refers to the same data
+     * file.
+     */
+    class Identifier {
+        public final BinaryRow partition;
+        public final int bucket;
+        public final int level;
+        public final String fileName;
+
+        /* Cache the hash code for the string */
+        private Integer hash;
+
+        public Identifier(BinaryRow partition, int bucket, int level, String 
fileName) {
+            this.partition = partition;
+            this.bucket = bucket;
+            this.level = level;
+            this.fileName = fileName;
         }
-        ManifestEntry that = (ManifestEntry) o;
-        return Objects.equals(kind, that.kind)
-                && Objects.equals(partition, that.partition)
-                && bucket == that.bucket
-                && totalBuckets == that.totalBuckets
-                && Objects.equals(file, that.file);
-    }
 
-    @Override
-    public int hashCode() {
-        return Objects.hash(kind, partition, bucket, totalBuckets, file);
-    }
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof Identifier)) {
+                return false;
+            }
+            Identifier that = (Identifier) o;
+            return Objects.equals(partition, that.partition)
+                    && bucket == that.bucket
+                    && level == that.level
+                    && Objects.equals(fileName, that.fileName);
+        }
+
+        @Override
+        public int hashCode() {
+            if (hash == null) {
+                hash = Objects.hash(partition, bucket, level, fileName);
+            }
+            return hash;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("{%s, %d, %d, %s}", partition, bucket, level, 
fileName);
+        }
 
-    @Override
-    public String toString() {
-        return String.format("{%s, %s, %d, %d, %s}", kind, partition, bucket, 
totalBuckets, file);
+        public String toString(FileStorePathFactory pathFactory) {
+            return pathFactory.getPartitionString(partition)
+                    + ", bucket "
+                    + bucket
+                    + ", level "
+                    + level
+                    + ", file "
+                    + fileName;
+        }
     }
 
-    public static Collection<ManifestEntry> 
mergeEntries(Iterable<ManifestEntry> entries) {
-        LinkedHashMap<Identifier, ManifestEntry> map = new LinkedHashMap<>();
+    static <T extends FileEntry> Collection<T> mergeEntries(Iterable<T> 
entries) {
+        LinkedHashMap<Identifier, T> map = new LinkedHashMap<>();
         mergeEntries(entries, map);
         return map.values();
     }
 
-    public static void mergeEntries(
+    static void mergeEntries(
             ManifestFile manifestFile,
             List<ManifestFileMeta> manifestFiles,
             Map<Identifier, ManifestEntry> map) {
@@ -146,10 +137,9 @@ public class ManifestEntry {
         }
     }
 
-    public static void mergeEntries(
-            Iterable<ManifestEntry> entries, Map<Identifier, ManifestEntry> 
map) {
-        for (ManifestEntry entry : entries) {
-            ManifestEntry.Identifier identifier = entry.identifier();
+    static <T extends FileEntry> void mergeEntries(Iterable<T> entries, 
Map<Identifier, T> map) {
+        for (T entry : entries) {
+            Identifier identifier = entry.identifier();
             switch (entry.kind()) {
                 case ADD:
                     Preconditions.checkState(
@@ -177,68 +167,12 @@ public class ManifestEntry {
         }
     }
 
-    public static void assertNoDelete(Collection<ManifestEntry> entries) {
-        for (ManifestEntry entry : entries) {
+    static <T extends FileEntry> void assertNoDelete(Collection<T> entries) {
+        for (T entry : entries) {
             Preconditions.checkState(
                     entry.kind() != FileKind.DELETE,
                     "Trying to delete file %s which is not previously added.",
-                    entry.file().fileName());
-        }
-    }
-
-    /**
-     * The same {@link Identifier} indicates that the {@link ManifestEntry} 
refers to the same data
-     * file.
-     */
-    public static class Identifier {
-        public final BinaryRow partition;
-        public final int bucket;
-        public final int level;
-        public final String fileName;
-
-        /* Cache the hash code for the string */
-        private Integer hash;
-
-        private Identifier(BinaryRow partition, int bucket, int level, String 
fileName) {
-            this.partition = partition;
-            this.bucket = bucket;
-            this.level = level;
-            this.fileName = fileName;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (!(o instanceof Identifier)) {
-                return false;
-            }
-            Identifier that = (Identifier) o;
-            return Objects.equals(partition, that.partition)
-                    && bucket == that.bucket
-                    && level == that.level
-                    && Objects.equals(fileName, that.fileName);
-        }
-
-        @Override
-        public int hashCode() {
-            if (hash == null) {
-                hash = Objects.hash(partition, bucket, level, fileName);
-            }
-            return hash;
-        }
-
-        @Override
-        public String toString() {
-            return String.format("{%s, %d, %d, %s}", partition, bucket, level, 
fileName);
-        }
-
-        public String toString(FileStorePathFactory pathFactory) {
-            return pathFactory.getPartitionString(partition)
-                    + ", bucket "
-                    + bucket
-                    + ", level "
-                    + level
-                    + ", file "
-                    + fileName;
+                    entry.fileName());
         }
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
index 25b6919f0..34ad169ff 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
@@ -19,29 +19,26 @@
 package org.apache.paimon.manifest;
 
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.TinyIntType;
-import org.apache.paimon.utils.FileStorePathFactory;
-import org.apache.paimon.utils.FileUtils;
-import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.Filter;
+
+import javax.annotation.Nullable;
 
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
+import java.util.function.Function;
 
 import static org.apache.paimon.utils.SerializationUtils.newBytesType;
 
 /** Entry of a manifest file, representing an addition / deletion of a data 
file. */
-public class ManifestEntry {
+public class ManifestEntry implements FileEntry {
 
     private final FileKind kind;
     // for tables without partition this field should be a row with 0 columns 
(not null)
@@ -59,18 +56,41 @@ public class ManifestEntry {
         this.file = file;
     }
 
+    @Override
     public FileKind kind() {
         return kind;
     }
 
+    @Override
     public BinaryRow partition() {
         return partition;
     }
 
+    @Override
     public int bucket() {
         return bucket;
     }
 
+    @Override
+    public int level() {
+        return file.level();
+    }
+
+    @Override
+    public String fileName() {
+        return file.fileName();
+    }
+
+    @Override
+    public BinaryRow minKey() {
+        return file.minKey();
+    }
+
+    @Override
+    public BinaryRow maxKey() {
+        return file.maxKey();
+    }
+
     public int totalBuckets() {
         return totalBuckets;
     }
@@ -79,6 +99,7 @@ public class ManifestEntry {
         return file;
     }
 
+    @Override
     public Identifier identifier() {
         return new Identifier(partition, bucket, file.level(), 
file.fileName());
     }
@@ -116,129 +137,56 @@ public class ManifestEntry {
         return String.format("{%s, %s, %d, %d, %s}", kind, partition, bucket, 
totalBuckets, file);
     }
 
-    public static Collection<ManifestEntry> 
mergeEntries(Iterable<ManifestEntry> entries) {
-        LinkedHashMap<Identifier, ManifestEntry> map = new LinkedHashMap<>();
-        mergeEntries(entries, map);
-        return map.values();
-    }
-
-    public static void mergeEntries(
-            ManifestFile manifestFile,
-            List<ManifestFileMeta> manifestFiles,
-            Map<Identifier, ManifestEntry> map) {
-        List<CompletableFuture<List<ManifestEntry>>> manifestReadFutures =
-                manifestFiles.stream()
-                        .map(
-                                manifestFileMeta ->
-                                        CompletableFuture.supplyAsync(
-                                                () ->
-                                                        manifestFile.read(
-                                                                
manifestFileMeta.fileName()),
-                                                
FileUtils.COMMON_IO_FORK_JOIN_POOL))
-                        .collect(Collectors.toList());
-
-        try {
-            for (CompletableFuture<List<ManifestEntry>> taskResult : 
manifestReadFutures) {
-                mergeEntries(taskResult.get(), map);
-            }
-        } catch (ExecutionException | InterruptedException e) {
-            throw new RuntimeException("Failed to read manifest file.", e);
+    /**
+     * According to the {@link ManifestCacheFilter}, entry that needs to be 
cached will be retained,
+     * so the entry that will not be accessed in the future will not be cached.
+     *
+     * <p>Implemented to {@link InternalRow} is for performance (No 
deserialization).
+     */
+    public static Filter<InternalRow> createCacheRowFilter(
+            @Nullable ManifestCacheFilter manifestCacheFilter, int 
numOfBuckets) {
+        if (manifestCacheFilter == null) {
+            return Filter.alwaysTrue();
         }
-    }
 
-    public static void mergeEntries(
-            Iterable<ManifestEntry> entries, Map<Identifier, ManifestEntry> 
map) {
-        for (ManifestEntry entry : entries) {
-            ManifestEntry.Identifier identifier = entry.identifier();
-            switch (entry.kind()) {
-                case ADD:
-                    Preconditions.checkState(
-                            !map.containsKey(identifier),
-                            "Trying to add file %s which is already added.",
-                            identifier);
-                    map.put(identifier, entry);
-                    break;
-                case DELETE:
-                    // each dataFile will only be added once and deleted once,
-                    // if we know that it is added before then both add and 
delete entry can be
-                    // removed because there won't be further operations on 
this file,
-                    // otherwise we have to keep the delete entry because the 
add entry must be
-                    // in the previous manifest files
-                    if (map.containsKey(identifier)) {
-                        map.remove(identifier);
-                    } else {
-                        map.put(identifier, entry);
-                    }
-                    break;
-                default:
-                    throw new UnsupportedOperationException(
-                            "Unknown value kind " + entry.kind().name());
+        Function<InternalRow, BinaryRow> partitionGetter =
+                ManifestEntrySerializer.partitionGetter();
+        Function<InternalRow, Integer> bucketGetter = 
ManifestEntrySerializer.bucketGetter();
+        Function<InternalRow, Integer> totalBucketGetter =
+                ManifestEntrySerializer.totalBucketGetter();
+        return row -> {
+            if (numOfBuckets != totalBucketGetter.apply(row)) {
+                return true;
             }
-        }
-    }
 
-    public static void assertNoDelete(Collection<ManifestEntry> entries) {
-        for (ManifestEntry entry : entries) {
-            Preconditions.checkState(
-                    entry.kind() != FileKind.DELETE,
-                    "Trying to delete file %s which is not previously added.",
-                    entry.file().fileName());
-        }
+            return manifestCacheFilter.test(partitionGetter.apply(row), 
bucketGetter.apply(row));
+        };
     }
 
     /**
-     * The same {@link Identifier} indicates that the {@link ManifestEntry} 
refers to the same data
-     * file.
+     * Read the corresponding entries based on the current required partition 
and bucket.
+     *
+     * <p>Implemented to {@link InternalRow} is for performance (No 
deserialization).
      */
-    public static class Identifier {
-        public final BinaryRow partition;
-        public final int bucket;
-        public final int level;
-        public final String fileName;
-
-        /* Cache the hash code for the string */
-        private Integer hash;
-
-        private Identifier(BinaryRow partition, int bucket, int level, String 
fileName) {
-            this.partition = partition;
-            this.bucket = bucket;
-            this.level = level;
-            this.fileName = fileName;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (!(o instanceof Identifier)) {
+    public static Filter<InternalRow> createEntryRowFilter(
+            @Nullable PartitionPredicate partitionFilter,
+            @Nullable Filter<Integer> bucketFilter,
+            int numOfBuckets) {
+        Function<InternalRow, BinaryRow> partitionGetter =
+                ManifestEntrySerializer.partitionGetter();
+        Function<InternalRow, Integer> bucketGetter = 
ManifestEntrySerializer.bucketGetter();
+        Function<InternalRow, Integer> totalBucketGetter =
+                ManifestEntrySerializer.totalBucketGetter();
+        return row -> {
+            if ((partitionFilter != null && 
!partitionFilter.test(partitionGetter.apply(row)))) {
                 return false;
             }
-            Identifier that = (Identifier) o;
-            return Objects.equals(partition, that.partition)
-                    && bucket == that.bucket
-                    && level == that.level
-                    && Objects.equals(fileName, that.fileName);
-        }
 
-        @Override
-        public int hashCode() {
-            if (hash == null) {
-                hash = Objects.hash(partition, bucket, level, fileName);
+            if (bucketFilter != null && numOfBuckets == 
totalBucketGetter.apply(row)) {
+                return bucketFilter.test(bucketGetter.apply(row));
             }
-            return hash;
-        }
 
-        @Override
-        public String toString() {
-            return String.format("{%s, %d, %d, %s}", partition, bucket, level, 
fileName);
-        }
-
-        public String toString(FileStorePathFactory pathFactory) {
-            return pathFactory.getPartitionString(partition)
-                    + ", bucket "
-                    + bucket
-                    + ", level "
-                    + level
-                    + ", file "
-                    + fileName;
-        }
+            return true;
+        };
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
index 6070611da..a434e2acd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
@@ -188,5 +188,16 @@ public class ManifestFile extends 
ObjectsFile<ManifestEntry> {
                     suggestedFileSize,
                     cache);
         }
+
+        public ObjectsFile<SimpleFileEntry> createSimpleFileEntryReader() {
+            RowType entryType = 
VersionedObjectSerializer.versionType(ManifestEntry.schema());
+            return new ObjectsFile<>(
+                    fileIO,
+                    new SimpleFileEntrySerializer(),
+                    fileFormat.createReaderFactory(entryType),
+                    fileFormat.createWriterFactory(entryType),
+                    pathFactory.manifestFileFactory(),
+                    cache);
+        }
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
index 5910ad75f..beecb4482 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.manifest;
 
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.manifest.ManifestEntry.Identifier;
+import org.apache.paimon.manifest.FileEntry.Identifier;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.stats.BinaryTableStats;
@@ -215,7 +215,7 @@ public class ManifestFileMeta {
         }
 
         Map<Identifier, ManifestEntry> map = new LinkedHashMap<>();
-        ManifestEntry.mergeEntries(manifestFile, candidates, map);
+        FileEntry.mergeEntries(manifestFile, candidates, map);
         if (!map.isEmpty()) {
             List<ManifestFileMeta> merged = manifestFile.write(new 
ArrayList<>(map.values()));
             result.addAll(merged);
@@ -270,7 +270,7 @@ public class ManifestFileMeta {
         // 2.1. try to skip base files by partition filter
 
         Map<Identifier, ManifestEntry> deltaMerged = new LinkedHashMap<>();
-        ManifestEntry.mergeEntries(manifestFile, delta, deltaMerged);
+        FileEntry.mergeEntries(manifestFile, delta, deltaMerged);
 
         List<ManifestFileMeta> result = new ArrayList<>();
         int j = 0;
@@ -314,7 +314,7 @@ public class ManifestFileMeta {
         Map<Identifier, ManifestEntry> fullMerged = new LinkedHashMap<>();
         for (; j < base.size(); j++) {
             ManifestFileMeta file = base.get(j);
-            ManifestEntry.mergeEntries(manifestFile.read(file.fileName), 
fullMerged);
+            FileEntry.mergeEntries(manifestFile.read(file.fileName), 
fullMerged);
             boolean contains = false;
             for (Identifier identifier : deleteEntries) {
                 if (fullMerged.containsKey(identifier)) {
@@ -334,8 +334,8 @@ public class ManifestFileMeta {
 
         // 2.3. merge base files
 
-        ManifestEntry.mergeEntries(manifestFile, base.subList(j, base.size()), 
fullMerged);
-        ManifestEntry.mergeEntries(deltaMerged.values(), fullMerged);
+        FileEntry.mergeEntries(manifestFile, base.subList(j, base.size()), 
fullMerged);
+        FileEntry.mergeEntries(deltaMerged.values(), fullMerged);
 
         // 2.4. write new manifest files
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
new file mode 100644
index 000000000..3e8b88755
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.manifest;
+
+import org.apache.paimon.data.BinaryRow;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** A simple {@link FileEntry} only contains identifier and min max key. */
+public class SimpleFileEntry implements FileEntry {
+
+    private final FileKind kind;
+    private final BinaryRow partition;
+    private final int bucket;
+    private final int level;
+    private final String fileName;
+    private final BinaryRow minKey;
+    private final BinaryRow maxKey;
+
+    public SimpleFileEntry(
+            FileKind kind,
+            BinaryRow partition,
+            int bucket,
+            int level,
+            String fileName,
+            BinaryRow minKey,
+            BinaryRow maxKey) {
+        this.kind = kind;
+        this.partition = partition;
+        this.bucket = bucket;
+        this.level = level;
+        this.fileName = fileName;
+        this.minKey = minKey;
+        this.maxKey = maxKey;
+    }
+
+    public static SimpleFileEntry from(ManifestEntry entry) {
+        return new SimpleFileEntry(
+                entry.kind(),
+                entry.partition(),
+                entry.bucket(),
+                entry.level(),
+                entry.fileName(),
+                entry.minKey(),
+                entry.maxKey());
+    }
+
+    public static List<SimpleFileEntry> from(List<ManifestEntry> entries) {
+        return 
entries.stream().map(SimpleFileEntry::from).collect(Collectors.toList());
+    }
+
+    @Override
+    public FileKind kind() {
+        return kind;
+    }
+
+    @Override
+    public BinaryRow partition() {
+        return partition;
+    }
+
+    @Override
+    public int bucket() {
+        return bucket;
+    }
+
+    @Override
+    public int level() {
+        return level;
+    }
+
+    @Override
+    public String fileName() {
+        return fileName;
+    }
+
+    @Override
+    public Identifier identifier() {
+        return new Identifier(partition, bucket, level, fileName);
+    }
+
+    @Override
+    public BinaryRow minKey() {
+        return minKey;
+    }
+
+    @Override
+    public BinaryRow maxKey() {
+        return maxKey;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        SimpleFileEntry that = (SimpleFileEntry) o;
+        return bucket == that.bucket
+                && level == that.level
+                && kind == that.kind
+                && Objects.equals(partition, that.partition)
+                && Objects.equals(fileName, that.fileName)
+                && Objects.equals(minKey, that.minKey)
+                && Objects.equals(maxKey, that.maxKey);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(kind, partition, bucket, level, fileName, minKey, 
maxKey);
+    }
+
+    @Override
+    public String toString() {
+        return "{"
+                + "kind="
+                + kind
+                + ", partition="
+                + partition
+                + ", bucket="
+                + bucket
+                + ", level="
+                + level
+                + ", fileName='"
+                + fileName
+                + '\''
+                + ", minKey="
+                + minKey
+                + ", maxKey="
+                + maxKey
+                + '}';
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntrySerializer.java
 
b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntrySerializer.java
new file mode 100644
index 000000000..f23f167f7
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntrySerializer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.manifest;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.utils.VersionedObjectSerializer;
+
+import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
+
+/** A {@link VersionedObjectSerializer} for {@link SimpleFileEntry}, only 
supports reading. */
+public class SimpleFileEntrySerializer extends 
VersionedObjectSerializer<SimpleFileEntry> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int version;
+
+    public SimpleFileEntrySerializer() {
+        super(ManifestEntry.schema());
+        this.version = new ManifestEntrySerializer().getVersion();
+    }
+
+    @Override
+    public int getVersion() {
+        return version;
+    }
+
+    @Override
+    public InternalRow convertTo(SimpleFileEntry meta) {
+        throw new UnsupportedOperationException("Only supports convert from 
row.");
+    }
+
+    @Override
+    public SimpleFileEntry convertFrom(int version, InternalRow row) {
+        if (this.version != version) {
+            throw new IllegalArgumentException("Unsupported version: " + 
version);
+        }
+
+        InternalRow file = row.getRow(4, 3);
+        return new SimpleFileEntry(
+                FileKind.fromByteValue(row.getByte(0)),
+                deserializeBinaryRow(row.getBinary(1)),
+                row.getInt(2),
+                file.getInt(10),
+                file.getString(0).toString(),
+                deserializeBinaryRow(file.getBinary(3)),
+                deserializeBinaryRow(file.getBinary(4)));
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 81b173321..e6c95e885 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -20,13 +20,13 @@ package org.apache.paimon.operation;
 
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.manifest.FileEntry;
 import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.manifest.ManifestEntry;
-import org.apache.paimon.manifest.ManifestEntrySerializer;
 import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.manifest.SimpleFileEntry;
 import org.apache.paimon.operation.metrics.ScanMetrics;
 import org.apache.paimon.operation.metrics.ScanStats;
 import org.apache.paimon.partition.PartitionPredicate;
@@ -54,6 +54,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.Preconditions.checkState;
@@ -67,10 +68,12 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     private final ManifestList manifestList;
     private final int numOfBuckets;
     private final boolean checkNumOfBuckets;
+    private final Integer scanManifestParallelism;
 
     private final ConcurrentMap<Long, TableSchema> tableSchemas;
     private final SchemaManager schemaManager;
     protected final ScanBucketFilter bucketKeyFilter;
+    private final String branchName;
 
     private PartitionPredicate partitionFilter;
     private Snapshot specifiedSnapshot = null;
@@ -81,10 +84,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     private Long dataFileTimeMills = null;
 
     private ManifestCacheFilter manifestCacheFilter = null;
-    private final Integer scanManifestParallelism;
-
     private ScanMetrics scanMetrics = null;
-    private String branchName;
 
     public AbstractFileStoreScan(
             RowType partitionType,
@@ -112,21 +112,13 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
 
     @Override
     public FileStoreScan withPartitionFilter(Predicate predicate) {
-        if (partitionType.getFieldCount() > 0 && predicate != null) {
-            this.partitionFilter = PartitionPredicate.fromPredicate(predicate);
-        } else {
-            this.partitionFilter = null;
-        }
+        this.partitionFilter = PartitionPredicate.fromPredicate(partitionType, 
predicate);
         return this;
     }
 
     @Override
     public FileStoreScan withPartitionFilter(List<BinaryRow> partitions) {
-        if (partitionType.getFieldCount() > 0 && !partitions.isEmpty()) {
-            this.partitionFilter = 
PartitionPredicate.fromMultiple(partitionType, partitions);
-        } else {
-            this.partitionFilter = null;
-        }
+        this.partitionFilter = PartitionPredicate.fromMultiple(partitionType, 
partitions);
         return this;
     }
 
@@ -216,7 +208,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     @Override
     public Plan plan() {
 
-        Pair<Snapshot, List<ManifestEntry>> planResult = 
doPlan(this::readManifestFileMeta);
+        Pair<Snapshot, List<ManifestEntry>> planResult = doPlan();
 
         final Snapshot readSnapshot = planResult.getLeft();
         final List<ManifestEntry> files = planResult.getRight();
@@ -246,44 +238,34 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
         };
     }
 
-    private Pair<Snapshot, List<ManifestEntry>> doPlan(
-            Function<ManifestFileMeta, List<ManifestEntry>> readManifest) {
+    @Override
+    public List<SimpleFileEntry> readSimpleEntries() {
+        List<ManifestFileMeta> manifests = readManifests().getRight();
+        Collection<SimpleFileEntry> mergedEntries =
+                readAndMergeFileEntries(
+                        manifests, this::readSimpleEntries, 
Filter.alwaysTrue(), new AtomicLong());
+        return new ArrayList<>(mergedEntries);
+    }
+
+    private Pair<Snapshot, List<ManifestEntry>> doPlan() {
         long started = System.nanoTime();
-        List<ManifestFileMeta> manifests = specifiedManifests;
-        Snapshot snapshot = null;
-        if (manifests == null) {
-            snapshot =
-                    specifiedSnapshot == null
-                            ? snapshotManager.latestSnapshot(branchName)
-                            : specifiedSnapshot;
-            if (snapshot == null) {
-                manifests = Collections.emptyList();
-            } else {
-                manifests = readManifests(snapshot);
-            }
-        }
+        Pair<Snapshot, List<ManifestFileMeta>> snapshotListPair = 
readManifests();
+        Snapshot snapshot = snapshotListPair.getLeft();
+        List<ManifestFileMeta> manifests = snapshotListPair.getRight();
 
         long startDataFiles =
                 manifests.stream().mapToLong(f -> f.numAddedFiles() + 
f.numDeletedFiles()).sum();
 
         AtomicLong cntEntries = new AtomicLong(0);
-        Iterable<ManifestEntry> entries =
-                ScanParallelExecutor.parallelismBatchIterable(
-                        files -> {
-                            List<ManifestEntry> entryList =
-                                    files.parallelStream()
-                                            
.filter(this::filterManifestFileMeta)
-                                            .flatMap(m -> 
readManifest.apply(m).stream())
-                                            
.filter(this::filterUnmergedManifestEntry)
-                                            .collect(Collectors.toList());
-                            cntEntries.getAndAdd(entryList.size());
-                            return entryList;
-                        },
+
+        Collection<ManifestEntry> mergedEntries =
+                readAndMergeFileEntries(
                         manifests,
-                        scanManifestParallelism);
+                        this::readManifestFileMeta,
+                        this::filterUnmergedManifestEntry,
+                        cntEntries);
 
         List<ManifestEntry> files = new ArrayList<>();
-        Collection<ManifestEntry> mergedEntries = 
ManifestEntry.mergeEntries(entries);
         long skippedByPartitionAndStats = startDataFiles - cntEntries.get();
         for (ManifestEntry file : mergedEntries) {
             if (checkNumOfBuckets && file.totalBuckets() != numOfBuckets) {
@@ -349,6 +331,50 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
         return Pair.of(snapshot, files);
     }
 
+    public <T extends FileEntry> Collection<T> readAndMergeFileEntries(
+            List<ManifestFileMeta> manifests,
+            Function<ManifestFileMeta, List<T>> manifestReader,
+            @Nullable Filter<T> filterUnmergedEntry,
+            @Nullable AtomicLong readEntries) {
+        Iterable<T> entries =
+                ScanParallelExecutor.parallelismBatchIterable(
+                        files -> {
+                            Stream<T> stream =
+                                    files.parallelStream()
+                                            
.filter(this::filterManifestFileMeta)
+                                            .flatMap(m -> 
manifestReader.apply(m).stream());
+                            if (filterUnmergedEntry != null) {
+                                stream = 
stream.filter(filterUnmergedEntry::test);
+                            }
+                            List<T> entryList = 
stream.collect(Collectors.toList());
+                            if (readEntries != null) {
+                                readEntries.getAndAdd(entryList.size());
+                            }
+                            return entryList;
+                        },
+                        manifests,
+                        scanManifestParallelism);
+
+        return FileEntry.mergeEntries(entries);
+    }
+
+    private Pair<Snapshot, List<ManifestFileMeta>> readManifests() {
+        List<ManifestFileMeta> manifests = specifiedManifests;
+        Snapshot snapshot = null;
+        if (manifests == null) {
+            snapshot =
+                    specifiedSnapshot == null
+                            ? snapshotManager.latestSnapshot(branchName)
+                            : specifiedSnapshot;
+            if (snapshot == null) {
+                manifests = Collections.emptyList();
+            } else {
+                manifests = readManifests(snapshot);
+            }
+        }
+        return Pair.of(snapshot, manifests);
+    }
+
     private List<ManifestFileMeta> readManifests(Snapshot snapshot) {
         switch (scanMode) {
             case ALL:
@@ -426,47 +452,25 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     private List<ManifestEntry> readManifestFileMeta(ManifestFileMeta 
manifest) {
         return manifestFileFactory
                 .create()
-                .read(manifest.fileName(), manifestCacheRowFilter(), 
manifestEntryRowFilter());
+                .read(
+                        manifest.fileName(),
+                        
ManifestEntry.createCacheRowFilter(manifestCacheFilter, numOfBuckets),
+                        ManifestEntry.createEntryRowFilter(
+                                partitionFilter, bucketFilter, numOfBuckets));
     }
 
     /** Note: Keep this thread-safe. */
-    private Filter<InternalRow> manifestEntryRowFilter() {
-        Function<InternalRow, BinaryRow> partitionGetter =
-                ManifestEntrySerializer.partitionGetter();
-        Function<InternalRow, Integer> bucketGetter = 
ManifestEntrySerializer.bucketGetter();
-        Function<InternalRow, Integer> totalBucketGetter =
-                ManifestEntrySerializer.totalBucketGetter();
-        return row -> {
-            if ((partitionFilter != null && 
!partitionFilter.test(partitionGetter.apply(row)))) {
-                return false;
-            }
-
-            if (bucketFilter != null && numOfBuckets == 
totalBucketGetter.apply(row)) {
-                return bucketFilter.test(bucketGetter.apply(row));
-            }
-
-            return true;
-        };
-    }
-
-    /** Note: Keep this thread-safe. */
-    private Filter<InternalRow> manifestCacheRowFilter() {
-        if (manifestCacheFilter == null) {
-            return Filter.alwaysTrue();
-        }
-
-        Function<InternalRow, BinaryRow> partitionGetter =
-                ManifestEntrySerializer.partitionGetter();
-        Function<InternalRow, Integer> bucketGetter = 
ManifestEntrySerializer.bucketGetter();
-        Function<InternalRow, Integer> totalBucketGetter =
-                ManifestEntrySerializer.totalBucketGetter();
-        return row -> {
-            if (numOfBuckets != totalBucketGetter.apply(row)) {
-                return true;
-            }
-
-            return manifestCacheFilter.test(partitionGetter.apply(row), 
bucketGetter.apply(row));
-        };
+    private List<SimpleFileEntry> readSimpleEntries(ManifestFileMeta manifest) 
{
+        return manifestFileFactory
+                .createSimpleFileEntryReader()
+                .read(
+                        manifest.fileName(),
+                        // use filter for ManifestEntry
+                        // currently, projection is not pushed down to file 
format
+                        // see SimpleFileEntrySerializer
+                        
ManifestEntry.createCacheRowFilter(manifestCacheFilter, numOfBuckets),
+                        ManifestEntry.createEntryRowFilter(
+                                partitionFilter, bucketFilter, numOfBuckets));
     }
 
     // ------------------------------------------------------------------------
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
index 2f7817f03..79a64b049 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
@@ -24,6 +24,7 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.manifest.FileEntry;
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFile;
@@ -256,7 +257,7 @@ public abstract class FileDeletionBase {
         for (String manifest : files) {
             List<ManifestEntry> entries;
             entries = manifestFile.readWithIOException(manifest);
-            ManifestEntry.mergeEntries(entries, map);
+            FileEntry.mergeEntries(entries, map);
         }
 
         return map.values();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 0264f1e45..a64b8239b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -26,6 +26,7 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.manifest.FileEntry;
 import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.IndexManifestFile;
@@ -34,6 +35,7 @@ import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.manifest.SimpleFileEntry;
 import org.apache.paimon.operation.metrics.CommitMetrics;
 import org.apache.paimon.operation.metrics.CommitStats;
 import org.apache.paimon.options.MemorySize;
@@ -48,7 +50,6 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;
-import org.apache.paimon.utils.RowDataToObjectArrayConverter;
 import org.apache.paimon.utils.SnapshotManager;
 
 import org.slf4j.Logger;
@@ -101,7 +102,6 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
     private final SchemaManager schemaManager;
     private final String commitUser;
     private final RowType partitionType;
-    private final RowDataToObjectArrayConverter partitionObjectConverter;
     private final FileStorePathFactory pathFactory;
     private final SnapshotManager snapshotManager;
     private final ManifestFile manifestFile;
@@ -146,7 +146,6 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
         this.schemaManager = schemaManager;
         this.commitUser = commitUser;
         this.partitionType = partitionType;
-        this.partitionObjectConverter = new 
RowDataToObjectArrayConverter(partitionType);
         this.pathFactory = pathFactory;
         this.snapshotManager = snapshotManager;
         this.manifestFile = manifestFileFactory.create();
@@ -213,7 +212,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
         int attempts = 0;
         Snapshot latestSnapshot = null;
         Long safeLatestSnapshotId = null;
-        List<ManifestEntry> baseEntries = new ArrayList<>();
+        List<SimpleFileEntry> baseEntries = new ArrayList<>();
 
         List<ManifestEntry> appendTableFiles = new ArrayList<>();
         List<ManifestEntry> appendChangelog = new ArrayList<>();
@@ -228,6 +227,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                 compactChangelog,
                 appendIndexFiles);
         try {
+            List<SimpleFileEntry> appendSimpleEntries = 
SimpleFileEntry.from(appendTableFiles);
             if (!ignoreEmptyCommit
                     || !appendTableFiles.isEmpty()
                     || !appendChangelog.isEmpty()
@@ -246,7 +246,8 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                     baseEntries.addAll(
                             readAllEntriesFromChangedPartitions(
                                     latestSnapshot, appendTableFiles, 
compactTableFiles));
-                    noConflictsOrFail(latestSnapshot.commitUser(), 
baseEntries, appendTableFiles);
+                    noConflictsOrFail(
+                            latestSnapshot.commitUser(), baseEntries, 
appendSimpleEntries);
                     safeLatestSnapshotId = latestSnapshot.id();
                 }
 
@@ -274,8 +275,11 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
                 // This optimization is mainly used to decrease the number of 
times we read from
                 // files.
                 if (safeLatestSnapshotId != null) {
-                    baseEntries.addAll(appendTableFiles);
-                    noConflictsOrFail(latestSnapshot.commitUser(), 
baseEntries, compactTableFiles);
+                    baseEntries.addAll(appendSimpleEntries);
+                    noConflictsOrFail(
+                            latestSnapshot.commitUser(),
+                            baseEntries,
+                            SimpleFileEntry.from(compactTableFiles));
                     // assume this compact commit follows just after the 
append commit created above
                     safeLatestSnapshotId += 1;
                 }
@@ -924,7 +928,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
     }
 
     @SafeVarargs
-    private final List<ManifestEntry> readAllEntriesFromChangedPartitions(
+    private final List<SimpleFileEntry> readAllEntriesFromChangedPartitions(
             Snapshot snapshot, List<ManifestEntry>... changes) {
         List<BinaryRow> changedPartitions =
                 Arrays.stream(changes)
@@ -935,8 +939,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
         try {
             return scan.withSnapshot(snapshot)
                     .withPartitionFilter(changedPartitions)
-                    .plan()
-                    .files();
+                    .readSimpleEntries();
         } catch (Throwable e) {
             throw new RuntimeException("Cannot read manifest entries from 
changed partitions.", e);
         }
@@ -947,19 +950,21 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         noConflictsOrFail(
                 baseCommitUser,
                 readAllEntriesFromChangedPartitions(latestSnapshot, changes),
-                changes);
+                SimpleFileEntry.from(changes));
     }
 
     private void noConflictsOrFail(
-            String baseCommitUser, List<ManifestEntry> baseEntries, 
List<ManifestEntry> changes) {
-        List<ManifestEntry> allEntries = new ArrayList<>(baseEntries);
+            String baseCommitUser,
+            List<SimpleFileEntry> baseEntries,
+            List<SimpleFileEntry> changes) {
+        List<SimpleFileEntry> allEntries = new ArrayList<>(baseEntries);
         allEntries.addAll(changes);
 
-        Collection<ManifestEntry> mergedEntries;
+        Collection<SimpleFileEntry> mergedEntries;
         try {
             // merge manifest entries and also check if the files we want to 
delete are still there
-            mergedEntries = ManifestEntry.mergeEntries(allEntries);
-            ManifestEntry.assertNoDelete(mergedEntries);
+            mergedEntries = FileEntry.mergeEntries(allEntries);
+            FileEntry.assertNoDelete(mergedEntries);
         } catch (Throwable e) {
             Pair<RuntimeException, RuntimeException> conflictException =
                     createConflictException(
@@ -979,9 +984,9 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
         }
 
         // group entries by partitions, buckets and levels
-        Map<LevelIdentifier, List<ManifestEntry>> levels = new HashMap<>();
-        for (ManifestEntry entry : mergedEntries) {
-            int level = entry.file().level();
+        Map<LevelIdentifier, List<SimpleFileEntry>> levels = new HashMap<>();
+        for (SimpleFileEntry entry : mergedEntries) {
+            int level = entry.level();
             if (level >= 1) {
                 levels.computeIfAbsent(
                                 new LevelIdentifier(entry.partition(), 
entry.bucket(), level),
@@ -991,12 +996,12 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         }
 
         // check for all LSM level >= 1, key ranges of files do not intersect
-        for (List<ManifestEntry> entries : levels.values()) {
-            entries.sort((a, b) -> keyComparator.compare(a.file().minKey(), 
b.file().minKey()));
+        for (List<SimpleFileEntry> entries : levels.values()) {
+            entries.sort((a, b) -> keyComparator.compare(a.minKey(), 
b.minKey()));
             for (int i = 0; i + 1 < entries.size(); i++) {
-                ManifestEntry a = entries.get(i);
-                ManifestEntry b = entries.get(i + 1);
-                if (keyComparator.compare(a.file().maxKey(), 
b.file().minKey()) >= 0) {
+                SimpleFileEntry a = entries.get(i);
+                SimpleFileEntry b = entries.get(i + 1);
+                if (keyComparator.compare(a.maxKey(), b.minKey()) >= 0) {
                     Pair<RuntimeException, RuntimeException> conflictException 
=
                             createConflictException(
                                     "LSM conflicts detected! Give up 
committing. Conflict files are:\n"
@@ -1024,8 +1029,8 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
     private Pair<RuntimeException, RuntimeException> createConflictException(
             String message,
             String baseCommitUser,
-            List<ManifestEntry> baseEntries,
-            List<ManifestEntry> changes,
+            List<SimpleFileEntry> baseEntries,
+            List<SimpleFileEntry> changes,
             Throwable cause,
             int maxEntry) {
         String possibleCauses =
@@ -1058,13 +1063,11 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         String baseEntriesString =
                 "Base entries are:\n"
                         + baseEntries.stream()
-                                .map(ManifestEntry::toString)
+                                .map(Object::toString)
                                 .collect(Collectors.joining("\n"));
         String changesString =
                 "Changes are:\n"
-                        + changes.stream()
-                                .map(ManifestEntry::toString)
-                                .collect(Collectors.joining("\n"));
+                        + 
changes.stream().map(Object::toString).collect(Collectors.joining("\n"));
 
         RuntimeException fullException =
                 new RuntimeException(
@@ -1085,12 +1088,12 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
                     "Base entries are:\n"
                             + baseEntries.subList(0, 
Math.min(baseEntries.size(), maxEntry))
                                     .stream()
-                                    .map(ManifestEntry::toString)
+                                    .map(Object::toString)
                                     .collect(Collectors.joining("\n"));
             changesString =
                     "Changes are:\n"
                             + changes.subList(0, Math.min(changes.size(), 
maxEntry)).stream()
-                                    .map(ManifestEntry::toString)
+                                    .map(Object::toString)
                                     .collect(Collectors.joining("\n"));
             simplifiedException =
                     new RuntimeException(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index f9d60173b..dbed15bd3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -25,6 +25,7 @@ import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.manifest.SimpleFileEntry;
 import org.apache.paimon.operation.metrics.ScanMetrics;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
@@ -73,6 +74,12 @@ public interface FileStoreScan {
     /** Produce a {@link Plan}. */
     Plan plan();
 
+    /**
+     * Read {@link SimpleFileEntry}s, SimpleFileEntry only retains some 
critical information, so it
+     * cannot perform filtering based on statistical information.
+     */
+    List<SimpleFileEntry> readSimpleEntries();
+
     /** Result plan of this scan. */
     interface Plan {
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java 
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
index b0a89e29e..1675c6c20 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
@@ -30,6 +30,8 @@ import org.apache.paimon.statistics.FullFieldStatsCollector;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.RowDataToObjectArrayConverter;
 
+import javax.annotation.Nullable;
+
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -42,11 +44,21 @@ public interface PartitionPredicate {
     boolean test(
             long rowCount, InternalRow minValues, InternalRow maxValues, 
InternalArray nullCounts);
 
-    static PartitionPredicate fromPredicate(Predicate predicate) {
+    @Nullable
+    static PartitionPredicate fromPredicate(RowType partitionType, Predicate 
predicate) {
+        if (partitionType.getFieldCount() == 0 || predicate == null) {
+            return null;
+        }
+
         return new DefaultPartitionPredicate(predicate);
     }
 
+    @Nullable
     static PartitionPredicate fromMultiple(RowType partitionType, 
List<BinaryRow> partitions) {
+        if (partitionType.getFieldCount() == 0 || partitions.isEmpty()) {
+            return null;
+        }
+
         return new MultiplePartitionPredicate(
                 new RowDataToObjectArrayConverter(partitionType), new 
HashSet<>(partitions));
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
index 4c1b3aa30..61a465e4b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
@@ -40,7 +40,7 @@ import java.util.List;
 import static org.apache.paimon.utils.FileUtils.createFormatReader;
 
 /** A file which contains several {@link T}s, provides read and write. */
-public abstract class ObjectsFile<T> {
+public class ObjectsFile<T> {
 
     protected final FileIO fileIO;
     protected final ObjectSerializer<T> serializer;
@@ -50,7 +50,7 @@ public abstract class ObjectsFile<T> {
 
     @Nullable private final ObjectsCache<String, T> cache;
 
-    protected ObjectsFile(
+    public ObjectsFile(
             FileIO fileIO,
             ObjectSerializer<T> serializer,
             FormatReaderFactory readerFactory,
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
index ac0c0a64e..611876867 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
@@ -106,7 +106,7 @@ public abstract class ManifestFileMetaTestBase {
                         .flatMap(f -> 
getManifestFile().read(f.fileName()).stream())
                         .collect(Collectors.toList());
         List<String> entryBeforeMerge =
-                ManifestEntry.mergeEntries(inputEntry).stream()
+                FileEntry.mergeEntries(inputEntry).stream()
                         .map(entry -> entry.kind() + "-" + 
entry.file().fileName())
                         .collect(Collectors.toList());
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/partition/PartitionPredicateTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/partition/PartitionPredicateTest.java
index f43610f71..b4e1f5ed1 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/partition/PartitionPredicateTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/partition/PartitionPredicateTest.java
@@ -45,10 +45,7 @@ public class PartitionPredicateTest {
     public void testNoPartition() {
         PartitionPredicate predicate =
                 PartitionPredicate.fromMultiple(RowType.of(), 
Collections.singletonList(EMPTY_ROW));
-
-        assertThat(predicate.test(EMPTY_ROW)).isTrue();
-        assertThat(predicate.test(1, EMPTY_ROW, EMPTY_ROW, 
BinaryArray.fromLongArray(new Long[0])))
-                .isTrue();
+        assertThat(predicate).isNull();
     }
 
     @Test
@@ -60,7 +57,7 @@ public class PartitionPredicateTest {
                         and(builder.equal(0, 3), builder.equal(1, 5)),
                         and(builder.equal(0, 4), builder.equal(1, 6)));
 
-        PartitionPredicate p1 = PartitionPredicate.fromPredicate(predicate);
+        PartitionPredicate p1 = PartitionPredicate.fromPredicate(type, 
predicate);
         PartitionPredicate p2 =
                 PartitionPredicate.fromMultiple(
                         type, Arrays.asList(createPart(3, 5), createPart(4, 
6)));

Reply via email to