This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ce38b2ccd7 [java][python] Add block-level local disk cache for file 
reads (#7699)
ce38b2ccd7 is described below

commit ce38b2ccd7cbaabcfc1d5e90f68586ebecc66265
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu May 14 21:30:19 2026 +0800

    [java][python] Add block-level local disk cache for file reads (#7699)
    
    Introduce a CachingFileIO wrapper that transparently caches remote file
    reads at block granularity on local disk or in memory. Files are
    classified by FileType and only whitelisted types are cached (META and
    GLOBAL_INDEX by default); all other types are read directly.
---
 docs/content/program-api/file-cache.md             | 130 ++++++
 docs/content/pypaimon/global-index.md              | 136 ++++++
 .../generated/catalog_configuration.html           |  30 ++
 .../org/apache/paimon/options/CatalogOptions.java  |  38 ++
 .../org/apache/paimon/fs/cache/CachingFileIO.java  | 173 ++++++++
 .../fs/cache/CachingSeekableInputStream.java       | 160 +++++++
 .../apache/paimon/fs/cache/LocalCacheManager.java  |  37 ++
 .../paimon/fs/cache/LocalDiskCacheManager.java     | 237 +++++++++++
 .../paimon/fs/cache/LocalMemoryCacheManager.java   | 118 ++++++
 .../java/org/apache/paimon/utils/FileType.java     |  48 +++
 .../apache/paimon/fs/cache/CachingFileIOTest.java  | 469 +++++++++++++++++++++
 .../paimon/fs/cache/LocalDiskCacheManagerTest.java | 171 ++++++++
 .../org/apache/paimon/catalog/AbstractCatalog.java |   7 +-
 .../java/org/apache/paimon/rest/RESTCatalog.java   |  13 +-
 .../pypaimon/catalog/filesystem_catalog.py         |   8 +-
 .../pypaimon/catalog/rest/rest_catalog.py          |   6 +-
 .../pypaimon/common/options/core_options.py        |  59 +++
 .../pypaimon/filesystem/caching_file_io.py         | 389 +++++++++++++++++
 .../pypaimon/tests/caching_file_io_test.py         | 421 ++++++++++++++++++
 paimon-python/pypaimon/tests/file_type_test.py     | 109 +++++
 paimon-python/pypaimon/utils/file_type.py          | 120 ++++++
 21 files changed, 2873 insertions(+), 6 deletions(-)

diff --git a/docs/content/program-api/file-cache.md 
b/docs/content/program-api/file-cache.md
new file mode 100644
index 0000000000..cfad2c00c7
--- /dev/null
+++ b/docs/content/program-api/file-cache.md
@@ -0,0 +1,130 @@
+---
+title: "Local Cache"
+weight: 8
+type: docs
+aliases:
+- /program-api/file-cache.html
+- /pypaimon/file-cache.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Local Cache
+
+When reading files from remote storage (S3, OSS, HDFS, etc.), each seek+read 
goes over the network. Paimon provides a block-level local cache that 
transparently caches file reads, significantly reducing remote I/O for repeated 
access patterns.
+
+The cache supports two modes:
+- **Disk cache**: when `local-cache.dir` is configured, blocks are cached on 
local disk.
+- **Memory cache**: when `local-cache.dir` is not configured, blocks are 
cached in memory.
+
+## Cached File Types
+
+The cache classifies files by type. By default, only `meta` and `global-index` 
types are cached. You can customize this via the `local-cache.whitelist` option.
+
+| File Type | Config Name | Examples | Default Cached |
+|-----------|-------------|----------|----------------|
+| META | meta | snapshot, schema, manifest, statistics, tag | Yes |
+| GLOBAL_INDEX | global-index | BTree, Lumina, Tantivy index files | Yes |
+| BUCKET_INDEX | bucket-index | Hash, deletion vector index files | No |
+| DATA | data | Data files (ORC, Parquet, etc.) | No |
+| FILE_INDEX | file-index | Data-file level bloom filter, bitmap | No |
+
+All file types can be added to the whitelist. The default whitelist is 
`meta,global-index`.
+
+## Enable Cache
+
+This is a catalog-level option. Configure it when creating the catalog:
+
+{{< tabs "enable-cache" >}}
+
+{{< tab "Java" >}}
+
+```java
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.options.Options;
+
+Options options = new Options();
+options.set("warehouse", "s3://my-bucket/warehouse");
+options.set("local-cache.enabled", "true");
+// optional: use disk cache by specifying a directory
+options.set("local-cache.dir", "/tmp/paimon-cache");
+// optional: customize limits
+options.set("local-cache.max-size", "2gb");
+options.set("local-cache.block-size", "1mb");
+
+CatalogContext context = CatalogContext.create(options);
+Catalog catalog = CatalogFactory.createCatalog(context);
+
+// All tables from this catalog will use the cache
+Table table = catalog.getTable(Identifier.create("my_db", "my_table"));
+```
+
+{{< /tab >}}
+
+{{< tab "Python" >}}
+
+```python
+import pypaimon
+
+options = {
+    "warehouse": "s3://my-bucket/warehouse",
+    "local-cache.enabled": "true",
+    # optional: use disk cache by specifying a directory
+    "local-cache.dir": "/tmp/paimon-cache",
+    # optional: customize limits
+    "local-cache.max-size": "2gb",
+    "local-cache.block-size": "1mb",
+}
+
+catalog = pypaimon.create_catalog(options)
+
+# All tables from this catalog will use the cache
+table = catalog.get_table("db.my_table")
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
+## Cache Options
+
+| Option | Type | Default | Description |
+|--------|------|---------|-------------|
+| `local-cache.enabled` | Boolean | false | Whether to enable local block cache for file reads. |
+| `local-cache.dir` | String | (none) | Directory for storing cached blocks on disk. If not configured, memory cache is used. |
+| `local-cache.max-size` | MemorySize | unlimited | Maximum total size of the cache. When exceeded, the least recently used blocks are evicted. |
+| `local-cache.block-size` | MemorySize | 1 mb | Block size for caching. Files are logically divided into fixed-size blocks and cached independently. |
+| `local-cache.whitelist` | String | meta,global-index | Comma-separated list of file types to cache. Supported values: `meta`, `global-index`, `bucket-index`, `data`, `file-index`. |
+
+## How It Works
+
+- Files are logically divided into fixed-size blocks (default 1 MB).
+- On the first read, blocks are downloaded from remote storage and cached 
locally (on disk or in memory).
+- Subsequent reads of the same block are served from the local cache, skipping 
remote I/O.
+- When using disk cache, cache files are keyed by remote file path and block 
offset, so they persist across process restarts and can be reused.
+- When the cache exceeds `max-size`, the least recently used blocks are 
evicted automatically.
+
+## Cache Lifecycle
+
+The cache is created and managed by the Catalog. All tables obtained from the 
same catalog share a single cache instance. The cache lives as long as the 
Catalog object is reachable — no explicit close is needed.
+
+In distributed computing frameworks (Flink, Spark), the `FileIO` is serialized 
and shipped to task managers. After deserialization, the cache is **not** 
recreated — reads fall through directly to the remote storage. This is by 
design: the cache lifecycle is bound to the Catalog that created it, and a 
deserialized `FileIO` is no longer managed by any Catalog.
+
+If you need caching on task managers, create a new Catalog with cache options 
enabled on each worker node.
diff --git a/docs/content/pypaimon/global-index.md 
b/docs/content/pypaimon/global-index.md
new file mode 100644
index 0000000000..f5e4d602e3
--- /dev/null
+++ b/docs/content/pypaimon/global-index.md
@@ -0,0 +1,136 @@
+---
+title: "Global Index"
+weight: 6
+type: docs
+aliases:
+- /pypaimon/global-index.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Global Index
+
+PyPaimon supports querying global indexes built on Data Evolution (append) 
tables. Three index types are available:
+
+- **BTree Index**: B-tree based index for scalar column lookups. Supports 
equality, IN, range, and combined predicates.
+- **Vector Index (Lumina)**: Approximate nearest neighbor (ANN) index for 
vector similarity search.
+- **Full-Text Index (Tantivy)**: Full-text search index for text retrieval 
with relevance scoring.
+
+> Global indexes must be built beforehand (e.g., via Spark or Flink). See 
[Global Index]({{< ref "append-table/global-index" >}}) for how to create 
indexes.
+
+## BTree Index
+
+BTree index is automatically used during scan when a filter predicate matches 
the indexed column. No special API is needed — just set a filter on the read 
builder.
+
+```python
+import pypaimon
+
+catalog = pypaimon.create_catalog(...)
+table = catalog.get_table("db.my_table")
+
+# BTree index is used automatically when filtering on indexed columns
+read_builder = table.new_read_builder()
+read_builder = read_builder.with_filter(
+    pypaimon.PredicateBuilder(table.fields)
+    .in_("name", ["a200", "a300"])
+)
+
+scan = read_builder.new_scan()
+read = read_builder.new_read()
+splits = scan.plan().splits
+data = read.to_arrow(splits)
+```
+
+Supported predicates: `equal`, `not_equal`, `less_than`, `less_or_equal`, 
`greater_than`, `greater_or_equal`, `in_`, `not_in`, `between`, `is_null`, 
`is_not_null`.
+
+## Vector Index (Lumina)
+
+Use `VectorSearchBuilder` to perform approximate nearest neighbor search on a 
vector column, then read the matched rows.
+
+```python
+table = catalog.get_table("db.my_table")
+
+# Step 1: vector search to get matching row IDs
+builder = table.new_vector_search_builder()
+index_result = (
+    builder
+    .with_vector_column("embedding")
+    .with_query_vector([1.0, 2.0, 3.0, ...])
+    .with_limit(10)
+    .execute_local()
+)
+
+# Step 2: read actual data for matched rows
+read_builder = table.new_read_builder()
+scan = read_builder.new_scan()
+scan.with_global_index_result(index_result)
+read = read_builder.new_read()
+data = read.to_arrow(scan.plan().splits)
+```
+
+You can also add a scalar filter to pre-filter rows before vector search:
+
+```python
+predicate = (
+    pypaimon.PredicateBuilder(table.fields)
+    .equal("category", "electronics")
+)
+
+index_result = (
+    table.new_vector_search_builder()
+    .with_vector_column("embedding")
+    .with_query_vector([1.0, 2.0, 3.0, ...])
+    .with_limit(10)
+    .with_filter(predicate)
+    .execute_local()
+)
+
+read_builder = table.new_read_builder()
+scan = read_builder.new_scan()
+scan.with_global_index_result(index_result)
+read = read_builder.new_read()
+data = read.to_arrow(scan.plan().splits)
+```
+
+## Full-Text Index (Tantivy)
+
+Use `FullTextSearchBuilder` to perform full-text search on a text column, then 
read the matched rows.
+
+```python
+table = catalog.get_table("db.my_table")
+
+# Step 1: full-text search to get matching row IDs
+builder = table.new_full_text_search_builder()
+index_result = (
+    builder
+    .with_text_column("content")
+    .with_query_text("search keywords")
+    .with_limit(20)
+    .execute_local()
+)
+
+# Step 2: read actual data for matched rows
+read_builder = table.new_read_builder()
+scan = read_builder.new_scan()
+scan.with_global_index_result(index_result)
+read = read_builder.new_read()
+data = read.to_arrow(scan.plan().splits)
+```
+
+For better performance when reading from remote storage, consider enabling the 
[Local Cache]({{< ref "program-api/file-cache" >}}).
diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html 
b/docs/layouts/shortcodes/generated/catalog_configuration.html
index ae0f476c1e..6cafa6ccdf 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -146,6 +146,36 @@ under the License.
             <td>Boolean</td>
             <td>Sync all table properties to the catalog metastore (e.g. Hive 
metastore, JDBC catalog store)</td>
         </tr>
+        <tr>
+            <td><h5>local-cache.block-size</h5></td>
+            <td style="word-wrap: break-word;">1 mb</td>
+            <td>MemorySize</td>
+            <td>Block size for local cache.</td>
+        </tr>
+        <tr>
+            <td><h5>local-cache.dir</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Directory for local block cache on disk. If not configured, 
memory cache is used instead.</td>
+        </tr>
+        <tr>
+            <td><h5>local-cache.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to enable local block cache for file reads. If 
local-cache.dir is configured, disk cache is used; otherwise memory cache is 
used.</td>
+        </tr>
+        <tr>
+            <td><h5>local-cache.max-size</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>MemorySize</td>
+            <td>Maximum total size of the local block cache. Unlimited by 
default.</td>
+        </tr>
+        <tr>
+            <td><h5>local-cache.whitelist</h5></td>
+            <td style="word-wrap: break-word;">"meta,global-index"</td>
+            <td>String</td>
+            <td>Comma-separated list of file types to cache. Supported values: 
meta, global-index, bucket-index, data, file-index.</td>
+        </tr>
         <tr>
             <td><h5>table.type</h5></td>
             <td style="word-wrap: break-word;">managed</td>
diff --git 
a/paimon-api/src/main/java/org/apache/paimon/options/CatalogOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/options/CatalogOptions.java
index b6f31dd550..f900603897 100644
--- a/paimon-api/src/main/java/org/apache/paimon/options/CatalogOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/options/CatalogOptions.java
@@ -23,6 +23,7 @@ import org.apache.paimon.table.CatalogTableType;
 import java.time.Duration;
 
 import static org.apache.paimon.options.ConfigOptions.key;
+import static org.apache.paimon.options.MemorySize.ofMebiBytes;
 
 /** Options for catalog. */
 public class CatalogOptions {
@@ -185,4 +186,41 @@ public class CatalogOptions {
                             "Whether to allow static cache in file io 
implementation. If not allowed, this means that "
                                     + "there may be a large number of FileIO 
instances generated, enabling caching can "
                                     + "lead to resource leakage.");
+
+    public static final ConfigOption<Boolean> LOCAL_CACHE_ENABLED =
+            key("local-cache.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to enable local block cache for file 
reads. "
+                                    + "If local-cache.dir is configured, disk 
cache is used; otherwise memory cache is used.");
+
+    public static final ConfigOption<String> LOCAL_CACHE_DIR =
+            key("local-cache.dir")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Directory for local block cache on disk. "
+                                    + "If not configured, memory cache is used 
instead.");
+
+    public static final ConfigOption<MemorySize> LOCAL_CACHE_MAX_SIZE =
+            key("local-cache.max-size")
+                    .memoryType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Maximum total size of the local block cache. 
Unlimited by default.");
+
+    public static final ConfigOption<MemorySize> LOCAL_CACHE_BLOCK_SIZE =
+            key("local-cache.block-size")
+                    .memoryType()
+                    .defaultValue(ofMebiBytes(1))
+                    .withDescription("Block size for local cache.");
+
+    public static final ConfigOption<String> LOCAL_CACHE_WHITELIST =
+            key("local-cache.whitelist")
+                    .stringType()
+                    .defaultValue("meta,global-index")
+                    .withDescription(
+                            "Comma-separated list of file types to cache. "
+                                    + "Supported values: meta, global-index, 
bucket-index, data, file-index.");
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fs/cache/CachingFileIO.java 
b/paimon-common/src/main/java/org/apache/paimon/fs/cache/CachingFileIO.java
new file mode 100644
index 0000000000..e41d0f04cd
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/cache/CachingFileIO.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs.cache;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.utils.FileType;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A {@link FileIO} wrapper that caches reads at block granularity.
+ *
+ * <p>Only file types in the whitelist are cached. Others are read directly 
from the delegate.
+ *
+ * <p>After deserialization, the cache is null and reads fall through to the 
delegate directly.
+ */
+public class CachingFileIO implements FileIO {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FileIO delegate;
+    private final Set<FileType> whitelist;
+
+    private transient volatile LocalCacheManager cache;
+
+    public CachingFileIO(FileIO delegate, LocalCacheManager cache, 
Set<FileType> whitelist) {
+        this.delegate = delegate;
+        this.cache = cache;
+        this.whitelist = EnumSet.copyOf(whitelist);
+    }
+
+    /**
+     * Wraps the given {@link FileIO} with caching if local cache is enabled 
in the catalog context.
+     *
+     * @param fileIO the FileIO to potentially wrap
+     * @param context the catalog context containing cache configuration
+     * @param cache the cache manager instance (managed by the Catalog)
+     * @return a CachingFileIO if caching is enabled and configured, otherwise 
the original FileIO
+     */
+    public static FileIO wrapWithCachingIfNeeded(
+            FileIO fileIO, CatalogContext context, @Nullable LocalCacheManager 
cache) {
+        if (fileIO instanceof CachingFileIO) {
+            return fileIO;
+        }
+        if (cache == null) {
+            return fileIO;
+        }
+        Options options = context.options();
+        Set<FileType> whitelist =
+                
FileType.parseWhitelist(options.get(CatalogOptions.LOCAL_CACHE_WHITELIST));
+        if (whitelist.isEmpty()) {
+            return fileIO;
+        }
+        return new CachingFileIO(fileIO, cache, whitelist);
+    }
+
+    /**
+     * Creates a {@link LocalCacheManager} from the catalog context options, 
or returns null if
+     * caching is not enabled.
+     */
+    @Nullable
+    public static LocalCacheManager createCacheManager(CatalogContext context) 
{
+        Options options = context.options();
+        if (!options.get(CatalogOptions.LOCAL_CACHE_ENABLED)) {
+            return null;
+        }
+
+        MemorySize maxSizeOpt = 
options.get(CatalogOptions.LOCAL_CACHE_MAX_SIZE);
+        long maxSize = maxSizeOpt == null ? Long.MAX_VALUE : 
maxSizeOpt.getBytes();
+        int blockSize = (int) 
options.get(CatalogOptions.LOCAL_CACHE_BLOCK_SIZE).getBytes();
+
+        String cacheDir = options.get(CatalogOptions.LOCAL_CACHE_DIR);
+        if (cacheDir != null) {
+            return new LocalDiskCacheManager(cacheDir, maxSize, blockSize);
+        } else {
+            return new LocalMemoryCacheManager(maxSize, blockSize);
+        }
+    }
+
+    @Override
+    public SeekableInputStream newInputStream(Path path) throws IOException {
+        LocalCacheManager c = cache;
+        FileType fileType = FileType.classify(path);
+        if (c == null || !whitelist.contains(fileType) || 
FileType.isMutable(path)) {
+            return delegate.newInputStream(path);
+        }
+        return new CachingSeekableInputStream(delegate, path, c);
+    }
+
+    @Override
+    public PositionOutputStream newOutputStream(Path path, boolean overwrite) 
throws IOException {
+        return delegate.newOutputStream(path, overwrite);
+    }
+
+    @Override
+    public FileStatus getFileStatus(Path path) throws IOException {
+        return delegate.getFileStatus(path);
+    }
+
+    @Override
+    public FileStatus[] listStatus(Path path) throws IOException {
+        return delegate.listStatus(path);
+    }
+
+    @Override
+    public boolean exists(Path path) throws IOException {
+        return delegate.exists(path);
+    }
+
+    @Override
+    public boolean delete(Path path, boolean recursive) throws IOException {
+        return delegate.delete(path, recursive);
+    }
+
+    @Override
+    public boolean mkdirs(Path path) throws IOException {
+        return delegate.mkdirs(path);
+    }
+
+    @Override
+    public boolean rename(Path src, Path dst) throws IOException {
+        return delegate.rename(src, dst);
+    }
+
+    @Override
+    public boolean isObjectStore() {
+        return delegate.isObjectStore();
+    }
+
+    @Override
+    public void configure(CatalogContext context) {
+        delegate.configure(context);
+    }
+
+    @Override
+    public void setRuntimeContext(Map<String, String> options) {
+        delegate.setRuntimeContext(options);
+    }
+
+    @Override
+    public void close() throws IOException {
+        delegate.close();
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fs/cache/CachingSeekableInputStream.java
 
b/paimon-common/src/main/java/org/apache/paimon/fs/cache/CachingSeekableInputStream.java
new file mode 100644
index 0000000000..b81e98b572
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fs/cache/CachingSeekableInputStream.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs.cache;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.SeekableInputStream;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/** A {@link SeekableInputStream} that caches reads at block granularity in a local cache. */
+public class CachingSeekableInputStream extends SeekableInputStream {
+
+    private final FileIO fileIO;
+    private final Path path;
+    private final LocalCacheManager cache;
+    private long pos;
+    private long fileSize = -1;
+    @Nullable private SeekableInputStream remoteStream;
+
+    public CachingSeekableInputStream(FileIO fileIO, Path path, 
LocalCacheManager cache) {
+        this.fileIO = fileIO;
+        this.path = path;
+        this.cache = cache;
+        this.pos = 0;
+    }
+
+    private long fileSize() throws IOException {
+        if (fileSize == -1) {
+            String pathStr = path.toString();
+            long cached = cache.getFileSize(pathStr);
+            if (cached >= 0) {
+                fileSize = cached;
+            } else {
+                fileSize = fileIO.getFileStatus(path).getLen();
+                cache.putFileSize(pathStr, fileSize);
+            }
+        }
+        return fileSize;
+    }
+
+    @Override
+    public void seek(long desired) throws IOException {
+        this.pos = Math.max(0, desired);
+    }
+
+    @Override
+    public long getPos() throws IOException {
+        return pos;
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (pos >= fileSize()) {
+            return -1;
+        }
+        int blockSize = cache.blockSize();
+        int blockIndex = (int) (pos / blockSize);
+        byte[] blockData = readBlock(blockIndex);
+        int offsetInBlock = (int) (pos - (long) blockIndex * blockSize);
+        pos++;
+        return blockData[offsetInBlock] & 0xFF;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (len == 0) {
+            return 0;
+        }
+        if (pos >= fileSize()) {
+            return -1;
+        }
+
+        int blockSize = cache.blockSize();
+        long end = Math.min(pos + len, fileSize());
+        int totalRead = 0;
+
+        while (pos < end) {
+            int blockIndex = (int) (pos / blockSize);
+            byte[] blockData = readBlock(blockIndex);
+
+            long blockStart = (long) blockIndex * blockSize;
+            int startInBlock = (int) (pos - blockStart);
+            int endInBlock = (int) Math.min(end - blockStart, 
blockData.length);
+            int bytesToCopy = endInBlock - startInBlock;
+
+            System.arraycopy(blockData, startInBlock, b, off + totalRead, 
bytesToCopy);
+            totalRead += bytesToCopy;
+            pos += bytesToCopy;
+        }
+
+        return totalRead;
+    }
+
+    private byte[] readBlock(int blockIndex) throws IOException {
+        byte[] cached = cache.getBlock(path.toString(), blockIndex);
+        if (cached != null) {
+            return cached;
+        }
+
+        int blockSize = cache.blockSize();
+        long offset = (long) blockIndex * blockSize;
+        int readSize = (int) Math.min(blockSize, fileSize() - offset);
+
+        SeekableInputStream stream = getRemoteStream();
+        stream.seek(offset);
+        byte[] data = readFully(stream, readSize);
+
+        cache.putBlock(path.toString(), blockIndex, data);
+        return data;
+    }
+
+    private SeekableInputStream getRemoteStream() throws IOException {
+        if (remoteStream == null) {
+            remoteStream = fileIO.newInputStream(path);
+        }
+        return remoteStream;
+    }
+
+    private static byte[] readFully(SeekableInputStream in, int size) throws 
IOException {
+        byte[] buf = new byte[size];
+        int remaining = size;
+        int off = 0;
+        while (remaining > 0) {
+            int n = in.read(buf, off, remaining);
+            if (n < 0) {
+                break;
+            }
+            off += n;
+            remaining -= n;
+        }
+        return buf;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (remoteStream != null) {
+            remoteStream.close();
+            remoteStream = null;
+        }
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fs/cache/LocalCacheManager.java 
b/paimon-common/src/main/java/org/apache/paimon/fs/cache/LocalCacheManager.java
new file mode 100644
index 0000000000..4929bf5cd8
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fs/cache/LocalCacheManager.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs.cache;
+
+import javax.annotation.Nullable;
+
+/** Block-level local cache manager interface for file reads. */
+public interface LocalCacheManager {
+
+    int blockSize();
+
+    @Nullable
+    byte[] getBlock(String filePath, int blockIndex);
+
+    void putBlock(String filePath, int blockIndex, byte[] data);
+
+    /** Returns cached file size, or -1 if not cached. */
+    long getFileSize(String filePath);
+
+    void putFileSize(String filePath, long size);
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fs/cache/LocalDiskCacheManager.java
 
b/paimon-common/src/main/java/org/apache/paimon/fs/cache/LocalDiskCacheManager.java
new file mode 100644
index 0000000000..6dfc710e3b
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fs/cache/LocalDiskCacheManager.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs.cache;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Block-level local disk cache with LRU eviction. Thread-safe.
+ *
+ * <p>Every block is stored as one file below the cache directory. The file name is the SHA-256
+ * hex digest of {@code filePath + ":" + blockIndex}; files are sharded into sub-directories named
+ * after the first two hex characters. An in-memory, access-ordered index (cache file path to
+ * size) tracks the blocks and drives eviction once the indexed total exceeds {@code
+ * maxSizeBytes}. File sizes are kept in a separate in-memory map and are not written to disk.
+ */
+public class LocalDiskCacheManager implements LocalCacheManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LocalDiskCacheManager.class);
+
+    private final File cacheDir;
+    private final long maxSizeBytes;
+    private final int blockSize;
+    private final Object lock = new Object();
+    private final ConcurrentHashMap<String, Long> fileSizeCache = new ConcurrentHashMap<>();
+
+    // LRU-ordered index: key -> size. Access order so get() moves entry to tail.
+    // Guarded by lock (except while the constructor populates it).
+    private final LinkedHashMap<String, Long> entryIndex;
+    // Sum of the sizes stored in entryIndex. Guarded by lock.
+    private long currentSize;
+
+    /**
+     * Creates the cache directory if necessary and indexes the block files already present in it.
+     *
+     * @param cacheDir directory holding the block files
+     * @param maxSizeBytes size limit that triggers eviction; {@code Long.MAX_VALUE} disables it
+     * @param blockSize value reported by {@link #blockSize()}
+     */
+    public LocalDiskCacheManager(String cacheDir, long maxSizeBytes, int blockSize) {
+        this.cacheDir = new File(cacheDir);
+        this.maxSizeBytes = maxSizeBytes;
+        this.blockSize = blockSize;
+        this.entryIndex = new LinkedHashMap<>(64, 0.75f, true);
+        this.cacheDir.mkdirs();
+        this.currentSize = scanAndPopulateIndex();
+    }
+
+    @Override
+    public int blockSize() {
+        return blockSize;
+    }
+
+    /**
+     * Reads a cached block from disk.
+     *
+     * @return the block content, or {@code null} if the block is not indexed or its file cannot
+     *     be read (in which case the entry is dropped from the index)
+     */
+    @Override
+    public byte[] getBlock(String filePath, int blockIndex) {
+        File path = cachePath(filePath, blockIndex);
+        String cacheKey = path.getPath();
+        synchronized (lock) {
+            // get() rather than containsKey(): the lookup also refreshes the LRU position.
+            if (entryIndex.get(cacheKey) == null) {
+                return null;
+            }
+        }
+        try {
+            return Files.readAllBytes(path.toPath());
+        } catch (IOException e) {
+            LOG.debug("Failed to read cache block: {}", path, e);
+            synchronized (lock) {
+                Long size = entryIndex.remove(cacheKey);
+                if (size != null) {
+                    currentSize -= size;
+                }
+            }
+            return null;
+        }
+    }
+
+    /**
+     * Stores a block unless it is already indexed. Failures to write the block file are logged at
+     * debug level (or silently ignored when the rename fails); they never propagate to the caller.
+     */
+    @Override
+    public void putBlock(String filePath, int blockIndex, byte[] data) {
+        File path = cachePath(filePath, blockIndex);
+        String cacheKey = path.getPath();
+
+        synchronized (lock) {
+            if (entryIndex.containsKey(cacheKey)) {
+                return;
+            }
+        }
+
+        File subDir = path.getParentFile();
+        subDir.mkdirs();
+
+        // Write to a per-thread temp file and rename it, so the block only shows up under its
+        // final name once it has been written completely.
+        File tmpFile =
+                new File(
+                        path.getParent(),
+                        path.getName() + ".tmp." + Thread.currentThread().getId());
+        try {
+            try (FileOutputStream fos = new FileOutputStream(tmpFile)) {
+                fos.write(data);
+            }
+            if (!tmpFile.renameTo(path)) {
+                tmpFile.delete();
+                return;
+            }
+        } catch (IOException e) {
+            tmpFile.delete();
+            LOG.debug("Failed to write cache block: {}", path, e);
+            return;
+        }
+
+        boolean needEvict;
+        synchronized (lock) {
+            // Another thread may have stored the same block since the check above; recordEntry
+            // replaces its size instead of counting the block twice.
+            recordEntry(cacheKey, data.length);
+            needEvict = maxSizeBytes < Long.MAX_VALUE && currentSize > maxSizeBytes;
+        }
+        if (needEvict) {
+            evict();
+        }
+    }
+
+    /**
+     * Inserts or replaces an index entry and keeps {@code currentSize} equal to the sum of the
+     * indexed sizes. The caller must hold {@code lock}.
+     */
+    private void recordEntry(String cacheKey, long size) {
+        Long previous = entryIndex.put(cacheKey, size);
+        currentSize += size - (previous == null ? 0L : previous);
+    }
+
+    /**
+     * Removes least-recently-used entries from the index until the total fits the limit, then
+     * deletes their files outside the lock.
+     */
+    private void evict() {
+        List<Map.Entry<String, Long>> toDelete = new ArrayList<>();
+        synchronized (lock) {
+            if (currentSize <= maxSizeBytes) {
+                return;
+            }
+            Iterator<Map.Entry<String, Long>> it = entryIndex.entrySet().iterator();
+            while (it.hasNext() && currentSize > maxSizeBytes) {
+                Map.Entry<String, Long> entry = it.next();
+                toDelete.add(entry);
+                currentSize -= entry.getValue();
+                it.remove();
+            }
+        }
+        for (Map.Entry<String, Long> entry : toDelete) {
+            File file = new File(entry.getKey());
+            // Re-index only when the file is really still on disk. delete() also returns false
+            // for a file that is already gone, and such an entry must not come back.
+            if (!file.delete() && file.exists()) {
+                synchronized (lock) {
+                    recordEntry(entry.getKey(), entry.getValue());
+                }
+            }
+        }
+    }
+
+    /**
+     * Walks the cache directory and indexes every file whose name does not contain ".tmp.".
+     *
+     * @return the total size of the indexed files
+     */
+    private long scanAndPopulateIndex() {
+        try {
+            Path root = cacheDir.toPath();
+            if (Files.exists(root)) {
+                Files.walkFileTree(
+                        root,
+                        new SimpleFileVisitor<Path>() {
+                            @Override
+                            public FileVisitResult visitFile(
+                                    Path file, BasicFileAttributes attrs) {
+                                String name = file.getFileName().toString();
+                                if (!name.contains(".tmp.")) {
+                                    entryIndex.put(file.toFile().getPath(), attrs.size());
+                                }
+                                return FileVisitResult.CONTINUE;
+                            }
+                        });
+            }
+        } catch (IOException e) {
+            LOG.warn("Failed to scan cache directory", e);
+        }
+        // Sum after the try block: even if the walk aborted half-way, the returned total has to
+        // match the entries that were indexed before the failure.
+        long total = 0;
+        for (Long size : entryIndex.values()) {
+            total += size;
+        }
+        return total;
+    }
+
+    /** Maps (filePath, blockIndex) to its cache file: {@code cacheDir/<first 2 hex>/<hex>}. */
+    private File cachePath(String filePath, int blockIndex) {
+        String key = filePath + ":" + blockIndex;
+        String hex = sha256Hex(key);
+        String prefix = hex.substring(0, 2);
+        return new File(new File(cacheDir, prefix), hex);
+    }
+
+    // One MessageDigest per thread; sha256Hex resets it before every use.
+    private static final ThreadLocal<MessageDigest> SHA256_DIGEST =
+            ThreadLocal.withInitial(
+                    () -> {
+                        try {
+                            return MessageDigest.getInstance("SHA-256");
+                        } catch (NoSuchAlgorithmException e) {
+                            throw new RuntimeException("SHA-256 not available", e);
+                        }
+                    });
+
+    private static final char[] HEX_CHARS = "0123456789abcdef".toCharArray();
+
+    /** Returns the lower-case hex SHA-256 digest of the UTF-8 bytes of {@code input}. */
+    private static String sha256Hex(String input) {
+        MessageDigest md = SHA256_DIGEST.get();
+        md.reset();
+        byte[] hash = md.digest(input.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+        char[] chars = new char[hash.length * 2];
+        for (int i = 0; i < hash.length; i++) {
+            chars[i * 2] = HEX_CHARS[(hash[i] >> 4) & 0x0f];
+            chars[i * 2 + 1] = HEX_CHARS[hash[i] & 0x0f];
+        }
+        return new String(chars);
+    }
+
+    @VisibleForTesting
+    long currentSize() {
+        synchronized (lock) {
+            return currentSize;
+        }
+    }
+
+    /** Returns the size remembered by {@link #putFileSize}, or -1 if there is none. */
+    @Override
+    public long getFileSize(String filePath) {
+        Long size = fileSizeCache.get(filePath);
+        return size != null ? size : -1;
+    }
+
+    @Override
+    public void putFileSize(String filePath, long size) {
+        fileSizeCache.put(filePath, size);
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fs/cache/LocalMemoryCacheManager.java
 
b/paimon-common/src/main/java/org/apache/paimon/fs/cache/LocalMemoryCacheManager.java
new file mode 100644
index 0000000000..3e7baab21e
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fs/cache/LocalMemoryCacheManager.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs.cache;
+
+import javax.annotation.Nullable;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * In-memory {@link LocalCacheManager}. Blocks live in an access-ordered map and the least recently
+ * used ones are dropped once the configured byte budget is exceeded. Block operations are
+ * serialized on a private monitor; file sizes are kept in a concurrent map.
+ */
+public class LocalMemoryCacheManager implements LocalCacheManager {
+
+    private final long maxSizeBytes;
+    private final int blockSize;
+    private final Object mutex = new Object();
+    private final LinkedHashMap<BlockKey, byte[]> blocks;
+    private final ConcurrentHashMap<String, Long> fileSizeCache = new ConcurrentHashMap<>();
+
+    private long usedBytes;
+
+    public LocalMemoryCacheManager(long maxSizeBytes, int blockSize) {
+        this.maxSizeBytes = maxSizeBytes;
+        this.blockSize = blockSize;
+        this.usedBytes = 0;
+        // accessOrder = true: iteration starts at the least recently used entry
+        this.blocks = new LinkedHashMap<>(64, 0.75f, true);
+    }
+
+    @Override
+    public int blockSize() {
+        return blockSize;
+    }
+
+    /** Returns the stored array for the block, or {@code null} when it is not cached. */
+    @Nullable
+    @Override
+    public byte[] getBlock(String filePath, int blockIndex) {
+        synchronized (mutex) {
+            return blocks.get(new BlockKey(filePath, blockIndex));
+        }
+    }
+
+    /** Adds the block unless it is already present, then evicts until the budget is met. */
+    @Override
+    public void putBlock(String filePath, int blockIndex, byte[] data) {
+        BlockKey key = new BlockKey(filePath, blockIndex);
+        synchronized (mutex) {
+            if (!blocks.containsKey(key)) {
+                usedBytes += data.length;
+                blocks.put(key, data);
+                if (maxSizeBytes < Long.MAX_VALUE) {
+                    evictOverflow();
+                }
+            }
+        }
+    }
+
+    /** Drops entries in LRU order while the budget is exceeded. Caller holds the monitor. */
+    private void evictOverflow() {
+        Iterator<Map.Entry<BlockKey, byte[]>> lru = blocks.entrySet().iterator();
+        while (usedBytes > maxSizeBytes && lru.hasNext()) {
+            usedBytes -= lru.next().getValue().length;
+            lru.remove();
+        }
+    }
+
+    @Override
+    public long getFileSize(String filePath) {
+        return fileSizeCache.getOrDefault(filePath, -1L);
+    }
+
+    @Override
+    public void putFileSize(String filePath, long size) {
+        fileSizeCache.put(filePath, size);
+    }
+
+    /** Map key made of a file path and a block index. */
+    private static class BlockKey {
+        final String filePath;
+        final int blockIndex;
+
+        BlockKey(String filePath, int blockIndex) {
+            this.filePath = filePath;
+            this.blockIndex = blockIndex;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == this) {
+                return true;
+            }
+            if (o instanceof BlockKey) {
+                BlockKey other = (BlockKey) o;
+                return other.blockIndex == blockIndex
+                        && Objects.equals(other.filePath, filePath);
+            }
+            return false;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(filePath, blockIndex);
+        }
+    }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/FileType.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/FileType.java
index 7320b495b4..df71b0248a 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/FileType.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/FileType.java
@@ -20,6 +20,12 @@ package org.apache.paimon.utils;
 
 import org.apache.paimon.fs.Path;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.EnumSet;
+import java.util.Set;
+
 /**
  * Classification of Paimon files.
  *
@@ -39,6 +45,8 @@ public enum FileType {
     GLOBAL_INDEX,
     FILE_INDEX;
 
+    private static final Logger LOG = LoggerFactory.getLogger(FileType.class);
+
     // keep in sync with SnapshotManager.SNAPSHOT_PREFIX
     private static final String SNAPSHOT_PREFIX = "snapshot-";
     // keep in sync with SchemaManager.SCHEMA_PREFIX
@@ -70,6 +78,46 @@ public enum FileType {
         return this == BUCKET_INDEX || this == GLOBAL_INDEX || this == 
FILE_INDEX;
     }
 
+    /**
+     * Turns a comma-separated list of type names into the matching {@link FileType}s.
+     *
+     * <p>Each item is trimmed and compared exactly against {@code meta}, {@code global-index},
+     * {@code bucket-index}, {@code data} and {@code file-index}. Empty items are skipped; any
+     * other item is reported with a warning and otherwise ignored.
+     */
+    public static Set<FileType> parseWhitelist(String whitelist) {
+        Set<FileType> types = EnumSet.noneOf(FileType.class);
+        for (String item : whitelist.split(",")) {
+            String name = item.trim();
+            if ("meta".equals(name)) {
+                types.add(META);
+            } else if ("global-index".equals(name)) {
+                types.add(GLOBAL_INDEX);
+            } else if ("bucket-index".equals(name)) {
+                types.add(BUCKET_INDEX);
+            } else if ("data".equals(name)) {
+                types.add(DATA);
+            } else if ("file-index".equals(name)) {
+                types.add(FILE_INDEX);
+            } else if (!name.isEmpty()) {
+                LOG.warn(
+                        "Unknown local-cache.whitelist value '{}'. "
+                                + "Supported values: meta, global-index, bucket-index, data, file-index.",
+                        name);
+            }
+        }
+        return types;
+    }
+
+    /**
+     * Tells whether the file is mutable and should therefore not be cached. This is the case when
+     * the last path component is exactly {@code EARLIEST} or {@code LATEST}.
+     */
+    public static boolean isMutable(Path filePath) {
+        String fileName = filePath.getName();
+        if ("EARLIEST".equals(fileName)) {
+            return true;
+        }
+        return "LATEST".equals(fileName);
+    }
+
     /**
      * Classify a file based on its full path.
      *
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/fs/cache/CachingFileIOTest.java 
b/paimon-common/src/test/java/org/apache/paimon/fs/cache/CachingFileIOTest.java
new file mode 100644
index 0000000000..bb179ed47b
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/fs/cache/CachingFileIOTest.java
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs.cache;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.utils.FileType;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link CachingFileIO} and {@link CachingSeekableInputStream}. */
+class CachingFileIOTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    // Directory handed to LocalDiskCacheManager; lives below the JUnit temp dir.
+    private String cacheDir;
+
+    @BeforeEach
+    void setUp() {
+        cacheDir = tempDir.resolve("cache").toString();
+    }
+
+    /**
+     * Builds the CachingFileIO under test.
+     *
+     * <p>NOTE(review): the blockSize argument is not used by this helper;
+     * only delegate, cache and whitelist reach the constructor.
+     */
+    private CachingFileIO newCachingFileIO(
+            FileIO delegate, LocalCacheManager cache, EnumSet<FileType> 
whitelist, int blockSize) {
+        return new CachingFileIO(delegate, cache, whitelist);
+    }
+
+    /** Reads snapshot-1 twice; expects a single delegate getFileStatus call. */
+    @Test
+    void testMetaFileIsCached() throws IOException {
+        byte[] data = "snapshot data".getBytes();
+        MockFileIO delegate = new MockFileIO();
+        delegate.addFile("snapshot-1", data);
+
+        LocalDiskCacheManager cache = new LocalDiskCacheManager(cacheDir, 
Long.MAX_VALUE, 64);
+        CachingFileIO cachingIO =
+                newCachingFileIO(
+                        delegate, cache, EnumSet.of(FileType.META, 
FileType.GLOBAL_INDEX), 64);
+
+        // first read
+        try (SeekableInputStream s = cachingIO.newInputStream(new 
Path("snapshot-1"))) {
+            byte[] result = readAll(s, data.length);
+            assertThat(result).isEqualTo(data);
+        }
+
+        // second read should still work (cache hit)
+        try (SeekableInputStream s = cachingIO.newInputStream(new 
Path("snapshot-1"))) {
+            byte[] result = readAll(s, data.length);
+            assertThat(result).isEqualTo(data);
+        }
+
+        assertThat(delegate.getFileStatusCallCount("snapshot-1")).isEqualTo(1);
+    }
+
+    /** Reading manifest-abc through the wrapper returns the stored bytes. */
+    @Test
+    void testManifestFileIsCached() throws IOException {
+        byte[] data = "manifest data".getBytes();
+        MockFileIO delegate = new MockFileIO();
+        delegate.addFile("manifest-abc", data);
+
+        LocalDiskCacheManager cache = new LocalDiskCacheManager(cacheDir, 
Long.MAX_VALUE, 64);
+        CachingFileIO cachingIO =
+                newCachingFileIO(
+                        delegate, cache, EnumSet.of(FileType.META, 
FileType.GLOBAL_INDEX), 64);
+
+        try (SeekableInputStream s = cachingIO.newInputStream(new 
Path("manifest-abc"))) {
+            assertThat(readAll(s, data.length)).isEqualTo(data);
+        }
+    }
+
+    /** Reading a global-index file through the wrapper returns its bytes. */
+    @Test
+    void testGlobalIndexFileIsCached() throws IOException {
+        byte[] data = "index data".getBytes();
+        MockFileIO delegate = new MockFileIO();
+        delegate.addFile("global-index-uuid.index", data);
+
+        LocalDiskCacheManager cache = new LocalDiskCacheManager(cacheDir, 
Long.MAX_VALUE, 64);
+        CachingFileIO cachingIO =
+                newCachingFileIO(
+                        delegate, cache, EnumSet.of(FileType.META, 
FileType.GLOBAL_INDEX), 64);
+
+        try (SeekableInputStream s =
+                cachingIO.newInputStream(new Path("global-index-uuid.index"))) 
{
+            assertThat(readAll(s, data.length)).isEqualTo(data);
+        }
+    }
+
+    /** data-abc.orc is not wrapped and causes no getFileStatus call. */
+    @Test
+    void testDataFileNotCached() throws IOException {
+        byte[] data = "data content".getBytes();
+        MockFileIO delegate = new MockFileIO();
+        delegate.addFile("data-abc.orc", data);
+
+        LocalDiskCacheManager cache = new LocalDiskCacheManager(cacheDir, 
Long.MAX_VALUE, 64);
+        CachingFileIO cachingIO =
+                newCachingFileIO(
+                        delegate, cache, EnumSet.of(FileType.META, 
FileType.GLOBAL_INDEX), 64);
+
+        SeekableInputStream s = cachingIO.newInputStream(new 
Path("data-abc.orc"));
+        assertThat(s).isNotInstanceOf(CachingSeekableInputStream.class);
+        byte[] result = readAll(s, data.length);
+        assertThat(result).isEqualTo(data);
+        s.close();
+        // getFileStatus should NOT be called for data files
+        
assertThat(delegate.getFileStatusCallCount("data-abc.orc")).isEqualTo(0);
+    }
+
+    /** data-abc.orc.index is not wrapped by the caching stream. */
+    @Test
+    void testFileIndexNotCached() throws IOException {
+        byte[] data = "file index content".getBytes();
+        MockFileIO delegate = new MockFileIO();
+        delegate.addFile("data-abc.orc.index", data);
+
+        LocalDiskCacheManager cache = new LocalDiskCacheManager(cacheDir, 
Long.MAX_VALUE, 64);
+        CachingFileIO cachingIO =
+                newCachingFileIO(
+                        delegate, cache, EnumSet.of(FileType.META, 
FileType.GLOBAL_INDEX), 64);
+
+        SeekableInputStream s = cachingIO.newInputStream(new 
Path("data-abc.orc.index"));
+        assertThat(s).isNotInstanceOf(CachingSeekableInputStream.class);
+        s.close();
+    }
+
+    /** With only META whitelisted, the global-index file is not wrapped. */
+    @Test
+    void testCustomWhitelistMetaOnly() throws IOException {
+        MockFileIO delegate = new MockFileIO();
+        delegate.addFile("snapshot-1", "snap".getBytes());
+        delegate.addFile("global-index-uuid.index", "idx".getBytes());
+
+        LocalDiskCacheManager cache = new LocalDiskCacheManager(cacheDir, 
Long.MAX_VALUE, 64);
+        CachingFileIO cachingIO = newCachingFileIO(delegate, cache, 
EnumSet.of(FileType.META), 64);
+
+        SeekableInputStream s1 = cachingIO.newInputStream(new 
Path("snapshot-1"));
+        assertThat(s1).isInstanceOf(CachingSeekableInputStream.class);
+        s1.close();
+
+        SeekableInputStream s2 = cachingIO.newInputStream(new 
Path("global-index-uuid.index"));
+        assertThat(s2).isNotInstanceOf(CachingSeekableInputStream.class);
+        s2.close();
+    }
+
+    /** With BUCKET_INDEX whitelisted, index-uuid-0 gets a caching stream. */
+    @Test
+    void testBucketIndexCachedWhenInWhitelist() throws IOException {
+        byte[] data = "bucket index".getBytes();
+        MockFileIO delegate = new MockFileIO();
+        delegate.addFile("index-uuid-0", data);
+
+        LocalDiskCacheManager cache = new LocalDiskCacheManager(cacheDir, 
Long.MAX_VALUE, 64);
+        CachingFileIO cachingIO =
+                newCachingFileIO(
+                        delegate,
+                        cache,
+                        EnumSet.of(FileType.META, FileType.GLOBAL_INDEX, 
FileType.BUCKET_INDEX),
+                        64);
+
+        try (SeekableInputStream s = cachingIO.newInputStream(new 
Path("index-uuid-0"))) {
+            assertThat(s).isInstanceOf(CachingSeekableInputStream.class);
+            assertThat(readAll(s, data.length)).isEqualTo(data);
+        }
+    }
+
+    /** A second full read must not open another delegate input stream. */
+    @Test
+    void testCacheHitAvoidsRemoteRead() throws IOException {
+        byte[] data = "0123456789abcdef".getBytes();
+        MockFileIO delegate = new MockFileIO();
+        delegate.addFile("snapshot-1", data);
+
+        LocalDiskCacheManager cache = new LocalDiskCacheManager(cacheDir, 
Long.MAX_VALUE, 8);
+        CachingFileIO cachingIO =
+                newCachingFileIO(
+                        delegate, cache, EnumSet.of(FileType.META, 
FileType.GLOBAL_INDEX), 64);
+
+        // first read populates cache
+        try (SeekableInputStream s = cachingIO.newInputStream(new 
Path("snapshot-1"))) {
+            readAll(s, data.length);
+        }
+        int firstReadCount = delegate.newInputStreamCallCount("snapshot-1");
+        assertThat(firstReadCount).isEqualTo(1);
+
+        // second read should hit cache — delegate.newInputStream should NOT 
be called
+        // because the remote stream is lazily opened and all blocks are cached
+        try (SeekableInputStream s = cachingIO.newInputStream(new 
Path("snapshot-1"))) {
+            byte[] result = readAll(s, data.length);
+            assertThat(result).isEqualTo(data);
+        }
+        
assertThat(delegate.newInputStreamCallCount("snapshot-1")).isEqualTo(firstReadCount);
+    }
+
+    /** EARLIEST and LATEST are not wrapped and cause no getFileStatus call. */
+    @Test
+    void testMutableFilesNotCached() throws IOException {
+        MockFileIO delegate = new MockFileIO();
+        delegate.addFile("EARLIEST", "1".getBytes());
+        delegate.addFile("LATEST", "42".getBytes());
+
+        LocalDiskCacheManager cache = new LocalDiskCacheManager(cacheDir, 
Long.MAX_VALUE, 64);
+        CachingFileIO cachingIO =
+                newCachingFileIO(
+                        delegate, cache, EnumSet.of(FileType.META, 
FileType.GLOBAL_INDEX), 64);
+
+        // EARLIEST and LATEST are META but mutable — should not be cached
+        SeekableInputStream s1 = cachingIO.newInputStream(new 
Path("EARLIEST"));
+        assertThat(s1).isNotInstanceOf(CachingSeekableInputStream.class);
+        s1.close();
+
+        SeekableInputStream s2 = cachingIO.newInputStream(new Path("LATEST"));
+        assertThat(s2).isNotInstanceOf(CachingSeekableInputStream.class);
+        s2.close();
+
+        assertThat(delegate.getFileStatusCallCount("EARLIEST")).isEqualTo(0);
+        assertThat(delegate.getFileStatusCallCount("LATEST")).isEqualTo(0);
+    }
+
+    /** Seeks to 90 and reads 30 bytes with a cache block size of 100. */
+    @Test
+    void testReadSpanningMultipleBlocks() throws IOException {
+        byte[] data = new byte[1024];
+        for (int i = 0; i < data.length; i++) {
+            data[i] = (byte) (i % 256);
+        }
+        MockFileIO delegate = new MockFileIO();
+        delegate.addFile("snapshot-1", data);
+
+        LocalDiskCacheManager cache = new LocalDiskCacheManager(cacheDir, 
Long.MAX_VALUE, 100);
+        CachingFileIO cachingIO =
+                newCachingFileIO(
+                        delegate, cache, EnumSet.of(FileType.META, 
FileType.GLOBAL_INDEX), 64);
+
+        try (SeekableInputStream s = cachingIO.newInputStream(new 
Path("snapshot-1"))) {
+            // read across block boundary: block 0 ends at 100, block 1 starts 
at 100
+            s.seek(90);
+            byte[] result = new byte[30];
+            int read = readFully(s, result);
+            assertThat(read).isEqualTo(30);
+            byte[] expected = new byte[30];
+            System.arraycopy(data, 90, expected, 0, 30);
+            assertThat(result).isEqualTo(expected);
+        }
+    }
+
+    /** Seeks to offset 10 of a 16-byte file and reads the last 6 bytes. */
+    @Test
+    void testSeekAndRead() throws IOException {
+        byte[] data = "0123456789abcdef".getBytes();
+        MockFileIO delegate = new MockFileIO();
+        delegate.addFile("snapshot-1", data);
+
+        LocalDiskCacheManager cache = new LocalDiskCacheManager(cacheDir, 
Long.MAX_VALUE, 8);
+        CachingFileIO cachingIO =
+                newCachingFileIO(
+                        delegate, cache, EnumSet.of(FileType.META, 
FileType.GLOBAL_INDEX), 64);
+
+        try (SeekableInputStream s = cachingIO.newInputStream(new 
Path("snapshot-1"))) {
+            s.seek(10);
+            byte[] result = new byte[6];
+            readFully(s, result);
+            assertThat(new String(result)).isEqualTo("abcdef");
+        }
+    }
+
+    /** exists() and isObjectStore() report what the mock delegate reports. */
+    @Test
+    void testDelegateMethodsForwarded() throws IOException {
+        MockFileIO delegate = new MockFileIO();
+        delegate.addFile("snapshot-1", "data".getBytes());
+
+        LocalDiskCacheManager cache = new LocalDiskCacheManager(cacheDir, 
Long.MAX_VALUE, 64);
+        CachingFileIO cachingIO =
+                newCachingFileIO(
+                        delegate, cache, EnumSet.of(FileType.META, 
FileType.GLOBAL_INDEX), 64);
+
+        assertThat(cachingIO.exists(new Path("snapshot-1"))).isTrue();
+        assertThat(cachingIO.exists(new Path("nonexistent"))).isFalse();
+        assertThat(cachingIO.isObjectStore()).isFalse();
+    }
+
+    /**
+     * Reads until size bytes were read or read() returns a negative value.
+     * Always returns a buffer of length size.
+     */
+    private byte[] readAll(SeekableInputStream s, int size) throws IOException 
{
+        byte[] buf = new byte[size];
+        int off = 0;
+        while (off < size) {
+            int n = s.read(buf, off, size - off);
+            if (n < 0) {
+                break;
+            }
+            off += n;
+        }
+        return buf;
+    }
+
+    /** Fills buf as far as the stream allows; returns the bytes read. */
+    private int readFully(SeekableInputStream s, byte[] buf) throws 
IOException {
+        int off = 0;
+        while (off < buf.length) {
+            int n = s.read(buf, off, buf.length - off);
+            if (n < 0) {
+                break;
+            }
+            off += n;
+        }
+        return off;
+    }
+
+    /** Simple in-memory FileIO for testing. */
+    private static class MockFileIO implements FileIO {
+
+        // File contents and call counters, all keyed by Path#getName().
+        private final Map<String, byte[]> files = new HashMap<>();
+        private final Map<String, Integer> fileStatusCalls = new HashMap<>();
+        private final Map<String, Integer> newInputStreamCalls = new 
HashMap<>();
+
+        void addFile(String name, byte[] data) {
+            files.put(name, data);
+        }
+
+        int getFileStatusCallCount(String name) {
+            return fileStatusCalls.getOrDefault(name, 0);
+        }
+
+        int newInputStreamCallCount(String name) {
+            return newInputStreamCalls.getOrDefault(name, 0);
+        }
+
+        // Counts the call before the lookup, so failed opens are counted too.
+        @Override
+        public SeekableInputStream newInputStream(Path path) throws 
IOException {
+            String name = path.getName();
+            newInputStreamCalls.merge(name, 1, Integer::sum);
+            byte[] data = files.get(name);
+            if (data == null) {
+                throw new IOException("File not found: " + name);
+            }
+            return new ByteArraySeekableInputStream(data);
+        }
+
+        @Override
+        public PositionOutputStream newOutputStream(Path path, boolean 
overwrite) {
+            throw new UnsupportedOperationException();
+        }
+
+        // Counts the call before the lookup, like newInputStream.
+        @Override
+        public FileStatus getFileStatus(Path path) throws IOException {
+            String name = path.getName();
+            fileStatusCalls.merge(name, 1, Integer::sum);
+            byte[] data = files.get(name);
+            if (data == null) {
+                throw new IOException("File not found: " + name);
+            }
+            return new FileStatus() {
+                @Override
+                public long getLen() {
+                    return data.length;
+                }
+
+                @Override
+                public boolean isDir() {
+                    return false;
+                }
+
+                @Override
+                public Path getPath() {
+                    return path;
+                }
+
+                @Override
+                public long getModificationTime() {
+                    return 0;
+                }
+            };
+        }
+
+        @Override
+        public FileStatus[] listStatus(Path path) {
+            return new FileStatus[0];
+        }
+
+        @Override
+        public boolean exists(Path path) {
+            return files.containsKey(path.getName());
+        }
+
+        @Override
+        public boolean delete(Path path, boolean recursive) {
+            return files.remove(path.getName()) != null;
+        }
+
+        @Override
+        public boolean mkdirs(Path path) {
+            return true;
+        }
+
+        @Override
+        public boolean rename(Path src, Path dst) {
+            return false;
+        }
+
+        @Override
+        public boolean isObjectStore() {
+            return false;
+        }
+
+        @Override
+        public void configure(CatalogContext context) {}
+    }
+
+    /** SeekableInputStream backed by a byte array. */
+    private static class ByteArraySeekableInputStream extends 
SeekableInputStream {
+
+        private final byte[] data;
+        private int pos;
+
+        ByteArraySeekableInputStream(byte[] data) {
+            this.data = data;
+            this.pos = 0;
+        }
+
+        // Clamps the requested position into [0, data.length].
+        @Override
+        public void seek(long desired) {
+            this.pos = (int) Math.max(0, Math.min(desired, data.length));
+        }
+
+        @Override
+        public long getPos() {
+            return pos;
+        }
+
+        @Override
+        public int read() {
+            if (pos >= data.length) {
+                return -1;
+            }
+            return data[pos++] & 0xFF;
+        }
+
+        // Returns -1 at end of data, otherwise copies up to len bytes.
+        @Override
+        public int read(byte[] b, int off, int len) {
+            if (pos >= data.length) {
+                return -1;
+            }
+            int toRead = Math.min(len, data.length - pos);
+            System.arraycopy(data, pos, b, off, toRead);
+            pos += toRead;
+            return toRead;
+        }
+
+        @Override
+        public void close() {}
+    }
+}
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/fs/cache/LocalDiskCacheManagerTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/fs/cache/LocalDiskCacheManagerTest.java
new file mode 100644
index 0000000000..d6fb136cf5
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/fs/cache/LocalDiskCacheManagerTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs.cache;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link LocalDiskCacheManager}. */
+class LocalDiskCacheManagerTest {
+
+    // Fresh directory supplied by JUnit for every test.
+    @TempDir Path tempDir;
+
+    // Path of the "cache" sub-directory under tempDir, handed to each manager.
+    private String cacheDir;
+
+    @BeforeEach
+    void setUp() {
+        cacheDir = tempDir.resolve("cache").toString();
+    }
+
+    // A stored block is returned unchanged by a later lookup.
+    @Test
+    void testPutAndGet() {
+        LocalDiskCacheManager cache = new LocalDiskCacheManager(cacheDir, 
Long.MAX_VALUE, 64);
+        byte[] data = "hello world block".getBytes();
+        cache.putBlock("file1.index", 0, data);
+        assertThat(cache.getBlock("file1.index", 0)).isEqualTo(data);
+    }
+
+    // Looking up a key that was never stored yields null.
+    @Test
+    void testCacheMiss() {
+        LocalDiskCacheManager cache = new LocalDiskCacheManager(cacheDir, 
Long.MAX_VALUE, 64);
+        assertThat(cache.getBlock("nonexistent", 0)).isNull();
+    }
+
+    // Entries are distinguished by both the file name and the block index.
+    @Test
+    void testDifferentKeys() {
+        LocalDiskCacheManager cache = new LocalDiskCacheManager(cacheDir, 
Long.MAX_VALUE, 64);
+        cache.putBlock("file1", 0, "block0".getBytes());
+        cache.putBlock("file1", 1, "block1".getBytes());
+        cache.putBlock("file2", 0, "other0".getBytes());
+
+        assertThat(cache.getBlock("file1", 0)).isEqualTo("block0".getBytes());
+        assertThat(cache.getBlock("file1", 1)).isEqualTo("block1".getBytes());
+        assertThat(cache.getBlock("file2", 0)).isEqualTo("other0".getBytes());
+    }
+
+    // A second put for an existing key must not replace the first payload.
+    @Test
+    void testDuplicatePutIsNoop() {
+        LocalDiskCacheManager cache = new LocalDiskCacheManager(cacheDir, 
Long.MAX_VALUE, 64);
+        cache.putBlock("file1", 0, "original".getBytes());
+        cache.putBlock("file1", 0, "duplicate".getBytes());
+        assertThat(cache.getBlock("file1", 
0)).isEqualTo("original".getBytes());
+    }
+
+    // Limit of 100 bytes with two 60-byte blocks: both cannot stay cached.
+    @Test
+    void testEviction() {
+        LocalDiskCacheManager cache = new LocalDiskCacheManager(cacheDir, 100, 
64);
+        byte[] block0 = new byte[60];
+        byte[] block1 = new byte[60];
+        java.util.Arrays.fill(block0, (byte) 'a');
+        java.util.Arrays.fill(block1, (byte) 'b');
+        cache.putBlock("f", 0, block0);
+        cache.putBlock("f", 1, block1);
+        // total 120 > 100, at least one block should be evicted
+        byte[] remaining0 = cache.getBlock("f", 0);
+        byte[] remaining1 = cache.getBlock("f", 1);
+        // The test does not prescribe which of the two blocks is dropped.
+        assertThat(remaining0 == null || remaining1 == null).isTrue();
+    }
+
+    // A second manager opened on the same directory sees the blocks written by
+    // the first one and reports their combined size (100 + 200 bytes).
+    @Test
+    void testScanSizeOnRestart() {
+        LocalDiskCacheManager cache1 = new LocalDiskCacheManager(cacheDir, 
Long.MAX_VALUE, 64);
+        byte[] data0 = new byte[100];
+        byte[] data1 = new byte[200];
+        java.util.Arrays.fill(data0, (byte) 'x');
+        java.util.Arrays.fill(data1, (byte) 'y');
+        cache1.putBlock("f", 0, data0);
+        cache1.putBlock("f", 1, data1);
+
+        LocalDiskCacheManager cache2 = new LocalDiskCacheManager(cacheDir, 
Long.MAX_VALUE, 64);
+        assertThat(cache2.currentSize()).isEqualTo(300);
+        assertThat(cache2.getBlock("f", 0)).isEqualTo(data0);
+        assertThat(cache2.getBlock("f", 1)).isEqualTo(data1);
+    }
+
+    // Constructing a manager creates the (nested) cache directory.
+    @Test
+    void testCacheDirCreated() {
+        String deepDir = tempDir.resolve("sub").resolve("deep").toString();
+        new LocalDiskCacheManager(deepDir, Long.MAX_VALUE, 64);
+        assertThat(new java.io.File(deepDir).isDirectory()).isTrue();
+    }
+
+    // 20 writer threads and 20 reader threads hit the same cache at once.
+    // Failures inside the threads are collected and asserted on afterwards,
+    // because an exception thrown in a worker would not fail the test itself.
+    @Test
+    void testConcurrentPutGet() throws InterruptedException {
+        LocalDiskCacheManager cache = new LocalDiskCacheManager(cacheDir, 
Long.MAX_VALUE, 64);
+        List<Throwable> errors = new CopyOnWriteArrayList<>();
+
+        List<Thread> threads = new ArrayList<>();
+        for (int i = 0; i < 20; i++) {
+            final int idx = i;
+            // Writer: stores a 100-byte block filled with the thread's index.
+            threads.add(
+                    new Thread(
+                            () -> {
+                                try {
+                                    byte[] data = new byte[100];
+                                    java.util.Arrays.fill(data, (byte) (idx % 
256));
+                                    cache.putBlock("concurrent", idx, data);
+                                } catch (Throwable e) {
+                                    errors.add(e);
+                                }
+                            }));
+            // Reader: may run before the matching writer, so a null result is
+            // tolerated; a non-null result must be the complete block.
+            threads.add(
+                    new Thread(
+                            () -> {
+                                try {
+                                    byte[] result = 
cache.getBlock("concurrent", idx);
+                                    if (result != null) {
+                                        byte[] expected = new byte[100];
+                                        java.util.Arrays.fill(expected, (byte) 
(idx % 256));
+                                        assertThat(result).isEqualTo(expected);
+                                    }
+                                } catch (Throwable e) {
+                                    errors.add(e);
+                                }
+                            }));
+        }
+        for (Thread t : threads) {
+            t.start();
+        }
+        for (Thread t : threads) {
+            t.join();
+        }
+        assertThat(errors).isEmpty();
+    }
+
+    // With Long.MAX_VALUE as the limit, all 50 blocks must remain readable.
+    @Test
+    void testUnlimitedCacheSkipsEviction() {
+        LocalDiskCacheManager cache = new LocalDiskCacheManager(cacheDir, 
Long.MAX_VALUE, 64);
+        for (int i = 0; i < 50; i++) {
+            byte[] data = new byte[100];
+            java.util.Arrays.fill(data, (byte) 'x');
+            cache.putBlock("f", i, data);
+        }
+        for (int i = 0; i < 50; i++) {
+            assertThat(cache.getBlock("f", i)).isNotNull();
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 4130c8d575..9512e2f767 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -25,6 +25,8 @@ import org.apache.paimon.factories.FactoryUtil;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.cache.CachingFileIO;
+import org.apache.paimon.fs.cache.LocalCacheManager;
 import org.apache.paimon.function.Function;
 import org.apache.paimon.function.FunctionChange;
 import org.apache.paimon.options.Options;
@@ -83,15 +85,18 @@ public abstract class AbstractCatalog implements Catalog {
     protected final FileIO fileIO;
     protected final Map<String, String> tableDefaultOptions;
     protected final CatalogContext context;
+    protected final @Nullable LocalCacheManager cacheManager;
 
     protected AbstractCatalog(FileIO fileIO) {
         this.fileIO = fileIO;
         this.tableDefaultOptions = new HashMap<>();
         this.context = CatalogContext.create(new Options());
+        this.cacheManager = null;
     }
 
     protected AbstractCatalog(FileIO fileIO, CatalogContext context) {
-        this.fileIO = fileIO;
+        this.cacheManager = CachingFileIO.createCacheManager(context);
+        this.fileIO = CachingFileIO.wrapWithCachingIfNeeded(fileIO, context, 
cacheManager);
         this.tableDefaultOptions = 
CatalogUtils.tableDefaultOptions(context.options().toMap());
         this.context = context;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 155d6a9893..94442d5c30 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -35,6 +35,8 @@ import org.apache.paimon.consumer.ConsumerInfo;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.ResolvingFileIO;
+import org.apache.paimon.fs.cache.CachingFileIO;
+import org.apache.paimon.fs.cache.LocalCacheManager;
 import org.apache.paimon.function.Function;
 import org.apache.paimon.function.FunctionChange;
 import org.apache.paimon.options.Options;
@@ -100,6 +102,7 @@ public class RESTCatalog implements Catalog {
     private final CatalogContext context;
     private final boolean dataTokenEnabled;
     protected final Map<String, String> tableDefaultOptions;
+    private final @Nullable LocalCacheManager cacheManager;
 
     public RESTCatalog(CatalogContext context) {
         this(context, true);
@@ -115,6 +118,7 @@ public class RESTCatalog implements Catalog {
                         context.fallbackIO());
         this.dataTokenEnabled = 
api.options().get(RESTTokenFileIO.DATA_TOKEN_ENABLED);
         this.tableDefaultOptions = 
CatalogUtils.tableDefaultOptions(this.context.options().toMap());
+        this.cacheManager = CachingFileIO.createCacheManager(this.context);
     }
 
     @Override
@@ -1192,9 +1196,12 @@ public class RESTCatalog implements Catalog {
     }
 
     private FileIO fileIOForData(Path path, Identifier identifier) {
-        return dataTokenEnabled
-                ? new RESTTokenFileIO(context, api, identifier, path)
-                : fileIOFromOptions(path);
+        return CachingFileIO.wrapWithCachingIfNeeded(
+                dataTokenEnabled
+                        ? new RESTTokenFileIO(context, api, identifier, path)
+                        : fileIOFromOptions(path),
+                context,
+                cacheManager);
     }
 
     private FileIO fileIOFromOptions(Path path) {
diff --git a/paimon-python/pypaimon/catalog/filesystem_catalog.py 
b/paimon-python/pypaimon/catalog/filesystem_catalog.py
index 478c3233a8..56fe394483 100644
--- a/paimon-python/pypaimon/catalog/filesystem_catalog.py
+++ b/paimon-python/pypaimon/catalog/filesystem_catalog.py
@@ -19,6 +19,7 @@ from typing import List, Optional, Union
 
 from pypaimon.api.api_response import GetTagResponse, PagedList
 from pypaimon.catalog.catalog import Catalog
+from pypaimon.catalog.catalog_context import CatalogContext
 from pypaimon.catalog.catalog_environment import CatalogEnvironment
 from pypaimon.catalog.catalog_exception import (
     BranchAlreadyExistException,
@@ -36,6 +37,7 @@ from pypaimon.common.options.config import CatalogOptions
 from pypaimon.common.options.core_options import CoreOptions
 from pypaimon.common.file_io import FileIO
 from pypaimon.common.identifier import Identifier
+from pypaimon.filesystem.caching_file_io import CachingFileIO
 from pypaimon.schema.schema_change import SchemaChange
 from pypaimon.schema.schema_manager import SchemaManager
 from pypaimon.snapshot.snapshot import Snapshot
@@ -50,7 +52,11 @@ class FileSystemCatalog(Catalog):
             raise ValueError(f"Paimon '{CatalogOptions.WAREHOUSE.key()}' path 
must be set")
         self.warehouse = catalog_options.get(CatalogOptions.WAREHOUSE)
         self.catalog_options = catalog_options
-        self.file_io = FileIO.get(self.warehouse, self.catalog_options)
+        self.catalog_context = 
CatalogContext.create_from_options(catalog_options)
+        self._cache_manager = 
CachingFileIO.create_cache_manager(self.catalog_options)
+        self.file_io = CachingFileIO.wrap_with_caching_if_needed(
+            FileIO.get(self.warehouse, self.catalog_options), 
self.catalog_options,
+            self._cache_manager)
 
     def list_databases(self) -> list:
         statuses = self.file_io.list_status(self.warehouse)
diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py 
b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
index 45e3f18b44..a3d6a665ff 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
@@ -41,6 +41,7 @@ from pypaimon.catalog.rest.table_metadata import TableMetadata
 from pypaimon.common.options.config import CatalogOptions, FuseOptions
 from pypaimon.common.options.core_options import CoreOptions
 from pypaimon.common.file_io import FileIO
+from pypaimon.filesystem.caching_file_io import CachingFileIO
 from pypaimon.common.identifier import Identifier
 from pypaimon.schema.schema import Schema
 from pypaimon.schema.schema_change import SchemaChange
@@ -66,6 +67,7 @@ class RESTCatalog(Catalog):
         self.context = CatalogContext.create(self.rest_api.options, 
context.hadoop_conf,
                                              context.prefer_io_loader, 
context.fallback_io_loader)
         self.data_token_enabled = 
self.rest_api.options.get(CatalogOptions.DATA_TOKEN_ENABLED)
+        self._cache_manager = 
CachingFileIO.create_cache_manager(self.context.options)
 
         # FUSE support (lazy import only when enabled)
         self.fuse_enabled = self.context.options.get(FuseOptions.FUSE_ENABLED, 
False)
@@ -614,7 +616,9 @@ class RESTCatalog(Catalog):
         )
 
     def file_io_from_options(self, table_path: str) -> FileIO:
-        return FileIO.get(table_path, self.context.options)
+        return CachingFileIO.wrap_with_caching_if_needed(
+            FileIO.get(table_path, self.context.options), self.context.options,
+            self._cache_manager)
 
     def file_io_for_data(self, table_path: str, identifier: Identifier):
         """
diff --git a/paimon-python/pypaimon/common/options/core_options.py 
b/paimon-python/pypaimon/common/options/core_options.py
index 52a5737002..7d9a227e4a 100644
--- a/paimon-python/pypaimon/common/options/core_options.py
+++ b/paimon-python/pypaimon/common/options/core_options.py
@@ -398,6 +398,50 @@ class CoreOptions:
         )
     )
 
+    LOCAL_CACHE_ENABLED: ConfigOption[bool] = (
+        ConfigOptions.key("local-cache.enabled")
+        .boolean_type()
+        .default_value(False)
+        .with_description(
+            "Whether to enable local block cache for file reads. "
+            "If local-cache.dir is configured, disk cache is used; otherwise 
memory cache is used."
+        )
+    )
+
+    LOCAL_CACHE_DIR: ConfigOption[str] = (
+        ConfigOptions.key("local-cache.dir")
+        .string_type()
+        .no_default_value()
+        .with_description(
+            "Directory for local block cache on disk. "
+            "If not configured, memory cache is used instead."
+        )
+    )
+
+    LOCAL_CACHE_MAX_SIZE: ConfigOption[MemorySize] = (
+        ConfigOptions.key("local-cache.max-size")
+        .memory_type()
+        .no_default_value()
+        .with_description("Maximum total size of the local block cache. 
Unlimited by default.")
+    )
+
+    LOCAL_CACHE_BLOCK_SIZE: ConfigOption[MemorySize] = (
+        ConfigOptions.key("local-cache.block-size")
+        .memory_type()
+        .default_value(MemorySize.of_mebi_bytes(1))
+        .with_description("Block size for local cache.")
+    )
+
+    LOCAL_CACHE_WHITELIST: ConfigOption[str] = (
+        ConfigOptions.key("local-cache.whitelist")
+        .string_type()
+        .default_value("meta,global-index")
+        .with_description(
+            "Comma-separated list of file types to cache. "
+            "Supported values: meta, global-index, bucket-index, data, 
file-index."
+        )
+    )
+
     READ_BATCH_SIZE: ConfigOption[int] = (
         ConfigOptions.key("read.batch-size")
         .int_type()
@@ -640,6 +684,21 @@ class CoreOptions:
     def global_index_thread_num(self) -> Optional[int]:
         return self.options.get(CoreOptions.GLOBAL_INDEX_THREAD_NUM)
 
+    def local_cache_enabled(self) -> bool:
+        """Return the ``local-cache.enabled`` option (declared default: False)."""
+        return self.options.get(CoreOptions.LOCAL_CACHE_ENABLED)
+
+    def local_cache_dir(self) -> Optional[str]:
+        """Return the ``local-cache.dir`` option; it has no declared default."""
+        return self.options.get(CoreOptions.LOCAL_CACHE_DIR)
+
+    def local_cache_max_size(self) -> Optional[MemorySize]:
+        """Return the ``local-cache.max-size`` option; it has no declared default."""
+        return self.options.get(CoreOptions.LOCAL_CACHE_MAX_SIZE)
+
+    def local_cache_block_size(self) -> MemorySize:
+        """Return the ``local-cache.block-size`` option (declared default: 1 MiB)."""
+        return self.options.get(CoreOptions.LOCAL_CACHE_BLOCK_SIZE)
+
+    def local_cache_whitelist(self) -> str:
+        """Return the raw comma-separated ``local-cache.whitelist`` option.
+
+        The declared default is ``"meta,global-index"``.
+        """
+        return self.options.get(CoreOptions.LOCAL_CACHE_WHITELIST)
+
     def read_batch_size(self, default=None) -> int:
         return self.options.get(CoreOptions.READ_BATCH_SIZE, default or 1024)
 
diff --git a/paimon-python/pypaimon/filesystem/caching_file_io.py 
b/paimon-python/pypaimon/filesystem/caching_file_io.py
new file mode 100644
index 0000000000..f5141a76e3
--- /dev/null
+++ b/paimon-python/pypaimon/filesystem/caching_file_io.py
@@ -0,0 +1,389 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+"""Block-level local cache for Paimon files.
+
+Provides a CachingFileIO wrapper that transparently caches remote file reads
+at block granularity. If a cache directory is configured, disk cache is used;
+otherwise an in-memory LRU cache is used. Files are classified by FileType and
+only cacheable types in the whitelist are cached; others are read directly from
+the delegate FileIO.
+"""
+
+import hashlib
+import os
+import threading
+from collections import OrderedDict
+from typing import Optional
+
+from pypaimon.common.file_io import FileIO
+from pypaimon.utils.file_type import FileType
+
+
+class LocalMemoryCacheManager:
+    """Block-level in-memory cache with LRU eviction."""
+
+    def __init__(self, max_size_bytes: int, block_size: int = 1 * 1024 * 1024):
+        self._max_size_bytes = max_size_bytes
+        self._block_size = block_size
+        self._lock = threading.Lock()
+        self._current_size = 0
+        self._cache: OrderedDict = OrderedDict()
+        self._file_size_cache: dict = {}
+
+    @property
+    def block_size(self) -> int:
+        return self._block_size
+
+    def get_block(self, file_path: str, block_index: int) -> Optional[bytes]:
+        key = (file_path, block_index)
+        with self._lock:
+            data = self._cache.get(key)
+            if data is not None:
+                self._cache.move_to_end(key)
+            return data
+
+    def put_block(self, file_path: str, block_index: int, data: bytes) -> None:
+        key = (file_path, block_index)
+        with self._lock:
+            if key in self._cache:
+                return
+            self._current_size += len(data)
+            self._cache[key] = data
+            while (self._max_size_bytes < (2 ** 63 - 1)
+                   and self._current_size > self._max_size_bytes
+                   and self._cache):
+                _, evicted = self._cache.popitem(last=False)
+                self._current_size -= len(evicted)
+
+    def get_file_size(self, file_path: str) -> int:
+        return self._file_size_cache.get(file_path, -1)
+
+    def put_file_size(self, file_path: str, size: int) -> None:
+        self._file_size_cache[file_path] = size
+
+
+class LocalDiskCacheManager:
+    """Block-level local disk cache with LRU eviction."""
+
+    def __init__(self, cache_dir: str, max_size_bytes: int,
+                 block_size: int = 1 * 1024 * 1024):
+        self._cache_dir = cache_dir
+        self._max_size_bytes = max_size_bytes
+        self._block_size = block_size
+        self._lock = threading.Lock()
+        self._current_size = 0
+        self._file_size_cache: dict = {}
+        # LRU-ordered index: cache_path -> size. OrderedDict with move_to_end 
for access order.
+        self._entry_index: OrderedDict = OrderedDict()
+        os.makedirs(cache_dir, exist_ok=True)
+        self._current_size = self._scan_and_populate_index()
+
+    @property
+    def block_size(self) -> int:
+        return self._block_size
+
+    def _cache_path(self, file_path: str, block_index: int) -> str:
+        key = f"{file_path}:{block_index}"
+        h = hashlib.sha256(key.encode('utf-8')).hexdigest()
+        prefix = h[:2]
+        sub_dir = os.path.join(self._cache_dir, prefix)
+        return os.path.join(sub_dir, h)
+
+    def get_block(self, file_path: str, block_index: int) -> Optional[bytes]:
+        path = self._cache_path(file_path, block_index)
+        with self._lock:
+            if path not in self._entry_index:
+                return None
+            self._entry_index.move_to_end(path)
+        try:
+            with open(path, 'rb') as f:
+                return f.read()
+        except (FileNotFoundError, OSError):
+            with self._lock:
+                size = self._entry_index.pop(path, None)
+                if size is not None:
+                    self._current_size -= size
+            return None
+
+    def put_block(self, file_path: str, block_index: int, data: bytes) -> None:
+        path = self._cache_path(file_path, block_index)
+
+        with self._lock:
+            if path in self._entry_index:
+                return
+
+        sub_dir = os.path.dirname(path)
+        os.makedirs(sub_dir, exist_ok=True)
+
+        tmp_path = path + f".tmp.{os.getpid()}.{threading.get_ident()}"
+        try:
+            with open(tmp_path, 'wb') as f:
+                f.write(data)
+            os.rename(tmp_path, path)
+        except Exception:
+            try:
+                os.unlink(tmp_path)
+            except OSError:
+                pass
+            return
+
+        need_evict = False
+        with self._lock:
+            self._entry_index[path] = len(data)
+            self._current_size += len(data)
+            need_evict = (self._max_size_bytes < (2 ** 63 - 1)
+                          and self._current_size > self._max_size_bytes)
+        if need_evict:
+            self._evict()
+
+    def _evict(self) -> None:
+        to_delete = []
+        with self._lock:
+            if self._current_size <= self._max_size_bytes:
+                return
+            while self._entry_index and self._current_size > 
self._max_size_bytes:
+                path, size = self._entry_index.popitem(last=False)
+                self._current_size -= size
+                to_delete.append((path, size))
+
+        for path, size in to_delete:
+            try:
+                os.unlink(path)
+            except OSError:
+                with self._lock:
+                    self._entry_index[path] = size
+                    self._current_size += size
+
+    def _scan_and_populate_index(self) -> int:
+        total = 0
+        for dirpath, _, filenames in os.walk(self._cache_dir):
+            for fn in filenames:
+                if '.tmp.' in fn:
+                    continue
+                fp = os.path.join(dirpath, fn)
+                try:
+                    size = os.path.getsize(fp)
+                    self._entry_index[fp] = size
+                    total += size
+                except OSError:
+                    pass
+        return total
+
+    def get_file_size(self, file_path: str) -> int:
+        return self._file_size_cache.get(file_path, -1)
+
+    def put_file_size(self, file_path: str, size: int) -> None:
+        self._file_size_cache[file_path] = size
+
+
+class CachingInputStream:
+    """Wraps a remote stream with block-level caching."""
+
+    def __init__(self, file_io, file_path: str, cache):
+        self._file_io = file_io
+        self._stream = None
+        self._file_path = file_path
+        self._file_size = -1
+        self._cache = cache
+        self._pos = 0
+
+    def _get_file_size(self) -> int:
+        if self._file_size == -1:
+            cached = self._cache.get_file_size(self._file_path)
+            if cached >= 0:
+                self._file_size = cached
+            else:
+                self._file_size = self._file_io.get_file_size(self._file_path)
+                self._cache.put_file_size(self._file_path, self._file_size)
+        return self._file_size
+
+    def seek(self, offset, whence=0):
+        if whence == 0:
+            self._pos = max(0, offset)
+        elif whence == 1:
+            self._pos = max(0, self._pos + offset)
+        elif whence == 2:
+            self._pos = max(0, self._get_file_size() + offset)
+        return self._pos
+
+    def tell(self) -> int:
+        return self._pos
+
+    def read(self, size=-1) -> bytes:
+        if size == -1 or size is None:
+            size = self._get_file_size() - self._pos
+        if size <= 0 or self._pos >= self._get_file_size():
+            return b''
+
+        end = min(self._pos + size, self._get_file_size())
+        block_size = self._cache.block_size
+
+        first_block = self._pos // block_size
+        last_block = (end - 1) // block_size
+
+        result = bytearray()
+        for bi in range(first_block, last_block + 1):
+            block_data = self._read_block(bi)
+
+            block_start = bi * block_size
+            start_in_block = max(self._pos - block_start, 0)
+            end_in_block = min(end - block_start, len(block_data))
+            result.extend(block_data[start_in_block:end_in_block])
+
+        self._pos = end
+        return bytes(result)
+
+    def _read_block(self, block_index: int) -> bytes:
+        cached = self._cache.get_block(self._file_path, block_index)
+        if cached is not None:
+            return cached
+
+        block_size = self._cache.block_size
+        offset = block_index * block_size
+        read_size = min(block_size, self._get_file_size() - offset)
+
+        stream = self._get_remote_stream()
+        stream.seek(offset)
+        data = self._read_fully(stream, read_size)
+
+        self._cache.put_block(self._file_path, block_index, data)
+        return data
+
+    def _read_fully(self, stream, size: int) -> bytes:
+        buf = bytearray()
+        remaining = size
+        while remaining > 0:
+            chunk = stream.read(remaining)
+            if not chunk:
+                break
+            buf.extend(chunk)
+            remaining -= len(chunk)
+        return bytes(buf)
+
+    def _get_remote_stream(self):
+        if self._stream is None:
+            self._stream = self._file_io.new_input_stream(self._file_path)
+        return self._stream
+
+    def close(self):
+        if self._stream is not None:
+            self._stream.close()
+            self._stream = None
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
+        return False
+
+
+class CachingFileIO(FileIO):
+    """FileIO wrapper that caches reads at block granularity.
+
+    Only file types in the whitelist are cached. Others are read directly
+    from the delegate. After pickling/unpickling, the cache is None and reads
+    fall through to the delegate directly.
+    """
+
+    def __init__(self, delegate: FileIO, cache, whitelist=None):
+        self._delegate = delegate
+        self._cache = cache
+        if whitelist is None:
+            self._whitelist = {FileType.META, FileType.GLOBAL_INDEX}
+        else:
+            self._whitelist = whitelist
+
+    @staticmethod
+    def create_cache_manager(options):
+        """Creates a cache manager from options, or returns None if caching is 
not enabled."""
+        from pypaimon.common.options.core_options import CoreOptions
+        opts = CoreOptions(options)
+        if not opts.local_cache_enabled():
+            return None
+        cache_dir = opts.local_cache_dir()
+        max_size_opt = opts.local_cache_max_size()
+        max_size = max_size_opt.get_bytes() if max_size_opt is not None else 
(2 ** 63 - 1)
+        block_size = opts.local_cache_block_size().get_bytes()
+        if cache_dir is not None:
+            return LocalDiskCacheManager(cache_dir, max_size, block_size)
+        else:
+            return LocalMemoryCacheManager(max_size, block_size)
+
+    @staticmethod
+    def wrap_with_caching_if_needed(file_io, options, cache=None):
+        """Wraps the given FileIO with caching if local cache is enabled.
+
+        Args:
+            file_io: the FileIO to potentially wrap
+            options: an Options object containing cache configuration
+            cache: the cache manager instance (managed by the caller)
+
+        Returns:
+            a CachingFileIO if caching is enabled and configured, otherwise 
the original FileIO
+        """
+        if isinstance(file_io, CachingFileIO):
+            return file_io
+        if cache is None:
+            return file_io
+        from pypaimon.common.options.core_options import CoreOptions
+        opts = CoreOptions(options)
+        whitelist = FileType.parse_whitelist(opts.local_cache_whitelist())
+        if not whitelist:
+            return file_io
+        return CachingFileIO(file_io, cache, whitelist)
+
+    def new_input_stream(self, path: str):
+        file_type = FileType.classify(path)
+        if self._cache is None or file_type not in self._whitelist or 
FileType.is_mutable(path):
+            return self._delegate.new_input_stream(path)
+        return CachingInputStream(self._delegate, path, self._cache)
+
+    def new_output_stream(self, path: str):
+        return self._delegate.new_output_stream(path)
+
+    def get_file_status(self, path: str):
+        return self._delegate.get_file_status(path)
+
+    def list_status(self, path: str):
+        return self._delegate.list_status(path)
+
+    def exists(self, path: str) -> bool:
+        return self._delegate.exists(path)
+
+    def delete(self, path: str, recursive: bool = False) -> bool:
+        return self._delegate.delete(path, recursive)
+
+    def mkdirs(self, path: str) -> bool:
+        return self._delegate.mkdirs(path)
+
+    def rename(self, src: str, dst: str) -> bool:
+        return self._delegate.rename(src, dst)
+
+    def get_file_size(self, path: str) -> int:
+        return self._delegate.get_file_size(path)
+
+    def is_dir(self, path: str) -> bool:
+        return self._delegate.is_dir(path)
+
+    def __getattr__(self, name):
+        return getattr(self._delegate, name)
+
+    def close(self):
+        self._delegate.close()
diff --git a/paimon-python/pypaimon/tests/caching_file_io_test.py 
b/paimon-python/pypaimon/tests/caching_file_io_test.py
new file mode 100644
index 0000000000..048f70589d
--- /dev/null
+++ b/paimon-python/pypaimon/tests/caching_file_io_test.py
@@ -0,0 +1,421 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+"""Tests for LocalDiskCacheManager, CachingInputStream, and CachingFileIO."""
+
+import io
+import os
+import shutil
+import tempfile
+import threading
+import unittest
+from unittest.mock import MagicMock
+
+from pypaimon.filesystem.caching_file_io import (
+    LocalDiskCacheManager,
+    CachingFileIO,
+    CachingInputStream,
+)
+
+
+class LocalDiskCacheManagerTest(unittest.TestCase):
+
+    def setUp(self):
+        self.cache_dir = tempfile.mkdtemp()
+
+    def tearDown(self):
+        shutil.rmtree(self.cache_dir, ignore_errors=True)
+
+    def test_put_and_get(self):
+        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=64)
+        data = b"hello world block"
+        cache.put_block("file1.index", 0, data)
+        result = cache.get_block("file1.index", 0)
+        self.assertEqual(data, result)
+
+    def test_cache_miss(self):
+        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=64)
+        result = cache.get_block("nonexistent", 0)
+        self.assertIsNone(result)
+
+    def test_different_keys(self):
+        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=64)
+        cache.put_block("file1", 0, b"block0")
+        cache.put_block("file1", 1, b"block1")
+        cache.put_block("file2", 0, b"other0")
+
+        self.assertEqual(b"block0", cache.get_block("file1", 0))
+        self.assertEqual(b"block1", cache.get_block("file1", 1))
+        self.assertEqual(b"other0", cache.get_block("file2", 0))
+
+    def test_duplicate_put_is_noop(self):
+        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=64)
+        cache.put_block("file1", 0, b"original")
+        cache.put_block("file1", 0, b"duplicate")
+        self.assertEqual(b"original", cache.get_block("file1", 0))
+
+    def test_eviction(self):
+        cache = LocalDiskCacheManager(self.cache_dir, max_size_bytes=100, block_size=64)
+        cache.put_block("f", 0, b"a" * 60)
+        cache.put_block("f", 1, b"b" * 60)
+        # Total 120 > 100, at least one block should be evicted
+        remaining_0 = cache.get_block("f", 0)
+        remaining_1 = cache.get_block("f", 1)
+        self.assertTrue(remaining_0 is None or remaining_1 is None)
+
+    def test_scan_size_on_restart(self):
+        cache1 = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=64)
+        cache1.put_block("f", 0, b"x" * 100)
+        cache1.put_block("f", 1, b"y" * 200)
+
+        # Simulate restart: new cache instance on same directory
+        cache2 = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=64)
+        self.assertEqual(300, cache2._current_size)
+        self.assertEqual(b"x" * 100, cache2.get_block("f", 0))
+        self.assertEqual(b"y" * 200, cache2.get_block("f", 1))
+
+    def test_cache_dir_created(self):
+        new_dir = os.path.join(self.cache_dir, "sub", "deep")
+        LocalDiskCacheManager(new_dir, 2 ** 63 - 1, block_size=64)
+        self.assertTrue(os.path.isdir(new_dir))
+
+    def test_concurrent_put_get(self):
+        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=64)
+        errors = []
+
+        def writer(idx):
+            try:
+                data = bytes([idx % 256]) * 100
+                cache.put_block("concurrent", idx, data)
+            except Exception as e:
+                errors.append(e)
+
+        def reader(idx):
+            try:
+                result = cache.get_block("concurrent", idx)
+                if result is not None:
+                    expected = bytes([idx % 256]) * 100
+                    assert result == expected, f"Mismatch at {idx}"
+            except Exception as e:
+                errors.append(e)
+
+        threads = []
+        for i in range(20):
+            threads.append(threading.Thread(target=writer, args=(i,)))
+            threads.append(threading.Thread(target=reader, args=(i,)))
+        for t in threads:
+            t.start()
+        for t in threads:
+            t.join()
+
+        self.assertEqual([], errors)
+
+    def test_unlimited_cache_skips_eviction(self):
+        cache = LocalDiskCacheManager(self.cache_dir, max_size_bytes=2 ** 63 - 1, block_size=64)
+        for i in range(50):
+            cache.put_block("f", i, b"x" * 100)
+        # All 50 blocks should still be present
+        for i in range(50):
+            self.assertIsNotNone(cache.get_block("f", i))
+
+
+class CachingInputStreamTest(unittest.TestCase):
+
+    def setUp(self):
+        self.cache_dir = tempfile.mkdtemp()
+
+    def tearDown(self):
+        shutil.rmtree(self.cache_dir, ignore_errors=True)
+
+    def _make_file_io(self, data, path="test"):
+        file_io = MagicMock()
+        file_io.new_input_stream.side_effect = lambda p: io.BytesIO(data)
+        file_io.get_file_size.side_effect = lambda p: len(data)
+        return file_io
+
+    def test_read_entire_file(self):
+        data = b"abcdefghijklmnop"
+        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=8)
+        stream = CachingInputStream(self._make_file_io(data), "test", cache)
+        result = stream.read()
+        self.assertEqual(data, result)
+
+    def test_read_with_size(self):
+        data = b"abcdefghijklmnop"
+        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=8)
+        stream = CachingInputStream(self._make_file_io(data), "test", cache)
+        self.assertEqual(b"abcde", stream.read(5))
+        self.assertEqual(b"fghij", stream.read(5))
+        self.assertEqual(b"klmnop", stream.read(10))
+
+    def test_seek_and_read(self):
+        data = b"0123456789abcdef"
+        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=8)
+        stream = CachingInputStream(self._make_file_io(data), "test", cache)
+        stream.seek(10)
+        self.assertEqual(b"abcdef", stream.read())
+
+    def test_seek_whence_1(self):
+        data = b"0123456789"
+        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=8)
+        stream = CachingInputStream(self._make_file_io(data), "test", cache)
+        stream.read(3)
+        stream.seek(2, 1)  # relative
+        self.assertEqual(5, stream.tell())
+        self.assertEqual(b"56789", stream.read())
+
+    def test_seek_whence_2(self):
+        data = b"0123456789"
+        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=8)
+        stream = CachingInputStream(self._make_file_io(data), "test", cache)
+        stream.seek(-3, 2)  # from end
+        self.assertEqual(b"789", stream.read())
+
+    def test_read_spanning_multiple_blocks(self):
+        data = bytes(range(256)) * 4  # 1024 bytes
+        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=100)
+        stream = CachingInputStream(self._make_file_io(data), "test", cache)
+        # Read across block boundary: block 0 ends at 100, block 1 starts at 100
+        stream.seek(90)
+        result = stream.read(30)  # 90..120, spans blocks 0 and 1
+        self.assertEqual(data[90:120], result)
+
+    def test_cache_hit_avoids_remote_read(self):
+        data = b"0123456789abcdef"
+        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=8)
+
+        # First read populates cache
+        file_io1 = self._make_file_io(data)
+        stream1 = CachingInputStream(file_io1, "test", cache)
+        stream1.read()
+
+        # Second stream: remote should not be called because all blocks are cached
+        file_io2 = MagicMock()
+        file_io2.get_file_size.side_effect = lambda p: len(data)
+        stream2 = CachingInputStream(file_io2, "test", cache)
+        result = stream2.read()
+        self.assertEqual(data, result)
+        file_io2.new_input_stream.assert_not_called()
+
+    def test_read_empty(self):
+        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=8)
+        stream = CachingInputStream(self._make_file_io(b""), "test", cache)
+        self.assertEqual(b"", stream.read())
+
+    def test_read_at_eof(self):
+        data = b"abc"
+        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=8)
+        stream = CachingInputStream(self._make_file_io(data), "test", cache)
+        stream.read()
+        self.assertEqual(b"", stream.read(10))
+
+    def test_context_manager(self):
+        data = b"test"
+        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=8)
+        file_io = self._make_file_io(data)
+        with CachingInputStream(file_io, "test", cache):
+            pass
+
+    def test_partial_last_block(self):
+        data = b"abcdefghij"  # 10 bytes, block_size=8 -> block0=8, block1=2
+        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=8)
+        stream = CachingInputStream(self._make_file_io(data), "test", cache)
+        result = stream.read()
+        self.assertEqual(data, result)
+        # Verify block 1 is only 2 bytes
+        self.assertEqual(b"ij", cache.get_block("test", 1))
+
+
+class CachingFileIOTest(unittest.TestCase):
+
+    def setUp(self):
+        self.cache_dir = tempfile.mkdtemp()
+
+    def tearDown(self):
+        shutil.rmtree(self.cache_dir, ignore_errors=True)
+
+    def _make_delegate(self, path_to_data):
+        delegate = MagicMock()
+
+        def new_input_stream(path):
+            return io.BytesIO(path_to_data.get(path, b""))
+
+        def get_file_size(path):
+            return len(path_to_data.get(path, b""))
+
+        delegate.new_input_stream.side_effect = new_input_stream
+        delegate.get_file_size.side_effect = get_file_size
+        return delegate
+
+    def test_meta_file_is_cached(self):
+        data = b"snapshot data"
+        delegate = self._make_delegate({"snapshot-1": data})
+        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=64)
+        caching_io = CachingFileIO(delegate, cache)
+
+        # First read
+        with caching_io.new_input_stream("snapshot-1") as s:
+            result1 = s.read()
+        self.assertEqual(data, result1)
+
+        # Second read should hit cache (returns CachingInputStream)
+        with caching_io.new_input_stream("snapshot-1") as s:
+            result2 = s.read()
+        self.assertEqual(data, result2)
+        # get_file_size called only once — second stream gets size from cache
+        self.assertEqual(1, delegate.get_file_size.call_count)
+
+    def test_manifest_file_is_cached(self):
+        data = b"manifest data"
+        delegate = self._make_delegate({"manifest-abc": data})
+        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=64)
+        caching_io = CachingFileIO(delegate, cache)
+
+        with caching_io.new_input_stream("manifest-abc") as s:
+            self.assertEqual(data, s.read())
+
+    def test_global_index_file_is_cached(self):
+        data = b"index data"
+        delegate = self._make_delegate({"global-index-uuid.index": data})
+        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=64)
+        caching_io = CachingFileIO(delegate, cache)
+
+        with caching_io.new_input_stream("global-index-uuid.index") as s:
+            self.assertEqual(data, s.read())
+
+    def test_bucket_index_file_not_cached_by_default(self):
+        data = b"bucket index"
+        delegate = self._make_delegate({"index-uuid-0": data})
+        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=64)
+        caching_io = CachingFileIO(delegate, cache)
+
+        result = caching_io.new_input_stream("index-uuid-0")
+        self.assertNotIsInstance(result, CachingInputStream)
+        self.assertEqual(data, result.read())
+        delegate.get_file_size.assert_not_called()
+
+    def test_bucket_index_cached_when_in_whitelist(self):
+        from pypaimon.utils.file_type import FileType
+        data = b"bucket index"
+        delegate = self._make_delegate({"index-uuid-0": data})
+        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=64)
+        whitelist = {FileType.META, FileType.GLOBAL_INDEX, FileType.BUCKET_INDEX}
+        caching_io = CachingFileIO(delegate, cache, whitelist)
+
+        with caching_io.new_input_stream("index-uuid-0") as s:
+            self.assertEqual(data, s.read())
+
+    def test_custom_whitelist_meta_only(self):
+        from pypaimon.utils.file_type import FileType
+        delegate = self._make_delegate({
+            "snapshot-1": b"snap",
+            "global-index-uuid.index": b"idx",
+        })
+        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=64)
+        caching_io = CachingFileIO(delegate, cache, {FileType.META})
+
+        with caching_io.new_input_stream("snapshot-1") as s:
+            self.assertIsInstance(s, CachingInputStream)
+
+        result = caching_io.new_input_stream("global-index-uuid.index")
+        self.assertNotIsInstance(result, CachingInputStream)
+
+    def test_data_file_not_cached(self):
+        data = b"data content"
+        delegate = self._make_delegate({"data-abc.orc": data})
+        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=64)
+        caching_io = CachingFileIO(delegate, cache)
+
+        result = caching_io.new_input_stream("data-abc.orc")
+        # Should be raw BytesIO, not CachingInputStream
+        self.assertNotIsInstance(result, CachingInputStream)
+        self.assertEqual(data, result.read())
+        # get_file_size should NOT be called for data files
+        delegate.get_file_size.assert_not_called()
+
+    def test_file_index_not_cached(self):
+        data = b"file index content"
+        delegate = self._make_delegate({"data-abc.orc.index": data})
+        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=64)
+        caching_io = CachingFileIO(delegate, cache)
+
+        result = caching_io.new_input_stream("data-abc.orc.index")
+        self.assertNotIsInstance(result, CachingInputStream)
+        self.assertEqual(data, result.read())
+        delegate.get_file_size.assert_not_called()
+
+    def test_delegate_methods_forwarded(self):
+        delegate = MagicMock()
+        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=64)
+        caching_io = CachingFileIO(delegate, cache)
+
+        caching_io.exists("/some/path")
+        delegate.exists.assert_called_once_with("/some/path")
+
+        caching_io.mkdirs("/some/dir")
+        delegate.mkdirs.assert_called_once_with("/some/dir")
+
+        caching_io.delete("/some/file", recursive=True)
+        delegate.delete.assert_called_once_with("/some/file", True)
+
+        caching_io.rename("/src", "/dst")
+        delegate.rename.assert_called_once_with("/src", "/dst")
+
+        caching_io.get_file_status("/path")
+        delegate.get_file_status.assert_called_once_with("/path")
+
+        caching_io.list_status("/dir")
+        delegate.list_status.assert_called_once_with("/dir")
+
+        caching_io.new_output_stream("/out")
+        delegate.new_output_stream.assert_called_once_with("/out")
+
+
+class ConfigOptionsTest(unittest.TestCase):
+
+    def test_local_cache_options_defaults(self):
+        from pypaimon.common.options import Options
+        from pypaimon.common.options.core_options import CoreOptions
+
+        opts = CoreOptions(Options({}))
+        self.assertFalse(opts.local_cache_enabled())
+        self.assertIsNone(opts.local_cache_dir())
+        self.assertIsNone(opts.local_cache_max_size())
+        self.assertEqual(1 * 1024 * 1024, opts.local_cache_block_size().get_bytes())
+        self.assertEqual("meta,global-index", opts.local_cache_whitelist())
+
+    def test_local_cache_options_custom(self):
+        from pypaimon.common.options import Options
+        from pypaimon.common.options.core_options import CoreOptions
+
+        opts = CoreOptions(Options({
+            "local-cache.enabled": "true",
+            "local-cache.dir": "/custom/cache",
+            "local-cache.max-size": "2gb",
+            "local-cache.block-size": "4mb",
+            "local-cache.whitelist": "meta,global-index,bucket-index",
+        }))
+        self.assertTrue(opts.local_cache_enabled())
+        self.assertEqual("/custom/cache", opts.local_cache_dir())
+        self.assertEqual(2 * 1024 * 1024 * 1024, opts.local_cache_max_size().get_bytes())
+        self.assertEqual(4 * 1024 * 1024, opts.local_cache_block_size().get_bytes())
+        self.assertEqual("meta,global-index,bucket-index", opts.local_cache_whitelist())
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/paimon-python/pypaimon/tests/file_type_test.py b/paimon-python/pypaimon/tests/file_type_test.py
new file mode 100644
index 0000000000..b82604eaed
--- /dev/null
+++ b/paimon-python/pypaimon/tests/file_type_test.py
@@ -0,0 +1,109 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+"""Tests for FileType classification."""
+
+import unittest
+
+from pypaimon.utils.file_type import FileType
+
+
+class FileTypeClassifyTest(unittest.TestCase):
+
+    def test_snapshot(self):
+        self.assertEqual(FileType.META, FileType.classify("/warehouse/db/t/snapshot/snapshot-1"))
+        self.assertEqual(FileType.META, FileType.classify("snapshot-42"))
+
+    def test_schema(self):
+        self.assertEqual(FileType.META, FileType.classify("/warehouse/db/t/schema/schema-0"))
+
+    def test_statistics(self):
+        self.assertEqual(FileType.META, FileType.classify("stat-abc123"))
+
+    def test_tag(self):
+        self.assertEqual(FileType.META, FileType.classify("tag-v1.0"))
+
+    def test_consumer(self):
+        self.assertEqual(FileType.META, FileType.classify("consumer-group1"))
+
+    def test_service(self):
+        self.assertEqual(FileType.META, FileType.classify("service-abc"))
+
+    def test_manifest(self):
+        self.assertEqual(FileType.META, FileType.classify("manifest-abc123"))
+        self.assertEqual(FileType.META, FileType.classify("manifest-list-abc"))
+        self.assertEqual(FileType.META, FileType.classify("index-manifest-abc"))
+
+    def test_hint_files(self):
+        self.assertEqual(FileType.META, FileType.classify("EARLIEST"))
+        self.assertEqual(FileType.META, FileType.classify("LATEST"))
+
+    def test_success_files(self):
+        self.assertEqual(FileType.META, FileType.classify("_SUCCESS"))
+        self.assertEqual(FileType.META, FileType.classify("part-0_SUCCESS"))
+
+    def test_changelog_meta(self):
+        self.assertEqual(FileType.META, FileType.classify("/warehouse/db/t/changelog/changelog-1"))
+
+    def test_changelog_not_in_changelog_dir(self):
+        self.assertEqual(FileType.DATA, FileType.classify("/warehouse/db/t/other/changelog-1"))
+
+    def test_global_index(self):
+        self.assertEqual(FileType.GLOBAL_INDEX,
+                         FileType.classify("global-index-abc-123.index"))
+        self.assertEqual(FileType.GLOBAL_INDEX,
+                         FileType.classify("/warehouse/db/t/global-index/global-index-uuid.index"))
+
+    def test_file_index(self):
+        self.assertEqual(FileType.FILE_INDEX,
+                         FileType.classify("data-abc.orc.index"))
+        self.assertEqual(FileType.FILE_INDEX,
+                         FileType.classify("some-file.index"))
+
+    def test_bucket_index(self):
+        self.assertEqual(FileType.BUCKET_INDEX, FileType.classify("index-abc-0"))
+        self.assertEqual(FileType.BUCKET_INDEX, FileType.classify("index-uuid-1"))
+
+    def test_data_files(self):
+        self.assertEqual(FileType.DATA, FileType.classify("data-abc.orc"))
+        self.assertEqual(FileType.DATA, FileType.classify("data-abc.parquet"))
+        self.assertEqual(FileType.DATA, FileType.classify("unknown-file"))
+
+    def test_temp_file_unwrap(self):
+        # .{originalName}.{UUID}.tmp -> originalName
+        uuid = "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
+        temp_name = f".snapshot-1.{uuid}.tmp"
+        self.assertEqual(FileType.META, FileType.classify(temp_name))
+
+        temp_data = f".data-abc.orc.{uuid}.tmp"
+        self.assertEqual(FileType.DATA, FileType.classify(temp_data))
+
+    def test_temp_file_not_matching_pattern(self):
+        self.assertEqual(FileType.DATA, FileType.classify(".short.tmp"))
+        self.assertEqual(FileType.DATA, FileType.classify("no-leading-dot.uuid-here.tmp"))
+
+    def test_is_index(self):
+        self.assertTrue(FileType.BUCKET_INDEX.is_index())
+        self.assertTrue(FileType.GLOBAL_INDEX.is_index())
+        self.assertTrue(FileType.FILE_INDEX.is_index())
+        self.assertFalse(FileType.META.is_index())
+        self.assertFalse(FileType.DATA.is_index())
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/paimon-python/pypaimon/utils/file_type.py b/paimon-python/pypaimon/utils/file_type.py
new file mode 100644
index 0000000000..384f927da7
--- /dev/null
+++ b/paimon-python/pypaimon/utils/file_type.py
@@ -0,0 +1,120 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+"""Classification of Paimon files."""
+
+import logging
+import os
+from enum import Enum
+
+logger = logging.getLogger(__name__)
+
+
+class FileType(Enum):
+    """Classification of Paimon files.
+
+    - META: snapshot, schema, manifest, statistics, tag, changelog metadata,
+            hint files, _SUCCESS, consumer, service files
+    - DATA: data files and any unrecognized files (default)
+    - BUCKET_INDEX: bucket level index files (Hash, DV)
+    - GLOBAL_INDEX: table level global index files (btree, bitmap, lumina, tantivy)
+    - FILE_INDEX: data-file index files (bloom filter, bitmap, etc.)
+    """
+    META = "META"
+    DATA = "DATA"
+    BUCKET_INDEX = "BUCKET_INDEX"
+    GLOBAL_INDEX = "GLOBAL_INDEX"
+    FILE_INDEX = "FILE_INDEX"
+
+    def is_index(self) -> bool:
+        return self in (FileType.BUCKET_INDEX, FileType.GLOBAL_INDEX, FileType.FILE_INDEX)
+
+    @staticmethod
+    def is_mutable(file_path: str) -> bool:
+        name = os.path.basename(file_path)
+        return name in ("EARLIEST", "LATEST")
+
+    @staticmethod
+    def classify(file_path: str) -> 'FileType':
+        name = os.path.basename(file_path)
+        name = FileType._unwrap_temp_file_name(name)
+
+        if (name.startswith("snapshot-")
+                or name.startswith("schema-")
+                or name.startswith("stat-")
+                or name.startswith("tag-")
+                or name.startswith("consumer-")
+                or name.startswith("service-")):
+            return FileType.META
+
+        if name.endswith(".index"):
+            if "global-index-" in name:
+                return FileType.GLOBAL_INDEX
+            return FileType.FILE_INDEX
+
+        if "manifest" in name:
+            return FileType.META
+
+        if name.startswith("index-"):
+            return FileType.BUCKET_INDEX
+
+        if name in ("EARLIEST", "LATEST"):
+            return FileType.META
+
+        if name == "_SUCCESS" or name.endswith("_SUCCESS"):
+            return FileType.META
+
+        if name.startswith("changelog-"):
+            parent = os.path.basename(os.path.dirname(file_path))
+            if parent == "changelog":
+                return FileType.META
+
+        return FileType.DATA
+
+    @staticmethod
+    def parse_whitelist(whitelist_str: str) -> set:
+        mapping = {
+            "meta": FileType.META,
+            "global-index": FileType.GLOBAL_INDEX,
+            "bucket-index": FileType.BUCKET_INDEX,
+            "data": FileType.DATA,
+            "file-index": FileType.FILE_INDEX,
+        }
+        result = set()
+        for name in whitelist_str.split(","):
+            name = name.strip()
+            if name in mapping:
+                result.add(mapping[name])
+            elif name:
+                logger.warning(
+                    "Unknown local-cache.whitelist value '%s'. "
+                    "Supported values: meta, global-index, bucket-index, data, file-index.",
+                    name,
+                )
+        return result
+
+    @staticmethod
+    def _unwrap_temp_file_name(name: str) -> str:
+        # format: .{originalName}.{UUID}.tmp
+        # suffix ".{UUID}.tmp" is 41 chars: 1(dot) + 36(UUID) + 4(.tmp)
+        if len(name) < 43 or name[0] != '.' or not name.endswith('.tmp'):
+            return name
+        dot_before_uuid = len(name) - 41
+        if name[dot_before_uuid] != '.':
+            return name
+        return name[1:dot_before_uuid]

Reply via email to