This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f64e66511 [core] Add cache to manifest files (#885)
f64e66511 is described below

commit f64e66511504692445aefbaae2a8f46e4f074be1
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Apr 13 13:53:23 2023 +0800

    [core] Add cache to manifest files (#885)
---
 docs/content/maintenance/write-performance.md      |  6 ++
 .../shortcodes/generated/core_configuration.html   |  6 ++
 .../apache/paimon/data/RandomAccessInputView.java  |  5 ++
 .../main/java/org/apache/paimon/data/Segments.java | 44 ++++++++++++
 .../data/serializer/InternalRowSerializer.java     |  4 ++
 .../paimon/memory/HeapMemorySegmentPool.java       |  0
 .../apache/paimon/memory/MemorySegmentPool.java    |  0
 .../java/org/apache/paimon/AbstractFileStore.java  | 16 ++++-
 .../main/java/org/apache/paimon/CoreOptions.java   | 10 +++
 .../org/apache/paimon/manifest/ManifestFile.java   | 47 +++++-------
 .../org/apache/paimon/manifest/ManifestList.java   | 76 ++++++++------------
 .../apache/paimon/utils/FileStorePathFactory.java  | 28 ++++++++
 .../org/apache/paimon/utils/ObjectSerializer.java  |  5 ++
 .../java/org/apache/paimon/utils/ObjectsCache.java | 84 ++++++++++++++++++++++
 .../java/org/apache/paimon/utils/ObjectsFile.java  | 81 +++++++++++++++++++++
 .../java/org/apache/paimon/utils/PathFactory.java  | 29 ++++++++
 .../org/apache/paimon/utils/SegmentsCache.java     | 58 +++++++++++++++
 .../paimon/manifest/ManifestFileMetaTest.java      |  3 +-
 .../apache/paimon/manifest/ManifestFileTest.java   |  3 +-
 .../apache/paimon/manifest/ManifestListTest.java   |  7 +-
 .../paimon/table/FileStoreTableTestBase.java       | 40 +++++++++++
 .../apache/paimon/flink/ForceCompactionITCase.java |  2 +-
 22 files changed, 466 insertions(+), 88 deletions(-)

diff --git a/docs/content/maintenance/write-performance.md 
b/docs/content/maintenance/write-performance.md
index c67990d23..1f01d75b6 100644
--- a/docs/content/maintenance/write-performance.md
+++ b/docs/content/maintenance/write-performance.md
@@ -53,6 +53,12 @@ It is recommended that the parallelism of sink should be 
less than or equal to t
     </tbody>
 </table>
 
+## Write Initialization
+
+During write initialization, the writer of a bucket needs to read all historical files. If this becomes a bottleneck
+(for example, when writing to a large number of partitions simultaneously), you can set `manifest.cache-size` to cache
+the manifest data that has been read and accelerate initialization.
+
 ## Compaction
 
 ### Number of Sorted Runs to Trigger Compaction
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index b55585981..49fb22a92 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -158,6 +158,12 @@
             <td>Float</td>
             <td>The index load factor for lookup.</td>
         </tr>
+        <tr>
+            <td><h5>manifest.cache-size</h5></td>
+            <td style="word-wrap: break-word;">0 bytes</td>
+            <td>MemorySize</td>
+            <td>Cache size for reading manifest files.</td>
+        </tr>
         <tr>
             <td><h5>manifest.format</h5></td>
             <td style="word-wrap: break-word;">avro</td>
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/RandomAccessInputView.java 
b/paimon-common/src/main/java/org/apache/paimon/data/RandomAccessInputView.java
index 4a87058c8..dec01ef1c 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/RandomAccessInputView.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/RandomAccessInputView.java
@@ -24,6 +24,7 @@ import org.apache.paimon.utils.MathUtils;
 
 import java.io.EOFException;
 import java.util.ArrayList;
+import java.util.List;
 
 /** A {@link AbstractPagedInputView} to read pages in memory. */
 public class RandomAccessInputView extends AbstractPagedInputView implements 
SeekableDataInputView {
@@ -86,4 +87,8 @@ public class RandomAccessInputView extends 
AbstractPagedInputView implements See
                 ? this.limitInLastSegment
                 : this.segmentSize;
     }
+
+    public List<MemorySegment> segments() {
+        return segments;
+    }
 }
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/Segments.java 
b/paimon-common/src/main/java/org/apache/paimon/data/Segments.java
new file mode 100644
index 000000000..3982b2f36
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/data/Segments.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data;
+
+import org.apache.paimon.memory.MemorySegment;
+
+import java.util.ArrayList;
+
+/**
+ * Holder for a list of {@link MemorySegment}s together with the limit in the last segment.
+ *
+ * <p>The class only stores the two constructor arguments and hands them back unchanged; the list
+ * is neither copied on the way in nor on the way out.
+ */
+public class Segments {
+
+    // The segment list exactly as passed to the constructor.
+    private final ArrayList<MemorySegment> segments;
+
+    // Limit inside the last segment. NOTE(review): ObjectsCache passes the output view's current
+    // position in its segment here, so this looks like the end of written data - confirm.
+    private final int limitInLastSegment;
+
+    /**
+     * @param segments the segments to hold (stored by reference)
+     * @param limitInLastSegment the limit that applies to the last segment of the list
+     */
+    public Segments(ArrayList<MemorySegment> segments, int limitInLastSegment) 
{
+        this.segments = segments;
+        this.limitInLastSegment = limitInLastSegment;
+    }
+
+    /** Returns the segment list given at construction. */
+    public ArrayList<MemorySegment> segments() {
+        return segments;
+    }
+
+    /** Returns the limit in the last segment given at construction. */
+    public int limitInLastSegment() {
+        return limitInLastSegment;
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
index 145e5811b..b0701be48 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
@@ -132,6 +132,10 @@ public class InternalRowSerializer extends 
AbstractRowDataSerializer<InternalRow
         return types.length;
     }
 
+    public DataType[] fieldTypes() {
+        return types;
+    }
+
     /** Convert {@link InternalRow} into {@link BinaryRow}. TODO modify it to 
code gen. */
     @Override
     public BinaryRow toBinaryRow(InternalRow row) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/memory/HeapMemorySegmentPool.java 
b/paimon-common/src/main/java/org/apache/paimon/memory/HeapMemorySegmentPool.java
similarity index 100%
rename from 
paimon-core/src/main/java/org/apache/paimon/memory/HeapMemorySegmentPool.java
rename to 
paimon-common/src/main/java/org/apache/paimon/memory/HeapMemorySegmentPool.java
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/memory/MemorySegmentPool.java 
b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentPool.java
similarity index 100%
rename from 
paimon-core/src/main/java/org/apache/paimon/memory/MemorySegmentPool.java
rename to 
paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentPool.java
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 20fde8b42..5f8676012 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -26,11 +26,15 @@ import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.operation.FileStoreCommitImpl;
 import org.apache.paimon.operation.FileStoreExpireImpl;
 import org.apache.paimon.operation.PartitionExpire;
+import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.SegmentsCache;
 import org.apache.paimon.utils.SnapshotManager;
 
+import javax.annotation.Nullable;
+
 import java.time.Duration;
 import java.util.Comparator;
 
@@ -47,6 +51,8 @@ public abstract class AbstractFileStore<T> implements 
FileStore<T> {
     protected final CoreOptions options;
     protected final RowType partitionType;
 
+    @Nullable private final SegmentsCache<String> manifestCache;
+
     public AbstractFileStore(
             FileIO fileIO,
             SchemaManager schemaManager,
@@ -58,6 +64,11 @@ public abstract class AbstractFileStore<T> implements 
FileStore<T> {
         this.schemaId = schemaId;
         this.options = options;
         this.partitionType = partitionType;
+        MemorySize manifestCacheSize = options.manifestCacheSize();
+        this.manifestCache =
+                manifestCacheSize.getBytes() == 0
+                        ? null
+                        : new SegmentsCache<>(options.pageSize(), 
manifestCacheSize);
     }
 
     public FileStorePathFactory pathFactory() {
@@ -81,13 +92,14 @@ public abstract class AbstractFileStore<T> implements 
FileStore<T> {
                 partitionType,
                 options.manifestFormat(),
                 pathFactory(),
-                options.manifestTargetSize().getBytes());
+                options.manifestTargetSize().getBytes(),
+                manifestCache);
     }
 
     @VisibleForTesting
     public ManifestList.Factory manifestListFactory() {
         return new ManifestList.Factory(
-                fileIO, partitionType, options.manifestFormat(), 
pathFactory());
+                fileIO, options.manifestFormat(), pathFactory(), 
manifestCache);
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index 40ac103e3..aac6fc36f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -133,6 +133,12 @@ public class CoreOptions implements Serializable {
                             "To avoid frequent manifest merges, this parameter 
specifies the minimum number "
                                     + "of ManifestFileMeta to merge.");
 
+    public static final ConfigOption<MemorySize> MANIFEST_CACHE_SIZE =
+            key("manifest.cache-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.ofMebiBytes(0))
+                    .withDescription("Cache size for reading manifest files.");
+
     public static final ConfigOption<String> PARTITION_DEFAULT_NAME =
             key("partition.default-name")
                     .stringType()
@@ -644,6 +650,10 @@ public class CoreOptions implements Serializable {
         return options.get(MANIFEST_TARGET_FILE_SIZE);
     }
 
+    public MemorySize manifestCacheSize() {
+        return options.get(MANIFEST_CACHE_SIZE);
+    }
+
     public String partitionDefaultName() {
         return options.get(PARTITION_DEFAULT_NAME);
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
index 3f136da45..e5ea62794 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
@@ -31,9 +31,13 @@ import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.stats.FieldStatsArraySerializer;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
-import org.apache.paimon.utils.FileUtils;
+import org.apache.paimon.utils.ObjectsFile;
+import org.apache.paimon.utils.PathFactory;
+import org.apache.paimon.utils.SegmentsCache;
 import org.apache.paimon.utils.VersionedObjectSerializer;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.List;
 
@@ -41,15 +45,11 @@ import java.util.List;
  * This file includes several {@link ManifestEntry}s, representing the 
additional changes since last
  * snapshot.
  */
-public class ManifestFile {
+public class ManifestFile extends ObjectsFile<ManifestEntry> {
 
-    private final FileIO fileIO;
     private final SchemaManager schemaManager;
     private final RowType partitionType;
-    private final ManifestEntrySerializer serializer;
-    private final FormatReaderFactory readerFactory;
     private final FormatWriterFactory writerFactory;
-    private final FileStorePathFactory pathFactory;
     private final long suggestedFileSize;
 
     private ManifestFile(
@@ -59,15 +59,13 @@ public class ManifestFile {
             ManifestEntrySerializer serializer,
             FormatReaderFactory readerFactory,
             FormatWriterFactory writerFactory,
-            FileStorePathFactory pathFactory,
-            long suggestedFileSize) {
-        this.fileIO = fileIO;
+            PathFactory pathFactory,
+            long suggestedFileSize,
+            @Nullable SegmentsCache<String> cache) {
+        super(fileIO, serializer, readerFactory, pathFactory, cache);
         this.schemaManager = schemaManager;
         this.partitionType = partitionType;
-        this.serializer = serializer;
-        this.readerFactory = readerFactory;
         this.writerFactory = writerFactory;
-        this.pathFactory = pathFactory;
         this.suggestedFileSize = suggestedFileSize;
     }
 
@@ -76,15 +74,6 @@ public class ManifestFile {
         return suggestedFileSize;
     }
 
-    public List<ManifestEntry> read(String fileName) {
-        try {
-            return FileUtils.readListFromFile(
-                    fileIO, pathFactory.toManifestFilePath(fileName), 
serializer, readerFactory);
-        } catch (IOException e) {
-            throw new RuntimeException("Failed to read manifest file " + 
fileName, e);
-        }
-    }
-
     /**
      * Write several {@link ManifestEntry}s into manifest files.
      *
@@ -93,7 +82,7 @@ public class ManifestFile {
     public List<ManifestFileMeta> write(List<ManifestEntry> entries) {
         RollingFileWriter<ManifestEntry, ManifestFileMeta> writer =
                 new RollingFileWriter<>(
-                        () -> new ManifestEntryWriter(writerFactory, 
pathFactory.newManifestFile()),
+                        () -> new ManifestEntryWriter(writerFactory, 
pathFactory.newPath()),
                         suggestedFileSize);
         try {
             writer.write(entries);
@@ -104,10 +93,6 @@ public class ManifestFile {
         return writer.result();
     }
 
-    public void delete(String fileName) {
-        fileIO.deleteQuietly(pathFactory.toManifestFilePath(fileName));
-    }
-
     private class ManifestEntryWriter extends SingleFileWriter<ManifestEntry, 
ManifestFileMeta> {
 
         private final FieldStatsCollector partitionStatsCollector;
@@ -166,6 +151,7 @@ public class ManifestFile {
         private final FileFormat fileFormat;
         private final FileStorePathFactory pathFactory;
         private final long suggestedFileSize;
+        @Nullable private final SegmentsCache<String> cache;
 
         public Factory(
                 FileIO fileIO,
@@ -173,13 +159,15 @@ public class ManifestFile {
                 RowType partitionType,
                 FileFormat fileFormat,
                 FileStorePathFactory pathFactory,
-                long suggestedFileSize) {
+                long suggestedFileSize,
+                @Nullable SegmentsCache<String> cache) {
             this.fileIO = fileIO;
             this.schemaManager = schemaManager;
             this.partitionType = partitionType;
             this.fileFormat = fileFormat;
             this.pathFactory = pathFactory;
             this.suggestedFileSize = suggestedFileSize;
+            this.cache = cache;
         }
 
         public ManifestFile create() {
@@ -191,8 +179,9 @@ public class ManifestFile {
                     new ManifestEntrySerializer(),
                     fileFormat.createReaderFactory(entryType),
                     fileFormat.createWriterFactory(entryType),
-                    pathFactory,
-                    suggestedFileSize);
+                    pathFactory.manifestFileFactory(),
+                    suggestedFileSize,
+                    cache);
         }
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
index c95718da0..86529e31c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
@@ -27,44 +27,32 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.PositionOutputStream;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
-import org.apache.paimon.utils.FileUtils;
+import org.apache.paimon.utils.ObjectsFile;
+import org.apache.paimon.utils.PathFactory;
+import org.apache.paimon.utils.SegmentsCache;
 import org.apache.paimon.utils.VersionedObjectSerializer;
 
-import java.io.IOException;
+import javax.annotation.Nullable;
+
 import java.util.List;
 
 /**
  * This file includes several {@link ManifestFileMeta}, representing all data 
of the whole table at
  * the corresponding snapshot.
  */
-public class ManifestList {
+public class ManifestList extends ObjectsFile<ManifestFileMeta> {
 
-    private final FileIO fileIO;
-    private final ManifestFileMetaSerializer serializer;
-    private final FormatReaderFactory readerFactory;
     private final FormatWriterFactory writerFactory;
-    private final FileStorePathFactory pathFactory;
 
     private ManifestList(
             FileIO fileIO,
             ManifestFileMetaSerializer serializer,
             FormatReaderFactory readerFactory,
             FormatWriterFactory writerFactory,
-            FileStorePathFactory pathFactory) {
-        this.fileIO = fileIO;
-        this.serializer = serializer;
-        this.readerFactory = readerFactory;
+            PathFactory pathFactory,
+            @Nullable SegmentsCache<String> cache) {
+        super(fileIO, serializer, readerFactory, pathFactory, cache);
         this.writerFactory = writerFactory;
-        this.pathFactory = pathFactory;
-    }
-
-    public List<ManifestFileMeta> read(String fileName) {
-        try {
-            return FileUtils.readListFromFile(
-                    fileIO, pathFactory.toManifestListPath(fileName), 
serializer, readerFactory);
-        } catch (IOException e) {
-            throw new RuntimeException("Failed to read manifest list " + 
fileName, e);
-        }
     }
 
     /**
@@ -73,9 +61,20 @@ public class ManifestList {
      * <p>NOTE: This method is atomic.
      */
     public String write(List<ManifestFileMeta> metas) {
-        Path path = pathFactory.newManifestList();
+        Path path = pathFactory.newPath();
         try {
-            return write(metas, path);
+            try (PositionOutputStream out = fileIO.newOutputStream(path, 
false)) {
+                FormatWriter writer = writerFactory.create(out);
+                try {
+                    for (ManifestFileMeta record : metas) {
+                        writer.addElement(serializer.toRow(record));
+                    }
+                } finally {
+                    writer.flush();
+                    writer.finish();
+                }
+            }
+            return path.getName();
         } catch (Throwable e) {
             fileIO.deleteQuietly(path);
             throw new RuntimeException(
@@ -83,43 +82,23 @@ public class ManifestList {
         }
     }
 
-    private String write(List<ManifestFileMeta> metas, Path path) throws 
IOException {
-        try (PositionOutputStream out = fileIO.newOutputStream(path, false)) {
-            // Initialize the bulk writer to accept the ManifestFileMeta.
-            FormatWriter writer = writerFactory.create(out);
-            try {
-                for (ManifestFileMeta manifest : metas) {
-                    writer.addElement(serializer.toRow(manifest));
-                }
-            } finally {
-                writer.flush();
-                writer.finish();
-            }
-        }
-        return path.getName();
-    }
-
-    public void delete(String fileName) {
-        fileIO.deleteQuietly(pathFactory.toManifestListPath(fileName));
-    }
-
     /** Creator of {@link ManifestList}. */
     public static class Factory {
 
         private final FileIO fileIO;
-        private final RowType partitionType;
         private final FileFormat fileFormat;
         private final FileStorePathFactory pathFactory;
+        @Nullable private final SegmentsCache<String> cache;
 
         public Factory(
                 FileIO fileIO,
-                RowType partitionType,
                 FileFormat fileFormat,
-                FileStorePathFactory pathFactory) {
+                FileStorePathFactory pathFactory,
+                @Nullable SegmentsCache<String> cache) {
             this.fileIO = fileIO;
-            this.partitionType = partitionType;
             this.fileFormat = fileFormat;
             this.pathFactory = pathFactory;
+            this.cache = cache;
         }
 
         public ManifestList create() {
@@ -129,7 +108,8 @@ public class ManifestList {
                     new ManifestFileMetaSerializer(),
                     fileFormat.createReaderFactory(metaType),
                     fileFormat.createWriterFactory(metaType),
-                    pathFactory);
+                    pathFactory.manifestListFactory(),
+                    cache);
         }
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
index cf4220b8c..304fbdb82 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
@@ -141,4 +141,32 @@ public class FileStorePathFactory {
     public String uuid() {
         return uuid;
     }
+
+    public PathFactory manifestFileFactory() {
+        return new PathFactory() {
+            @Override
+            public Path newPath() {
+                return newManifestFile();
+            }
+
+            @Override
+            public Path toPath(String fileName) {
+                return toManifestFilePath(fileName);
+            }
+        };
+    }
+
+    public PathFactory manifestListFactory() {
+        return new PathFactory() {
+            @Override
+            public Path newPath() {
+                return newManifestList();
+            }
+
+            @Override
+            public Path toPath(String fileName) {
+                return toManifestListPath(fileName);
+            }
+        };
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectSerializer.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectSerializer.java
index 77644ab74..329dee230 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectSerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectSerializer.java
@@ -25,6 +25,7 @@ import org.apache.paimon.io.DataInputView;
 import org.apache.paimon.io.DataInputViewStreamWrapper;
 import org.apache.paimon.io.DataOutputView;
 import org.apache.paimon.io.DataOutputViewStreamWrapper;
+import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowType;
 
 import java.io.ByteArrayInputStream;
@@ -50,6 +51,10 @@ public abstract class ObjectSerializer<T> implements 
Serializable {
         return rowSerializer.getArity();
     }
 
+    public DataType[] fieldTypes() {
+        return rowSerializer.fieldTypes();
+    }
+
     /**
      * Serializes the given record to the given target output view.
      *
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
new file mode 100644
index 000000000..06ed61320
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.RandomAccessInputView;
+import org.apache.paimon.data.Segments;
+import org.apache.paimon.data.SimpleCollectingOutputView;
+import org.apache.paimon.data.serializer.RowCompactedSerializer;
+import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.memory.MemorySegmentSource;
+import org.apache.paimon.types.RowType;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Function;
+
+/** Cache records to {@link SegmentsCache} by compacted serializer. */
+public class ObjectsCache<K, V> {
+
+    private final SegmentsCache<K> cache;
+    private final ObjectSerializer<V> serializer;
+    private final RowCompactedSerializer compactedSerializer;
+    private final Function<K, CloseableIterator<InternalRow>> reader;
+
+    public ObjectsCache(
+            SegmentsCache<K> cache,
+            ObjectSerializer<V> serializer,
+            Function<K, CloseableIterator<InternalRow>> reader) {
+        this.cache = cache;
+        this.serializer = serializer;
+        this.compactedSerializer = new 
RowCompactedSerializer(RowType.of(serializer.fieldTypes()));
+        this.reader = reader;
+    }
+
+    public List<V> read(K key) throws IOException {
+        Segments segments = cache.getSegments(key, this::readSegments);
+        List<V> entries = new ArrayList<>();
+        RandomAccessInputView view =
+                new RandomAccessInputView(
+                        segments.segments(), cache.pageSize(), 
segments.limitInLastSegment());
+        while (true) {
+            try {
+                
entries.add(serializer.fromRow(compactedSerializer.deserialize(view)));
+            } catch (EOFException e) {
+                return entries;
+            }
+        }
+    }
+
+    private Segments readSegments(K key) {
+        try (CloseableIterator<InternalRow> iterator = reader.apply(key)) {
+            ArrayList<MemorySegment> segments = new ArrayList<>();
+            MemorySegmentSource segmentSource =
+                    () -> MemorySegment.allocateHeapMemory(cache.pageSize());
+            SimpleCollectingOutputView output =
+                    new SimpleCollectingOutputView(segments, segmentSource, 
cache.pageSize());
+            while (iterator.hasNext()) {
+                compactedSerializer.serialize(iterator.next(), output);
+            }
+            return new Segments(segments, 
output.getCurrentPositionInSegment());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
new file mode 100644
index 000000000..10ac5ee2c
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.fs.FileIO;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.paimon.utils.FileUtils.createFormatReader;
+
+/** A file which contains several {@link T}s, provides read and write. */
+public abstract class ObjectsFile<T> {
+
+    protected final FileIO fileIO;
+    protected final ObjectSerializer<T> serializer;
+    protected final FormatReaderFactory readerFactory;
+    protected final PathFactory pathFactory;
+
+    @Nullable private final ObjectsCache<String, T> cache;
+
+    protected ObjectsFile(
+            FileIO fileIO,
+            ObjectSerializer<T> serializer,
+            FormatReaderFactory readerFactory,
+            PathFactory pathFactory,
+            @Nullable SegmentsCache<String> cache) {
+        this.fileIO = fileIO;
+        this.serializer = serializer;
+        this.readerFactory = readerFactory;
+        this.pathFactory = pathFactory;
+        this.cache =
+                cache == null ? null : new ObjectsCache<>(cache, serializer, 
this::createIterator);
+    }
+
+    /**
+     * Reads all objects stored in the file with the given name.
+     *
+     * <p>If a cache was supplied at construction, the objects are served through it; otherwise the
+     * file is read directly with {@link FileUtils#readListFromFile}.
+     *
+     * @param fileName name of the file, resolved to a path through the {@link PathFactory}
+     * @return the objects read from the file
+     * @throws RuntimeException if reading fails with an {@link IOException}
+     */
+    public List<T> read(String fileName) {
+        try {
+            if (cache != null) {
+                return cache.read(fileName);
+            }
+
+            return FileUtils.readListFromFile(
+                    fileIO, pathFactory.toPath(fileName), serializer, readerFactory);
+        } catch (IOException e) {
+            // This base class backs manifest files as well as manifest lists, so the message
+            // must not claim a specific file kind.
+            throw new RuntimeException("Failed to read file " + fileName, e);
+        }
+    }
+
+    private CloseableIterator<InternalRow> createIterator(String fileName) {
+        try {
+            return createFormatReader(fileIO, readerFactory, 
pathFactory.toPath(fileName))
+                    .toCloseableIterator();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void delete(String fileName) {
+        fileIO.deleteQuietly(pathFactory.toPath(fileName));
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/PathFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/PathFactory.java
new file mode 100644
index 000000000..9d1cf87bb
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/PathFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.fs.Path;
+
+/**
+ * Path factory to create a path: either a new one ({@link #newPath()}) or the one that belongs
+ * to a given file name ({@link #toPath(String)}).
+ */
+public interface PathFactory {
+
+    /**
+     * Creates a new path.
+     *
+     * <p>NOTE(review): presumably a fresh path for a file about to be written; confirm against
+     * the implementations.
+     */
+    Path newPath();
+
+    /**
+     * Converts a file name to its path.
+     *
+     * @param fileName name of the file
+     * @return the path for {@code fileName}
+     */
+    Path toPath(String fileName);
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/SegmentsCache.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/SegmentsCache.java
new file mode 100644
index 000000000..e42b262ab
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SegmentsCache.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.data.Segments;
+import org.apache.paimon.options.MemorySize;
+import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
+import 
org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;
+
+import java.util.function.Function;
+
+/**
+ * Cache of {@link Segments}, bounded by the total weight (in bytes) of its entries.
+ *
+ * @param <T> type of the cache key
+ */
+public class SegmentsCache<T> {
+
+    /** Fixed weight charged per entry on top of the memory of its pages. */
+    private static final int OBJECT_MEMORY_SIZE = 1000;
+
+    private final int pageSize;
+    private final Cache<T, Segments> cache;
+
+    /**
+     * @param pageSize size in bytes of one memory segment, used to weigh entries
+     * @param maxMemorySize maximum total weight the cache may hold
+     */
+    public SegmentsCache(int pageSize, MemorySize maxMemorySize) {
+        this.pageSize = pageSize;
+        this.cache =
+                Caffeine.newBuilder()
+                        .weigher(this::weigh)
+                        .maximumWeight(maxMemorySize.getBytes())
+                        // run cache maintenance on the calling thread
+                        .executor(MoreExecutors.directExecutor())
+                        .build();
+    }
+
+    /** Returns the page size this cache was created with. */
+    public int pageSize() {
+        return pageSize;
+    }
+
+    /**
+     * Returns the segments cached for {@code key}, computing them with {@code viewFunction} when
+     * no entry is present.
+     */
+    public Segments getSegments(T key, Function<T, Segments> viewFunction) {
+        return cache.get(key, viewFunction);
+    }
+
+    /**
+     * Weight of an entry: a fixed overhead plus one page size per segment.
+     *
+     * <p>The product is computed in {@code long} and saturated at {@link Integer#MAX_VALUE}, so
+     * a very large entry cannot overflow into a negative weight, which the cache would reject.
+     */
+    private int weigh(T cacheKey, Segments segments) {
+        long weight = OBJECT_MEMORY_SIZE + (long) segments.segments().size() * pageSize;
+        return (int) Math.min(weight, Integer.MAX_VALUE);
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
index 1a8f10e28..6615e9074 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
@@ -151,7 +151,8 @@ public class ManifestFileMetaTest {
                                 PARTITION_TYPE,
                                 "default",
                                 
CoreOptions.FILE_FORMAT.defaultValue().toString()),
-                        Long.MAX_VALUE)
+                        Long.MAX_VALUE,
+                        null)
                 .create();
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
index d4773395b..5be026b7c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
@@ -107,7 +107,8 @@ public class ManifestFileTest {
                         DEFAULT_PART_TYPE,
                         avro,
                         pathFactory,
-                        suggestedFileSize)
+                        suggestedFileSize,
+                        null)
                 .create();
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
index bbceeff3a..824d3e219 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
@@ -95,11 +95,6 @@ public class ManifestListTest {
                         TestKeyValueGenerator.DEFAULT_PART_TYPE,
                         "default",
                         CoreOptions.FILE_FORMAT.defaultValue().toString());
-        return new ManifestList.Factory(
-                        FileIOFinder.find(path),
-                        TestKeyValueGenerator.DEFAULT_PART_TYPE,
-                        avro,
-                        pathFactory)
-                .create();
+        return new ManifestList.Factory(FileIOFinder.find(path), avro, 
pathFactory, null).create();
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index b5ea597bb..bf99f93f7 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -33,6 +33,7 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader.ReaderSupplier;
+import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.reader.RecordReader;
@@ -404,6 +405,45 @@ public abstract class FileStoreTableTestBase {
         write.close();
     }
 
+    /** Writes and reads a table with a small manifest cache enabled and checks the result. */
+    @Test
+    public void testManifestCache() throws Exception {
+        // enable the manifest cache with a 1 MiB budget
+        FileStoreTable table =
+                createFileStoreTable(
+                        conf ->
+                                conf.set(
+                                        CoreOptions.MANIFEST_CACHE_SIZE,
+                                        MemorySize.ofMebiBytes(1)));
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        // lots of commits, produce lots of manifest
+        List<String> expected = new ArrayList<>();
+        int cnt = 50;
+        for (int i = 0; i < cnt; i++) {
+            write.write(rowData(i, i, (long) i));
+            commit.commit(i, write.prepareCommit(false, i));
+            expected.add(
+                    String.format("%s|%s|%s|binary|varbinary|mapKey:mapVal|multiset", i, i, i));
+        }
+        write.close();
+
+        // create new write and reload manifests
+        write = table.newWrite(commitUser);
+        for (int i = 0; i < cnt; i++) {
+            write.write(rowData(i, i + 1, (long) i + 1));
+            expected.add(
+                    String.format(
+                            "%s|%s|%s|binary|varbinary|mapKey:mapVal|multiset", i, i + 1, i + 1));
+        }
+        commit.commit(cnt, write.prepareCommit(false, cnt));
+        // release the second writer as well, like the first one above
+        write.close();
+
+        // check result
+        List<String> result =
+                getResult(table.newRead(), table.newScan().plan().splits(), BATCH_ROW_TO_STRING);
+        assertThat(result.size()).isEqualTo(expected.size());
+        assertThat(result).containsExactlyInAnyOrderElementsOf(expected);
+    }
+
     @Test
     public void testWriteWithoutCompactionAndExpiration() throws Exception {
         FileStoreTable table =
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ForceCompactionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ForceCompactionITCase.java
index 1306dd386..50ab2d0aa 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ForceCompactionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ForceCompactionITCase.java
@@ -234,7 +234,7 @@ public class ForceCompactionITCase extends 
CatalogITCaseBase {
                         CoreOptions.FILE_FORMAT.defaultValue().toString());
 
         List<ManifestFileMeta> manifestFileMetas =
-                new ManifestList.Factory(LocalFileIO.create(), partType, avro, 
pathFactory)
+                new ManifestList.Factory(LocalFileIO.create(), avro, 
pathFactory, null)
                         .create()
                         .read(snapshot.deltaManifestList());
         
assertThat(manifestFileMetas.get(0).numDeletedFiles()).isGreaterThanOrEqualTo(1);


Reply via email to