This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 675374fb9 [core] Introduce cache.manifest.max-memory to CachingCatalog 
(#3856)
675374fb9 is described below

commit 675374fb9116b6d428b4b856f87bd6c683181edf
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Aug 2 19:12:47 2024 +0800

    [core] Introduce cache.manifest.max-memory to CachingCatalog (#3856)
---
 .../generated/catalog_configuration.html           | 18 +++++
 .../org/apache/paimon/codegen/CompileUtils.java    |  5 +-
 .../org/apache/paimon/io/cache/CacheManager.java   | 11 +--
 .../org/apache/paimon/options/CatalogOptions.java  | 18 +++++
 .../apache/paimon/reader/RecordReaderIterator.java |  4 +-
 .../java/org/apache/paimon/AbstractFileStore.java  | 25 ++++---
 .../src/main/java/org/apache/paimon/FileStore.java |  4 ++
 .../org/apache/paimon/catalog/CachingCatalog.java  | 72 +++++++++++++++++--
 .../org/apache/paimon/catalog/CatalogFactory.java  | 22 +-----
 .../iceberg/manifest/IcebergManifestFile.java      |  1 +
 .../iceberg/manifest/IcebergManifestList.java      |  1 +
 .../org/apache/paimon/index/IndexFileHandler.java  | 20 ------
 .../org/apache/paimon/lookup/RocksDBState.java     |  4 +-
 .../apache/paimon/manifest/IndexManifestFile.java  | 18 +++--
 .../org/apache/paimon/manifest/ManifestFile.java   | 19 +++--
 .../org/apache/paimon/manifest/ManifestList.java   | 19 +++--
 .../paimon/manifest/SimpleFileEntrySerializer.java |  3 +-
 .../org/apache/paimon/mergetree/LookupFile.java    |  3 +-
 .../apache/paimon/privilege/PrivilegedCatalog.java | 14 ++++
 .../paimon/privilege/PrivilegedFileStore.java      |  9 ++-
 .../paimon/table/AbstractFileStoreTable.java       |  6 ++
 .../paimon/table/DelegatedFileStoreTable.java      |  6 ++
 .../org/apache/paimon/table/FileStoreTable.java    |  4 ++
 .../java/org/apache/paimon/utils/ObjectsCache.java | 63 +++++++++++------
 .../java/org/apache/paimon/utils/ObjectsFile.java  | 63 ++++++++++++-----
 .../org/apache/paimon/utils/SegmentsCache.java     | 46 ++++++++++--
 .../apache/paimon/catalog/CachingCatalogTest.java  | 81 ++++++++++++++++++++++
 .../paimon/catalog/TestableCachingCatalog.java     |  3 +-
 .../manifest/IndexManifestFileHandlerTest.java     |  6 +-
 .../org/apache/paimon/utils/ObjectsCacheTest.java  |  4 +-
 30 files changed, 439 insertions(+), 133 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html 
b/docs/layouts/shortcodes/generated/catalog_configuration.html
index 4508d898d..c7449eeb6 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -44,6 +44,24 @@ under the License.
             <td>Duration</td>
             <td>Controls the duration for which databases and tables in the 
catalog are cached.</td>
         </tr>
+        <tr>
+            <td><h5>cache.manifest.max-memory</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>MemorySize</td>
+            <td>Controls the maximum memory to cache manifest content.</td>
+        </tr>
+        <tr>
+            <td><h5>cache.manifest.small-file-memory</h5></td>
+            <td style="word-wrap: break-word;">128 mb</td>
+            <td>MemorySize</td>
+            <td>Controls the cache memory to cache small manifest files.</td>
+        </tr>
+        <tr>
+            <td><h5>cache.manifest.small-file-threshold</h5></td>
+            <td style="word-wrap: break-word;">1 mb</td>
+            <td>MemorySize</td>
+            <td>Controls the threshold of small manifest file.</td>
+        </tr>
         <tr>
             <td><h5>client-pool-size</h5></td>
             <td style="word-wrap: break-word;">2</td>
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/codegen/CompileUtils.java 
b/paimon-common/src/main/java/org/apache/paimon/codegen/CompileUtils.java
index e13248a92..fcad6bf30 100644
--- a/paimon-common/src/main/java/org/apache/paimon/codegen/CompileUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/codegen/CompileUtils.java
@@ -20,7 +20,6 @@ package org.apache.paimon.codegen;
 
 import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
 import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
-import 
org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;
 
 import org.codehaus.janino.SimpleCompiler;
 import org.slf4j.Logger;
@@ -46,12 +45,12 @@ public final class CompileUtils {
      */
     static final Cache<ClassKey, Class<?>> COMPILED_CLASS_CACHE =
             Caffeine.newBuilder()
+                    .softValues()
                     // estimated maximum planning/startup time
                     .expireAfterAccess(Duration.ofMinutes(5))
                     // estimated cache size
                     .maximumSize(300)
-                    .softValues()
-                    .executor(MoreExecutors.directExecutor())
+                    .executor(Runnable::run)
                     .build();
 
     /**
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java 
b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java
index 3f09fa5f4..b8f00205e 100644
--- a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java
+++ b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java
@@ -25,7 +25,6 @@ import org.apache.paimon.options.MemorySize;
 import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
 import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
 import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalCause;
-import 
org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;
 
 import java.io.IOException;
 
@@ -48,13 +47,13 @@ public class CacheManager {
                         .weigher(this::weigh)
                         .maximumWeight(maxMemorySize.getBytes())
                         .removalListener(this::onRemoval)
-                        .executor(MoreExecutors.directExecutor())
+                        .executor(Runnable::run)
                         .build();
         this.fileReadCount = 0;
     }
 
     @VisibleForTesting
-    public Cache<CacheKey, CacheValue> cache() {
+    public Cache<CacheKey, ?> cache() {
         return cache;
     }
 
@@ -81,8 +80,10 @@ public class CacheManager {
     }
 
     private void onRemoval(CacheKey key, CacheValue value, RemovalCause cause) 
{
-        value.isClosed = true;
-        value.callback.onRemoval(key);
+        if (value != null) {
+            value.isClosed = true;
+            value.callback.onRemoval(key);
+        }
     }
 
     public int fileReadCount() {
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
index 980083e71..d9758d4b0 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
@@ -105,6 +105,24 @@ public class CatalogOptions {
                     .withDescription(
                             "Controls the duration for which databases and 
tables in the catalog are cached.");
 
+    public static final ConfigOption<MemorySize> 
CACHE_MANIFEST_SMALL_FILE_MEMORY =
+            key("cache.manifest.small-file-memory")
+                    .memoryType()
+                    .defaultValue(MemorySize.ofMebiBytes(128))
+                    .withDescription("Controls the cache memory to cache small 
manifest files.");
+
+    public static final ConfigOption<MemorySize> 
CACHE_MANIFEST_SMALL_FILE_THRESHOLD =
+            key("cache.manifest.small-file-threshold")
+                    .memoryType()
+                    .defaultValue(MemorySize.ofMebiBytes(1))
+                    .withDescription("Controls the threshold of small manifest 
file.");
+
+    public static final ConfigOption<MemorySize> CACHE_MANIFEST_MAX_MEMORY =
+            key("cache.manifest.max-memory")
+                    .memoryType()
+                    .noDefaultValue()
+                    .withDescription("Controls the maximum memory to cache 
manifest content.");
+
     public static final ConfigOption<String> LINEAGE_META =
             key("lineage-meta")
                     .stringType()
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/reader/RecordReaderIterator.java
 
b/paimon-common/src/main/java/org/apache/paimon/reader/RecordReaderIterator.java
index 5a807c6f1..98a485a9c 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/reader/RecordReaderIterator.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/reader/RecordReaderIterator.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.reader;
 
 import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.IOUtils;
 
 import java.io.IOException;
 
@@ -34,7 +35,8 @@ public class RecordReaderIterator<T> implements 
CloseableIterator<T> {
         this.reader = reader;
         try {
             this.currentIterator = reader.readBatch();
-        } catch (IOException e) {
+        } catch (Exception e) {
+            IOUtils.closeQuietly(reader);
             throw new RuntimeException(e);
         }
         this.advanced = false;
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 49d2c47f0..5120db295 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -21,6 +21,7 @@ package org.apache.paimon;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.index.HashIndexFile;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.manifest.IndexManifestFile;
@@ -74,7 +75,8 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
     protected final RowType partitionType;
     private final CatalogEnvironment catalogEnvironment;
 
-    @Nullable private final SegmentsCache<String> writeManifestCache;
+    @Nullable private final SegmentsCache<Path> writeManifestCache;
+    @Nullable private SegmentsCache<Path> readManifestCache;
 
     protected AbstractFileStore(
             FileIO fileIO,
@@ -89,11 +91,9 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
         this.options = options;
         this.partitionType = partitionType;
         this.catalogEnvironment = catalogEnvironment;
-        MemorySize writeManifestCache = options.writeManifestCache();
         this.writeManifestCache =
-                writeManifestCache.getBytes() == 0
-                        ? null
-                        : new SegmentsCache<>(options.pageSize(), 
writeManifestCache);
+                SegmentsCache.create(
+                        options.pageSize(), options.writeManifestCache(), 
Long.MAX_VALUE);
     }
 
     @Override
@@ -124,7 +124,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> 
{
                 options.manifestCompression(),
                 pathFactory(),
                 options.manifestTargetSize().getBytes(),
-                forWrite ? writeManifestCache : null);
+                forWrite ? writeManifestCache : readManifestCache);
     }
 
     @Override
@@ -138,12 +138,16 @@ abstract class AbstractFileStore<T> implements 
FileStore<T> {
                 options.manifestFormat(),
                 options.manifestCompression(),
                 pathFactory(),
-                forWrite ? writeManifestCache : null);
+                forWrite ? writeManifestCache : readManifestCache);
     }
 
     protected IndexManifestFile.Factory indexManifestFileFactory() {
         return new IndexManifestFile.Factory(
-                fileIO, options.manifestFormat(), 
options.manifestCompression(), pathFactory());
+                fileIO,
+                options.manifestFormat(),
+                options.manifestCompression(),
+                pathFactory(),
+                readManifestCache);
     }
 
     @Override
@@ -314,4 +318,9 @@ abstract class AbstractFileStore<T> implements FileStore<T> 
{
     public ServiceManager newServiceManager() {
         return new ServiceManager(fileIO, options.path());
     }
+
+    @Override
+    public void setManifestCache(SegmentsCache<Path> manifestCache) {
+        this.readManifestCache = manifestCache;
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index 38b7321fc..715062565 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon;
 
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.manifest.ManifestFile;
@@ -38,6 +39,7 @@ import org.apache.paimon.table.sink.TagCallback;
 import org.apache.paimon.tag.TagAutoManager;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.SegmentsCache;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
 
@@ -100,4 +102,6 @@ public interface FileStore<T> {
     boolean mergeSchema(RowType rowType, boolean allowExplicitCast);
 
     List<TagCallback> createTagCallbacks();
+
+    void setManifestCache(SegmentsCache<Path> manifestCache);
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
index b229963f5..077775945 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
@@ -18,11 +18,15 @@
 
 package org.apache.paimon.catalog;
 
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.system.SystemTableLoader;
 import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.SegmentsCache;
 
 import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
 import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
@@ -34,13 +38,20 @@ import org.checkerframework.checker.nullness.qual.NonNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.apache.paimon.catalog.AbstractCatalog.isSpecifiedSystemTable;
+import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
 import static 
org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS;
+import static 
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_MAX_MEMORY;
+import static 
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_MEMORY;
+import static 
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_THRESHOLD;
 import static org.apache.paimon.table.system.SystemTableLoader.SYSTEM_TABLES;
 
 /** A {@link Catalog} to cache databases and tables and manifests. */
@@ -50,16 +61,35 @@ public class CachingCatalog extends DelegateCatalog {
 
     protected final Cache<String, Map<String, String>> databaseCache;
     protected final Cache<Identifier, Table> tableCache;
+    @Nullable protected final SegmentsCache<Path> manifestCache;
 
     public CachingCatalog(Catalog wrapped) {
-        this(wrapped, CACHE_EXPIRATION_INTERVAL_MS.defaultValue());
+        this(
+                wrapped,
+                CACHE_EXPIRATION_INTERVAL_MS.defaultValue(),
+                CACHE_MANIFEST_SMALL_FILE_MEMORY.defaultValue(),
+                CACHE_MANIFEST_SMALL_FILE_THRESHOLD.defaultValue().getBytes());
     }
 
-    public CachingCatalog(Catalog wrapped, Duration expirationInterval) {
-        this(wrapped, expirationInterval, Ticker.systemTicker());
+    public CachingCatalog(
+            Catalog wrapped,
+            Duration expirationInterval,
+            MemorySize manifestMaxMemory,
+            long manifestCacheThreshold) {
+        this(
+                wrapped,
+                expirationInterval,
+                manifestMaxMemory,
+                manifestCacheThreshold,
+                Ticker.systemTicker());
     }
 
-    public CachingCatalog(Catalog wrapped, Duration expirationInterval, Ticker 
ticker) {
+    public CachingCatalog(
+            Catalog wrapped,
+            Duration expirationInterval,
+            MemorySize manifestMaxMemory,
+            long manifestCacheThreshold,
+            Ticker ticker) {
         super(wrapped);
         if (expirationInterval.isZero() || expirationInterval.isNegative()) {
             throw new IllegalArgumentException(
@@ -81,6 +111,27 @@ public class CachingCatalog extends DelegateCatalog {
                         .expireAfterAccess(expirationInterval)
                         .ticker(ticker)
                         .build();
+        this.manifestCache = SegmentsCache.create(manifestMaxMemory, 
manifestCacheThreshold);
+    }
+
+    public static Catalog tryToCreate(Catalog catalog, Options options) {
+        if (!options.get(CACHE_ENABLED)) {
+            return catalog;
+        }
+
+        MemorySize manifestMaxMemory = 
options.get(CACHE_MANIFEST_SMALL_FILE_MEMORY);
+        long manifestThreshold = 
options.get(CACHE_MANIFEST_SMALL_FILE_THRESHOLD).getBytes();
+        Optional<MemorySize> maxMemory = 
options.getOptional(CACHE_MANIFEST_MAX_MEMORY);
+        if (maxMemory.isPresent() && 
maxMemory.get().compareTo(manifestMaxMemory) > 0) {
+            // cache all manifest files
+            manifestMaxMemory = maxMemory.get();
+            manifestThreshold = Long.MAX_VALUE;
+        }
+        return new CachingCatalog(
+                catalog,
+                options.get(CACHE_EXPIRATION_INTERVAL_MS),
+                manifestMaxMemory,
+                manifestThreshold);
     }
 
     @Override
@@ -151,7 +202,7 @@ public class CachingCatalog extends DelegateCatalog {
             Table originTable = tableCache.getIfPresent(originIdentifier);
             if (originTable == null) {
                 originTable = wrapped.getTable(originIdentifier);
-                tableCache.put(originIdentifier, originTable);
+                putTableCache(originIdentifier, originTable);
             }
             table =
                     SystemTableLoader.load(
@@ -160,15 +211,22 @@ public class CachingCatalog extends DelegateCatalog {
             if (table == null) {
                 throw new TableNotExistException(identifier);
             }
-            tableCache.put(identifier, table);
+            putTableCache(identifier, table);
             return table;
         }
 
         table = wrapped.getTable(identifier);
-        tableCache.put(identifier, table);
+        putTableCache(identifier, table);
         return table;
     }
 
+    private void putTableCache(Identifier identifier, Table table) {
+        if (manifestCache != null && table instanceof FileStoreTable) {
+            ((FileStoreTable) table).setManifestCache(manifestCache);
+        }
+        tableCache.put(identifier, table);
+    }
+
     private class TableInvalidatingRemovalListener implements 
RemovalListener<Identifier, Table> {
         @Override
         public void onRemoval(Identifier identifier, Table table, @NonNull 
RemovalCause cause) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java
index 2eff8e902..0739014c5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java
@@ -24,16 +24,12 @@ import org.apache.paimon.factories.FactoryUtil;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.privilege.FileBasedPrivilegeManager;
-import org.apache.paimon.privilege.PrivilegeManager;
 import org.apache.paimon.privilege.PrivilegedCatalog;
 import org.apache.paimon.utils.Preconditions;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
 
-import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
-import static 
org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS;
 import static org.apache.paimon.options.CatalogOptions.METASTORE;
 import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
 
@@ -73,23 +69,9 @@ public interface CatalogFactory extends Factory {
 
     static Catalog createCatalog(CatalogContext context, ClassLoader 
classLoader) {
         Catalog catalog = createUnwrappedCatalog(context, classLoader);
-
         Options options = context.options();
-        if (options.get(CACHE_ENABLED)) {
-            catalog = new CachingCatalog(catalog, 
options.get(CACHE_EXPIRATION_INTERVAL_MS));
-        }
-
-        PrivilegeManager privilegeManager =
-                new FileBasedPrivilegeManager(
-                        catalog.warehouse(),
-                        catalog.fileIO(),
-                        context.options().get(PrivilegedCatalog.USER),
-                        context.options().get(PrivilegedCatalog.PASSWORD));
-        if (privilegeManager.privilegeEnabled()) {
-            catalog = new PrivilegedCatalog(catalog, privilegeManager);
-        }
-
-        return catalog;
+        catalog = CachingCatalog.tryToCreate(catalog, options);
+        return PrivilegedCatalog.tryToCreate(catalog, options);
     }
 
     static Catalog createUnwrappedCatalog(CatalogContext context, ClassLoader 
classLoader) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
index d4c363b4c..97ab8c5d1 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
@@ -67,6 +67,7 @@ public class IcebergManifestFile extends 
ObjectsFile<IcebergManifestEntry> {
         super(
                 fileIO,
                 new IcebergManifestEntrySerializer(partitionType),
+                IcebergManifestEntry.schema(partitionType),
                 readerFactory,
                 writerFactory,
                 compression,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestList.java
 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestList.java
index e247b0238..a468a662d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestList.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestList.java
@@ -39,6 +39,7 @@ public class IcebergManifestList extends 
ObjectsFile<IcebergManifestFileMeta> {
         super(
                 fileIO,
                 new IcebergManifestFileMetaSerializer(),
+                IcebergManifestFileMeta.schema(),
                 readerFactory,
                 writerFactory,
                 compression,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java 
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
index eed18af17..ee6cbe769 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
@@ -197,26 +197,6 @@ public class IndexFileHandler {
         return result;
     }
 
-    public List<IndexManifestEntry> scanEntries(String indexType, BinaryRow 
partition, int bucket) {
-        Snapshot snapshot = snapshotManager.latestSnapshot();
-        if (snapshot == null) {
-            return Collections.emptyList();
-        }
-        String indexManifest = snapshot.indexManifest();
-        if (indexManifest == null) {
-            return Collections.emptyList();
-        }
-        List<IndexManifestEntry> result = new ArrayList<>();
-        for (IndexManifestEntry file : indexManifestFile.read(indexManifest)) {
-            if (file.indexFile().indexType().equals(indexType)
-                    && file.partition().equals(partition)
-                    && file.bucket() == bucket) {
-                result.add(file);
-            }
-        }
-        return result;
-    }
-
     public Path filePath(IndexFileMeta file) {
         return pathFactory.toPath(file.fileName());
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java 
b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java
index c0417b8f1..d3fafeeb6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java
+++ b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java
@@ -29,7 +29,6 @@ import org.apache.paimon.types.RowType;
 
 import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
 import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
-import 
org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;
 
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDB;
@@ -80,8 +79,9 @@ public abstract class RocksDBState<K, V, CacheV> {
         this.writeOptions = new WriteOptions().setDisableWAL(true);
         this.cache =
                 Caffeine.newBuilder()
+                        .softValues()
                         .maximumSize(lruCacheSize)
-                        .executor(MoreExecutors.directExecutor())
+                        .executor(Runnable::run)
                         .build();
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java
index 91a4f171a..9989f5809 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java
@@ -22,11 +22,13 @@ import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.format.FormatWriterFactory;
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.ObjectsFile;
 import org.apache.paimon.utils.PathFactory;
+import org.apache.paimon.utils.SegmentsCache;
 import org.apache.paimon.utils.VersionedObjectSerializer;
 
 import javax.annotation.Nullable;
@@ -38,18 +40,21 @@ public class IndexManifestFile extends 
ObjectsFile<IndexManifestEntry> {
 
     private IndexManifestFile(
             FileIO fileIO,
+            RowType schema,
             FormatReaderFactory readerFactory,
             FormatWriterFactory writerFactory,
             String compression,
-            PathFactory pathFactory) {
+            PathFactory pathFactory,
+            @Nullable SegmentsCache<Path> cache) {
         super(
                 fileIO,
                 new IndexManifestEntrySerializer(),
+                schema,
                 readerFactory,
                 writerFactory,
                 compression,
                 pathFactory,
-                null);
+                cache);
     }
 
     /** Write new index files to index manifest. */
@@ -72,26 +77,31 @@ public class IndexManifestFile extends 
ObjectsFile<IndexManifestEntry> {
         private final FileFormat fileFormat;
         private final String compression;
         private final FileStorePathFactory pathFactory;
+        @Nullable private final SegmentsCache<Path> cache;
 
         public Factory(
                 FileIO fileIO,
                 FileFormat fileFormat,
                 String compression,
-                FileStorePathFactory pathFactory) {
+                FileStorePathFactory pathFactory,
+                @Nullable SegmentsCache<Path> cache) {
             this.fileIO = fileIO;
             this.fileFormat = fileFormat;
             this.compression = compression;
             this.pathFactory = pathFactory;
+            this.cache = cache;
         }
 
         public IndexManifestFile create() {
             RowType schema = 
VersionedObjectSerializer.versionType(IndexManifestEntry.schema());
             return new IndexManifestFile(
                     fileIO,
+                    schema,
                     fileFormat.createReaderFactory(schema),
                     fileFormat.createWriterFactory(schema),
                     compression,
-                    pathFactory.indexManifestFileFactory());
+                    pathFactory.indexManifestFileFactory(),
+                    cache);
         }
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
index e2fa86789..4fd3df960 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
@@ -57,13 +57,22 @@ public class ManifestFile extends 
ObjectsFile<ManifestEntry> {
             SchemaManager schemaManager,
             RowType partitionType,
             ManifestEntrySerializer serializer,
+            RowType schema,
             FormatReaderFactory readerFactory,
             FormatWriterFactory writerFactory,
             String compression,
             PathFactory pathFactory,
             long suggestedFileSize,
-            @Nullable SegmentsCache<String> cache) {
-        super(fileIO, serializer, readerFactory, writerFactory, compression, 
pathFactory, cache);
+            @Nullable SegmentsCache<Path> cache) {
+        super(
+                fileIO,
+                serializer,
+                schema,
+                readerFactory,
+                writerFactory,
+                compression,
+                pathFactory,
+                cache);
         this.schemaManager = schemaManager;
         this.partitionType = partitionType;
         this.writerFactory = writerFactory;
@@ -156,7 +165,7 @@ public class ManifestFile extends 
ObjectsFile<ManifestEntry> {
         private final String compression;
         private final FileStorePathFactory pathFactory;
         private final long suggestedFileSize;
-        @Nullable private final SegmentsCache<String> cache;
+        @Nullable private final SegmentsCache<Path> cache;
 
         public Factory(
                 FileIO fileIO,
@@ -166,7 +175,7 @@ public class ManifestFile extends 
ObjectsFile<ManifestEntry> {
                 String compression,
                 FileStorePathFactory pathFactory,
                 long suggestedFileSize,
-                @Nullable SegmentsCache<String> cache) {
+                @Nullable SegmentsCache<Path> cache) {
             this.fileIO = fileIO;
             this.schemaManager = schemaManager;
             this.partitionType = partitionType;
@@ -184,6 +193,7 @@ public class ManifestFile extends ObjectsFile<ManifestEntry> {
                     schemaManager,
                     partitionType,
                     new ManifestEntrySerializer(),
+                    entryType,
                     fileFormat.createReaderFactory(entryType),
                     fileFormat.createWriterFactory(entryType),
                     compression,
@@ -197,6 +207,7 @@ public class ManifestFile extends ObjectsFile<ManifestEntry> {
             return new ObjectsFile<>(
                     fileIO,
                     new SimpleFileEntrySerializer(),
+                    entryType,
                     fileFormat.createReaderFactory(entryType),
                     fileFormat.createWriterFactory(entryType),
                     compression,
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
index e33d4a643..304730f30 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
@@ -22,6 +22,7 @@ import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.format.FormatWriterFactory;
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.ObjectsFile;
@@ -42,12 +43,21 @@ public class ManifestList extends ObjectsFile<ManifestFileMeta> {
     private ManifestList(
             FileIO fileIO,
             ManifestFileMetaSerializer serializer,
+            RowType schema,
             FormatReaderFactory readerFactory,
             FormatWriterFactory writerFactory,
             String compression,
             PathFactory pathFactory,
-            @Nullable SegmentsCache<String> cache) {
-        super(fileIO, serializer, readerFactory, writerFactory, compression, pathFactory, cache);
+            @Nullable SegmentsCache<Path> cache) {
+        super(
+                fileIO,
+                serializer,
+                schema,
+                readerFactory,
+                writerFactory,
+                compression,
+                pathFactory,
+                cache);
     }
 
     /**
@@ -66,14 +76,14 @@ public class ManifestList extends ObjectsFile<ManifestFileMeta> {
         private final FileFormat fileFormat;
         private final String compression;
         private final FileStorePathFactory pathFactory;
-        @Nullable private final SegmentsCache<String> cache;
+        @Nullable private final SegmentsCache<Path> cache;
 
         public Factory(
                 FileIO fileIO,
                 FileFormat fileFormat,
                 String compression,
                 FileStorePathFactory pathFactory,
-                @Nullable SegmentsCache<String> cache) {
+                @Nullable SegmentsCache<Path> cache) {
             this.fileIO = fileIO;
             this.fileFormat = fileFormat;
             this.compression = compression;
@@ -86,6 +96,7 @@ public class ManifestList extends ObjectsFile<ManifestFileMeta> {
             return new ManifestList(
                     fileIO,
                     new ManifestFileMetaSerializer(),
+                    metaType,
                     fileFormat.createReaderFactory(metaType),
                     fileFormat.createWriterFactory(metaType),
                     compression,
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntrySerializer.java b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntrySerializer.java
index f23f167f7..c33abc00c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntrySerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntrySerializer.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.manifest;
 
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.utils.VersionedObjectSerializer;
 
 import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
@@ -51,7 +52,7 @@ public class SimpleFileEntrySerializer extends VersionedObjectSerializer<SimpleF
             throw new IllegalArgumentException("Unsupported version: " + version);
         }
 
-        InternalRow file = row.getRow(4, 3);
+        InternalRow file = row.getRow(4, DataFileMeta.SCHEMA.getFieldCount());
         return new SimpleFileEntry(
                 FileKind.fromByteValue(row.getByte(0)),
                 deserializeBinaryRow(row.getBinary(1)),
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java
index b3b6fabc4..0e05d85a4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java
@@ -28,7 +28,6 @@ import org.apache.paimon.utils.FileIOUtils;
 import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
 import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
 import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalCause;
-import org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;
 
 import javax.annotation.Nullable;
 
@@ -91,7 +90,7 @@ public class LookupFile implements Closeable {
                 .maximumWeight(maxDiskSize.getKibiBytes())
                 .weigher(LookupFile::fileWeigh)
                 .removalListener(LookupFile::removalCallback)
-                .executor(MoreExecutors.directExecutor())
+                .executor(Runnable::run)
                 .build();
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
index 3e4781086..bda06a08d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
@@ -23,6 +23,7 @@ import org.apache.paimon.catalog.DelegateCatalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.options.ConfigOption;
 import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.FileStoreTable;
@@ -49,6 +50,19 @@ public class PrivilegedCatalog extends DelegateCatalog {
         this.privilegeManager = privilegeManager;
     }
 
+    public static Catalog tryToCreate(Catalog catalog, Options options) {
+        PrivilegeManager privilegeManager =
+                new FileBasedPrivilegeManager(
+                        catalog.warehouse(),
+                        catalog.fileIO(),
+                        options.get(PrivilegedCatalog.USER),
+                        options.get(PrivilegedCatalog.PASSWORD));
+        if (privilegeManager.privilegeEnabled()) {
+            catalog = new PrivilegedCatalog(catalog, privilegeManager);
+        }
+        return catalog;
+    }
+
     public PrivilegeManager privilegeManager() {
         return privilegeManager;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
index 1870813e9..768e6259e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
@@ -21,6 +21,7 @@ package org.apache.paimon.privilege;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.FileStore;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.manifest.ManifestFile;
@@ -41,6 +42,7 @@ import org.apache.paimon.table.sink.TagCallback;
 import org.apache.paimon.tag.TagAutoManager;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.SegmentsCache;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
 
@@ -51,8 +53,6 @@ import java.util.List;
 /** {@link FileStore} with privilege checks. */
 public class PrivilegedFileStore<T> implements FileStore<T> {
 
-    private static final long serialVersionUID = 1L;
-
     private final FileStore<T> wrapped;
     private final PrivilegeChecker privilegeChecker;
     private final Identifier identifier;
@@ -199,4 +199,9 @@ public class PrivilegedFileStore<T> implements FileStore<T> {
     public List<TagCallback> createTagCallbacks() {
         return wrapped.createTagCallbacks();
     }
+
+    @Override
+    public void setManifestCache(SegmentsCache<Path> manifestCache) {
+        wrapped.setManifestCache(manifestCache);
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index bad718a04..d30bd11ef 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -58,6 +58,7 @@ import org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanne
 import org.apache.paimon.table.source.snapshot.StaticFromWatermarkStartingScanner;
 import org.apache.paimon.tag.TagPreview;
 import org.apache.paimon.utils.BranchManager;
+import org.apache.paimon.utils.SegmentsCache;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
 
@@ -108,6 +109,11 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
         this.catalogEnvironment = catalogEnvironment;
     }
 
+    @Override
+    public void setManifestCache(SegmentsCache<Path> manifestCache) {
+        store().setManifestCache(manifestCache);
+    }
+
     @Override
     public OptionalLong latestSnapshotId() {
         Long snapshot = store().snapshotManager().latestSnapshotId();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
index 6a8e94240..58d1caaac 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
@@ -35,6 +35,7 @@ import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.StreamDataTableScan;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.utils.BranchManager;
+import org.apache.paimon.utils.SegmentsCache;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
 
@@ -97,6 +98,11 @@ public abstract class DelegatedFileStoreTable implements FileStoreTable {
         return wrapped.fileIO();
     }
 
+    @Override
+    public void setManifestCache(SegmentsCache<Path> manifestCache) {
+        wrapped.setManifestCache(manifestCache);
+    }
+
     @Override
     public TableSchema schema() {
         return wrapped.schema();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
index 68f053178..61fe816ac 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table;
 
 import org.apache.paimon.FileStore;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.schema.TableSchema;
@@ -29,6 +30,7 @@ import org.apache.paimon.table.sink.RowKeyExtractor;
 import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.table.sink.TableWriteImpl;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.SegmentsCache;
 
 import java.util.List;
 import java.util.Map;
@@ -40,6 +42,8 @@ import java.util.Optional;
  */
 public interface FileStoreTable extends DataTable {
 
+    void setManifestCache(SegmentsCache<Path> manifestCache);
+
     @Override
     default RowType rowType() {
         return schema().logicalRowType();
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
index f80037911..8c490e008 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
@@ -26,6 +26,7 @@ import org.apache.paimon.data.SimpleCollectingOutputView;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.memory.MemorySegment;
 import org.apache.paimon.memory.MemorySegmentSource;
+import org.apache.paimon.types.RowType;
 
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
@@ -34,25 +35,30 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.function.BiFunction;
+
+import static org.apache.paimon.utils.ObjectsFile.readFromIterator;
 
 /** Cache records to {@link SegmentsCache} by compacted serializer. */
 @ThreadSafe
 public class ObjectsCache<K, V> {
 
     private final SegmentsCache<K> cache;
-    private final ObjectSerializer<V> serializer;
-    private final ThreadLocal<InternalRowSerializer> threadLocalRowSerializer;
-    private final BiFunction<K, Long, CloseableIterator<InternalRow>> reader;
+    private final ObjectSerializer<V> projectedSerializer;
+    private final ThreadLocal<InternalRowSerializer> formatSerializer;
+    private final FunctionWithIOException<K, Long> fileSizeFunction;
+    private final BiFunctionWithIOE<K, Long, CloseableIterator<InternalRow>> reader;
 
     public ObjectsCache(
             SegmentsCache<K> cache,
-            ObjectSerializer<V> serializer,
-            BiFunction<K, Long, CloseableIterator<InternalRow>> reader) {
+            ObjectSerializer<V> projectedSerializer,
+            RowType formatSchema,
+            FunctionWithIOException<K, Long> fileSizeFunction,
+            BiFunctionWithIOE<K, Long, CloseableIterator<InternalRow>> reader) {
         this.cache = cache;
-        this.serializer = serializer;
-        this.threadLocalRowSerializer =
-                ThreadLocal.withInitial(() -> new InternalRowSerializer(serializer.fieldTypes()));
+        this.projectedSerializer = projectedSerializer;
+        this.formatSerializer =
+                ThreadLocal.withInitial(() -> new InternalRowSerializer(formatSchema));
+        this.fileSizeFunction = fileSizeFunction;
         this.reader = reader;
     }
 
@@ -62,19 +68,37 @@ public class ObjectsCache<K, V> {
             Filter<InternalRow> loadFilter,
             Filter<InternalRow> readFilter)
             throws IOException {
-        InternalRowSerializer rowSerializer = threadLocalRowSerializer.get();
-        Segments segments =
-                cache.getSegments(key, k -> readSegments(k, fileSize, loadFilter, rowSerializer));
+        Segments segments = cache.getIfPresents(key);
+        if (segments != null) {
+            return readFromSegments(segments, readFilter);
+        } else {
+            if (fileSize == null) {
+                fileSize = fileSizeFunction.apply(key);
+            }
+            if (fileSize <= cache.maxElementSize()) {
+                segments = readSegments(key, fileSize, loadFilter);
+                cache.put(key, segments);
+                return readFromSegments(segments, readFilter);
+            } else {
+                return readFromIterator(
+                        reader.apply(key, fileSize), projectedSerializer, readFilter);
+            }
+        }
+    }
+
+    private List<V> readFromSegments(Segments segments, Filter<InternalRow> readFilter)
+            throws IOException {
+        InternalRowSerializer formatSerializer = this.formatSerializer.get();
         List<V> entries = new ArrayList<>();
         RandomAccessInputView view =
                 new RandomAccessInputView(
                         segments.segments(), cache.pageSize(), segments.limitInLastSegment());
-        BinaryRow binaryRow = new BinaryRow(rowSerializer.getArity());
+        BinaryRow binaryRow = new BinaryRow(formatSerializer.getArity());
         while (true) {
             try {
-                rowSerializer.mapFromPages(binaryRow, view);
+                formatSerializer.mapFromPages(binaryRow, view);
                 if (readFilter.test(binaryRow)) {
-                    entries.add(serializer.fromRow(binaryRow));
+                    entries.add(projectedSerializer.fromRow(binaryRow));
                 }
             } catch (EOFException e) {
                 return entries;
@@ -82,11 +106,8 @@ public class ObjectsCache<K, V> {
         }
     }
 
-    private Segments readSegments(
-            K key,
-            @Nullable Long fileSize,
-            Filter<InternalRow> loadFilter,
-            InternalRowSerializer rowSerializer) {
+    private Segments readSegments(K key, @Nullable Long fileSize, Filter<InternalRow> loadFilter) {
+        InternalRowSerializer formatSerializer = this.formatSerializer.get();
         try (CloseableIterator<InternalRow> iterator = reader.apply(key, fileSize)) {
             ArrayList<MemorySegment> segments = new ArrayList<>();
             MemorySegmentSource segmentSource =
@@ -96,7 +117,7 @@ public class ObjectsCache<K, V> {
             while (iterator.hasNext()) {
                 InternalRow row = iterator.next();
                 if (loadFilter.test(row)) {
-                    rowSerializer.serializeToPages(row, output);
+                    formatSerializer.serializeToPages(row, output);
                 }
             }
             return new Segments(segments, output.getCurrentPositionInSegment());
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
index a457a868a..41f18f448 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
@@ -25,7 +25,7 @@ import org.apache.paimon.format.FormatWriterFactory;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.PositionOutputStream;
-import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.types.RowType;
 
 import javax.annotation.Nullable;
 
@@ -36,7 +36,7 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
-import static org.apache.paimon.utils.FileUtils.createFormatReader;
+import static org.apache.paimon.utils.FileUtils.checkExists;
 
 /** A file which contains several {@link T}s, provides read and write. */
 public class ObjectsFile<T> {
@@ -48,16 +48,17 @@ public class ObjectsFile<T> {
     protected final String compression;
     protected final PathFactory pathFactory;
 
-    @Nullable private final ObjectsCache<String, T> cache;
+    @Nullable private final ObjectsCache<Path, T> cache;
 
     public ObjectsFile(
             FileIO fileIO,
             ObjectSerializer<T> serializer,
+            RowType formatType,
             FormatReaderFactory readerFactory,
             FormatWriterFactory writerFactory,
             String compression,
             PathFactory pathFactory,
-            @Nullable SegmentsCache<String> cache) {
+            @Nullable SegmentsCache<Path> cache) {
         this.fileIO = fileIO;
         this.serializer = serializer;
         this.readerFactory = readerFactory;
@@ -65,7 +66,14 @@ public class ObjectsFile<T> {
         this.compression = compression;
         this.pathFactory = pathFactory;
         this.cache =
-                cache == null ? null : new ObjectsCache<>(cache, serializer, this::createIterator);
+                cache == null
+                        ? null
+                        : new ObjectsCache<>(
+                                cache,
+                                serializer,
+                                formatType,
+                                this::fileSize,
+                                this::createIterator);
     }
 
     public FileIO fileIO() {
@@ -123,18 +131,12 @@ public class ObjectsFile<T> {
             Filter<InternalRow> loadFilter,
             Filter<InternalRow> readFilter)
             throws IOException {
+        Path path = pathFactory.toPath(fileName);
         if (cache != null) {
-            return cache.read(fileName, fileSize, loadFilter, readFilter);
+            return cache.read(path, fileSize, loadFilter, readFilter);
         }
 
-        RecordReader<InternalRow> reader =
-                createFormatReader(fileIO, readerFactory, pathFactory.toPath(fileName), fileSize);
-        if (readFilter != Filter.ALWAYS_TRUE) {
-            reader = reader.filter(readFilter);
-        }
-        List<T> result = new ArrayList<>();
-        reader.forEachRemaining(row -> result.add(serializer.fromRow(row)));
-        return result;
+        return readFromIterator(createIterator(path, fileSize), serializer, readFilter);
     }
 
     public String writeWithoutRolling(Collection<T> records) {
@@ -163,17 +165,40 @@ public class ObjectsFile<T> {
         }
     }
 
-    private CloseableIterator<InternalRow> createIterator(
-            String fileName, @Nullable Long fileSize) {
+    private CloseableIterator<InternalRow> createIterator(Path file, @Nullable Long fileSize)
+            throws IOException {
+        return FileUtils.createFormatReader(fileIO, readerFactory, file, fileSize)
+                .toCloseableIterator();
+    }
+
+    private long fileSize(Path file) throws IOException {
         try {
-            return createFormatReader(fileIO, readerFactory, pathFactory.toPath(fileName), fileSize)
-                    .toCloseableIterator();
+            return fileIO.getFileSize(file);
         } catch (IOException e) {
-            throw new RuntimeException(e);
+            checkExists(fileIO, file);
+            throw e;
         }
     }
 
     public void delete(String fileName) {
         fileIO.deleteQuietly(pathFactory.toPath(fileName));
     }
+
+    public static <V> List<V> readFromIterator(
+            CloseableIterator<InternalRow> inputIterator,
+            ObjectSerializer<V> serializer,
+            Filter<InternalRow> readFilter) {
+        try (CloseableIterator<InternalRow> iterator = inputIterator) {
+            List<V> result = new ArrayList<>();
+            while (iterator.hasNext()) {
+                InternalRow row = iterator.next();
+                if (readFilter.test(row)) {
+                    result.add(serializer.fromRow(row));
+                }
+            }
+            return result;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SegmentsCache.java b/paimon-core/src/main/java/org/apache/paimon/utils/SegmentsCache.java
index 87d5d8586..d5c2178c8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SegmentsCache.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SegmentsCache.java
@@ -23,9 +23,10 @@ import org.apache.paimon.options.MemorySize;
 
 import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
 import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
-import org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;
 
-import java.util.function.Function;
+import javax.annotation.Nullable;
+
+import static org.apache.paimon.CoreOptions.PAGE_SIZE;
 
 /** Cache {@link Segments}. */
 public class SegmentsCache<T> {
@@ -34,26 +35,59 @@ public class SegmentsCache<T> {
 
     private final int pageSize;
     private final Cache<T, Segments> cache;
+    private final MemorySize maxMemorySize;
+    private final long maxElementSize;
 
-    public SegmentsCache(int pageSize, MemorySize maxMemorySize) {
+    public SegmentsCache(int pageSize, MemorySize maxMemorySize, long maxElementSize) {
         this.pageSize = pageSize;
         this.cache =
                 Caffeine.newBuilder()
+                        .softValues()
                         .weigher(this::weigh)
                         .maximumWeight(maxMemorySize.getBytes())
-                        .executor(MoreExecutors.directExecutor())
+                        .executor(Runnable::run)
                         .build();
+        this.maxMemorySize = maxMemorySize;
+        this.maxElementSize = maxElementSize;
     }
 
     public int pageSize() {
         return pageSize;
     }
 
-    public Segments getSegments(T key, Function<T, Segments> viewFunction) {
-        return cache.get(key, viewFunction);
+    public MemorySize maxMemorySize() {
+        return maxMemorySize;
+    }
+
+    public long maxElementSize() {
+        return maxElementSize;
+    }
+
+    @Nullable
+    public Segments getIfPresents(T key) {
+        return cache.getIfPresent(key);
+    }
+
+    public void put(T key, Segments segments) {
+        cache.put(key, segments);
     }
 
     private int weigh(T cacheKey, Segments segments) {
         return OBJECT_MEMORY_SIZE + segments.segments().size() * pageSize;
     }
+
+    @Nullable
+    public static <T> SegmentsCache<T> create(MemorySize maxMemorySize, long maxElementSize) {
+        return create((int) PAGE_SIZE.defaultValue().getBytes(), maxMemorySize, maxElementSize);
+    }
+
+    @Nullable
+    public static <T> SegmentsCache<T> create(
+            int pageSize, MemorySize maxMemorySize, long maxElementSize) {
+        if (maxMemorySize.getBytes() == 0) {
+            return null;
+        }
+
+        return new SegmentsCache<>(pageSize, maxMemorySize, maxElementSize);
+    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
index 56fc238a8..d1f7eeb8a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
@@ -18,9 +18,18 @@
 
 package org.apache.paimon.catalog;
 
+import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.table.system.SystemTableLoader;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.FakeTicker;
@@ -31,6 +40,7 @@ import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.io.FileNotFoundException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -41,6 +51,10 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.paimon.data.BinaryString.fromString;
+import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_MAX_MEMORY;
+import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_MEMORY;
+import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_THRESHOLD;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -285,4 +299,71 @@ class CachingCatalogTest extends CatalogTestBase {
                 .map(type -> Identifier.fromString(tableIdent.getFullName() + "$" + type))
                 .toArray(Identifier[]::new);
     }
+
+    @Test
+    public void testManifestCache() throws Exception {
+        innerTestManifestCache(Long.MAX_VALUE);
+        assertThatThrownBy(() -> innerTestManifestCache(10))
+                .hasRootCauseInstanceOf(FileNotFoundException.class);
+    }
+
+    private void innerTestManifestCache(long manifestCacheThreshold) throws Exception {
+        Catalog catalog =
+                new CachingCatalog(
+                        this.catalog,
+                        Duration.ofSeconds(10),
+                        MemorySize.ofMebiBytes(1),
+                        manifestCacheThreshold);
+        Identifier tableIdent = new Identifier("db", "tbl");
+        catalog.dropTable(tableIdent, true);
+        catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
+
+        // write
+        Table table = catalog.getTable(tableIdent);
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        try (BatchTableWrite write = writeBuilder.newWrite();
+                BatchTableCommit commit = writeBuilder.newCommit()) {
+            write.write(GenericRow.of(1, fromString("1"), fromString("1")));
+            write.write(GenericRow.of(2, fromString("2"), fromString("2")));
+            commit.commit(write.prepareCommit());
+        }
+
+        // repeat read
+        for (int i = 0; i < 5; i++) {
+            table = catalog.getTable(tableIdent);
+            ReadBuilder readBuilder = table.newReadBuilder();
+            TableScan scan = readBuilder.newScan();
+            TableRead read = readBuilder.newRead();
+            read.createReader(scan.plan()).forEachRemaining(r -> {});
+
+            // delete manifest to validate cache
+            if (i == 0) {
+                Path manifestPath = new Path(table.options().get("path"), "manifest");
+                assertThat(fileIO.exists(manifestPath)).isTrue();
+                fileIO.deleteDirectoryQuietly(manifestPath);
+            }
+        }
+    }
+
+    @Test
+    public void testManifestCacheOptions() {
+        Options options = new Options();
+
+        CachingCatalog caching = (CachingCatalog) CachingCatalog.tryToCreate(catalog, options);
+        assertThat(caching.manifestCache.maxMemorySize())
+                .isEqualTo(CACHE_MANIFEST_SMALL_FILE_MEMORY.defaultValue());
+        assertThat(caching.manifestCache.maxElementSize())
+                .isEqualTo(CACHE_MANIFEST_SMALL_FILE_THRESHOLD.defaultValue().getBytes());
+
+        options.set(CACHE_MANIFEST_SMALL_FILE_MEMORY, MemorySize.ofMebiBytes(100));
+        options.set(CACHE_MANIFEST_SMALL_FILE_THRESHOLD, MemorySize.ofBytes(100));
+        caching = (CachingCatalog) CachingCatalog.tryToCreate(catalog, options);
+        assertThat(caching.manifestCache.maxMemorySize()).isEqualTo(MemorySize.ofMebiBytes(100));
+        assertThat(caching.manifestCache.maxElementSize()).isEqualTo(100);
+
+        options.set(CACHE_MANIFEST_MAX_MEMORY, MemorySize.ofMebiBytes(256));
+        caching = (CachingCatalog) CachingCatalog.tryToCreate(catalog, options);
+        assertThat(caching.manifestCache.maxMemorySize()).isEqualTo(MemorySize.ofMebiBytes(256));
+        assertThat(caching.manifestCache.maxElementSize()).isEqualTo(Long.MAX_VALUE);
+    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java b/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
index c393d3aff..159f5edae 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.catalog;
 
+import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.table.Table;
 
 import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
@@ -35,7 +36,7 @@ public class TestableCachingCatalog extends CachingCatalog {
     private final Duration cacheExpirationInterval;
 
     public TestableCachingCatalog(Catalog catalog, Duration expirationInterval, Ticker ticker) {
-        super(catalog, expirationInterval, ticker);
+        super(catalog, expirationInterval, MemorySize.ZERO, Long.MAX_VALUE, ticker);
         this.cacheExpirationInterval = expirationInterval;
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java
index 58b259823..78d9ba17c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java
@@ -47,7 +47,8 @@ public class IndexManifestFileHandlerTest {
                                 fileStore.fileIO(),
                                 fileStore.options().manifestFormat(),
                                 "zstd",
-                                fileStore.pathFactory())
+                                fileStore.pathFactory(),
+                                null)
                         .create();
         IndexManifestFileHandler indexManifestFileHandler =
                 new IndexManifestFileHandler(indexManifestFile, BucketMode.BUCKET_UNAWARE);
@@ -81,7 +82,8 @@ public class IndexManifestFileHandlerTest {
                                 fileStore.fileIO(),
                                 fileStore.options().manifestFormat(),
                                 "zstd",
-                                fileStore.pathFactory())
+                                fileStore.pathFactory(),
+                                null)
                         .create();
         IndexManifestFileHandler indexManifestFileHandler =
                 new IndexManifestFileHandler(indexManifestFile, BucketMode.HASH_FIXED);
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java
index be9d2a48c..8a4f0b061 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java
@@ -44,8 +44,10 @@ public class ObjectsCacheTest {
         Map<String, List<String>> map = new HashMap<>();
         ObjectsCache<String, String> cache =
                 new ObjectsCache<>(
-                        new SegmentsCache<>(1024, MemorySize.ofKibiBytes(5)),
+                        new SegmentsCache<>(1024, MemorySize.ofKibiBytes(5), Long.MAX_VALUE),
                         new StringSerializer(),
+                        RowType.of(DataTypes.STRING()),
+                        k -> 1L,
                         (k, size) ->
                                 CloseableIterator.adapterForIterator(
                                         map.get(k).stream()

Reply via email to