This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ce38b2ccd7 [java][python] Add block-level local disk cache for file
reads (#7699)
ce38b2ccd7 is described below
commit ce38b2ccd7cbaabcfc1d5e90f68586ebecc66265
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu May 14 21:30:19 2026 +0800
[java][python] Add block-level local disk cache for file reads (#7699)
Introduce a CachingFileIO wrapper that transparently caches remote file
reads at block granularity, on local disk or in memory. Files are
classified by FileType and, by default, only META and GLOBAL_INDEX types
are cached; BUCKET_INDEX, DATA and FILE_INDEX are read directly unless
they are added to the whitelist.
---
docs/content/program-api/file-cache.md | 130 ++++++
docs/content/pypaimon/global-index.md | 136 ++++++
.../generated/catalog_configuration.html | 30 ++
.../org/apache/paimon/options/CatalogOptions.java | 38 ++
.../org/apache/paimon/fs/cache/CachingFileIO.java | 173 ++++++++
.../fs/cache/CachingSeekableInputStream.java | 160 +++++++
.../apache/paimon/fs/cache/LocalCacheManager.java | 37 ++
.../paimon/fs/cache/LocalDiskCacheManager.java | 237 +++++++++++
.../paimon/fs/cache/LocalMemoryCacheManager.java | 118 ++++++
.../java/org/apache/paimon/utils/FileType.java | 48 +++
.../apache/paimon/fs/cache/CachingFileIOTest.java | 469 +++++++++++++++++++++
.../paimon/fs/cache/LocalDiskCacheManagerTest.java | 171 ++++++++
.../org/apache/paimon/catalog/AbstractCatalog.java | 7 +-
.../java/org/apache/paimon/rest/RESTCatalog.java | 13 +-
.../pypaimon/catalog/filesystem_catalog.py | 8 +-
.../pypaimon/catalog/rest/rest_catalog.py | 6 +-
.../pypaimon/common/options/core_options.py | 59 +++
.../pypaimon/filesystem/caching_file_io.py | 389 +++++++++++++++++
.../pypaimon/tests/caching_file_io_test.py | 421 ++++++++++++++++++
paimon-python/pypaimon/tests/file_type_test.py | 109 +++++
paimon-python/pypaimon/utils/file_type.py | 120 ++++++
21 files changed, 2873 insertions(+), 6 deletions(-)
diff --git a/docs/content/program-api/file-cache.md
b/docs/content/program-api/file-cache.md
new file mode 100644
index 0000000000..cfad2c00c7
--- /dev/null
+++ b/docs/content/program-api/file-cache.md
@@ -0,0 +1,130 @@
+---
+title: "Local Cache"
+weight: 8
+type: docs
+aliases:
+- /program-api/file-cache.html
+- /pypaimon/file-cache.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Local Cache
+
+When reading files from remote storage (S3, OSS, HDFS, etc.), each seek+read
goes over the network. Paimon provides a block-level local cache that
transparently caches file reads, significantly reducing remote I/O for repeated
access patterns.
+
+The cache supports two modes:
+- **Disk cache**: when `local-cache.dir` is configured, blocks are cached on
local disk.
+- **Memory cache**: when `local-cache.dir` is not configured, blocks are
cached in memory.
+
+## Cached File Types
+
+The cache classifies files by type. By default, only `meta` and `global-index`
types are cached. You can customize this via the `local-cache.whitelist` option.
+
+| File Type | Config Name | Examples | Default Cached |
+|-----------|-------------|----------|----------------|
+| META | meta | snapshot, schema, manifest, statistics, tag | Yes |
+| GLOBAL_INDEX | global-index | BTree, Lumina, Tantivy index files | Yes |
+| BUCKET_INDEX | bucket-index | Hash, deletion vector index files | No |
+| DATA | data | Data files (ORC, Parquet, etc.) | No |
+| FILE_INDEX | file-index | Data-file level bloom filter, bitmap | No |
+
+All file types can be added to the whitelist. The default whitelist is
`meta,global-index`.
+
+## Enable Cache
+
+This is a catalog-level option. Configure it when creating the catalog:
+
+{{< tabs "enable-cache" >}}
+
+{{< tab "Java" >}}
+
+```java
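+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.Table;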
+
+Options options = new Options();
+options.set("warehouse", "s3://my-bucket/warehouse");
+options.set("local-cache.enabled", "true");
+// optional: use disk cache by specifying a directory
+options.set("local-cache.dir", "/tmp/paimon-cache");
+// optional: customize limits
+options.set("local-cache.max-size", "2gb");
+options.set("local-cache.block-size", "1mb");
+
+CatalogContext context = CatalogContext.create(options);
+Catalog catalog = CatalogFactory.createCatalog(context);
+
+// All tables from this catalog will use the cache
+Table table = catalog.getTable(Identifier.create("my_db", "my_table"));
+```
+
+{{< /tab >}}
+
+{{< tab "Python" >}}
+
+```python
+import pypaimon
+
+options = {
+ "warehouse": "s3://my-bucket/warehouse",
+ "local-cache.enabled": "true",
+ # optional: use disk cache by specifying a directory
+ "local-cache.dir": "/tmp/paimon-cache",
+ # optional: customize limits
+ "local-cache.max-size": "2gb",
+ "local-cache.block-size": "1mb",
+}
+
+catalog = pypaimon.create_catalog(options)
+
+# All tables from this catalog will use the cache
+table = catalog.get_table("db.my_table")
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
+## Cache Options
+
+| Option | Type | Default | Description |
+|--------|------|---------|-------------|
+| `local-cache.enabled` | Boolean | false | Whether to enable local block
cache for file reads. |
+| `local-cache.dir` | String | (none) | Directory for storing cached blocks on
disk. If not configured, memory cache is used. |
+| `local-cache.max-size` | MemorySize | unlimited | Maximum total size of the
cache. When exceeded, the least recently used blocks are evicted. |
+| `local-cache.block-size` | MemorySize | 1 mb | Block size for caching. Files
are logically divided into fixed-size blocks and cached independently. |
+| `local-cache.whitelist` | String | meta,global-index | Comma-separated list
of file types to cache. Supported values: `meta`, `global-index`,
`bucket-index`, `data`, `file-index`. |
+
+## How It Works
+
+- Files are logically divided into fixed-size blocks (default 1 MB).
+- On the first read, blocks are downloaded from remote storage and cached
locally (on disk or in memory).
+- Subsequent reads of the same block are served from the local cache, skipping
remote I/O.
+- When using disk cache, cache files are keyed by remote file path and block
offset, so they persist across process restarts and can be reused.
+- When the cache exceeds `max-size`, the least recently used blocks are
evicted automatically.
+
+## Cache Lifecycle
+
+The cache is created and managed by the Catalog. All tables obtained from the
same catalog share a single cache instance. The cache lives as long as the
Catalog object is reachable — no explicit close is needed.
+
+In distributed computing frameworks (Flink, Spark), the `FileIO` is serialized
and shipped to task managers. After deserialization, the cache is **not**
recreated — reads fall through directly to the remote storage. This is by
design: the cache lifecycle is bound to the Catalog that created it, and a
deserialized `FileIO` is no longer managed by any Catalog.
+
+If you need caching on task managers, create a new Catalog with cache options
enabled on each worker node.
diff --git a/docs/content/pypaimon/global-index.md
b/docs/content/pypaimon/global-index.md
new file mode 100644
index 0000000000..f5e4d602e3
--- /dev/null
+++ b/docs/content/pypaimon/global-index.md
@@ -0,0 +1,136 @@
+---
+title: "Global Index"
+weight: 6
+type: docs
+aliases:
+- /pypaimon/global-index.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Global Index
+
+PyPaimon supports querying global indexes built on Data Evolution (append)
tables. Three index types are available:
+
+- **BTree Index**: B-tree based index for scalar column lookups. Supports
equality, IN, range, and combined predicates.
+- **Vector Index (Lumina)**: Approximate nearest neighbor (ANN) index for
vector similarity search.
+- **Full-Text Index (Tantivy)**: Full-text search index for text retrieval
with relevance scoring.
+
+> Global indexes must be built beforehand (e.g., via Spark or Flink). See
[Global Index]({{< ref "append-table/global-index" >}}) for how to create
indexes.
+
+## BTree Index
+
+BTree index is automatically used during scan when a filter predicate matches
the indexed column. No special API is needed — just set a filter on the read
builder.
+
+```python
+import pypaimon
+
+catalog = pypaimon.create_catalog(...)
+table = catalog.get_table("db.my_table")
+
+# BTree index is used automatically when filtering on indexed columns
+read_builder = table.new_read_builder()
+read_builder = read_builder.with_filter(
+ pypaimon.PredicateBuilder(table.fields)
+ .in_("name", ["a200", "a300"])
+)
+
+scan = read_builder.new_scan()
+read = read_builder.new_read()
+splits = scan.plan().splits
+data = read.to_arrow(splits)
+```
+
+Supported predicates: `equal`, `not_equal`, `less_than`, `less_or_equal`,
`greater_than`, `greater_or_equal`, `in_`, `not_in`, `between`, `is_null`,
`is_not_null`.
+
+## Vector Index (Lumina)
+
+Use `VectorSearchBuilder` to perform approximate nearest neighbor search on a
vector column, then read the matched rows.
+
+```python
+table = catalog.get_table("db.my_table")
+
+# Step 1: vector search to get matching row IDs
+builder = table.new_vector_search_builder()
+index_result = (
+ builder
+ .with_vector_column("embedding")
+ .with_query_vector([1.0, 2.0, 3.0, ...])
+ .with_limit(10)
+ .execute_local()
+)
+
+# Step 2: read actual data for matched rows
+read_builder = table.new_read_builder()
+scan = read_builder.new_scan()
+scan.with_global_index_result(index_result)
+read = read_builder.new_read()
+data = read.to_arrow(scan.plan().splits)
+```
+
+You can also add a scalar filter to pre-filter rows before vector search:
+
+```python
+predicate = (
+ pypaimon.PredicateBuilder(table.fields)
+ .equal("category", "electronics")
+)
+
+index_result = (
+ table.new_vector_search_builder()
+ .with_vector_column("embedding")
+ .with_query_vector([1.0, 2.0, 3.0, ...])
+ .with_limit(10)
+ .with_filter(predicate)
+ .execute_local()
+)
+
+read_builder = table.new_read_builder()
+scan = read_builder.new_scan()
+scan.with_global_index_result(index_result)
+read = read_builder.new_read()
+data = read.to_arrow(scan.plan().splits)
+```
+
+## Full-Text Index (Tantivy)
+
+Use `FullTextSearchBuilder` to perform full-text search on a text column, then
read the matched rows.
+
+```python
+table = catalog.get_table("db.my_table")
+
+# Step 1: full-text search to get matching row IDs
+builder = table.new_full_text_search_builder()
+index_result = (
+ builder
+ .with_text_column("content")
+ .with_query_text("search keywords")
+ .with_limit(20)
+ .execute_local()
+)
+
+# Step 2: read actual data for matched rows
+read_builder = table.new_read_builder()
+scan = read_builder.new_scan()
+scan.with_global_index_result(index_result)
+read = read_builder.new_read()
+data = read.to_arrow(scan.plan().splits)
+```
+
+For better performance when reading from remote storage, consider enabling the
[Local Cache]({{< ref "program-api/file-cache" >}}).
diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html
b/docs/layouts/shortcodes/generated/catalog_configuration.html
index ae0f476c1e..6cafa6ccdf 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -146,6 +146,36 @@ under the License.
<td>Boolean</td>
<td>Sync all table properties to the catalog metastore (e.g. Hive
metastore, JDBC catalog store)</td>
</tr>
+ <tr>
+ <td><h5>local-cache.block-size</h5></td>
+ <td style="word-wrap: break-word;">1 mb</td>
+ <td>MemorySize</td>
+ <td>Block size for local cache.</td>
+ </tr>
+ <tr>
+ <td><h5>local-cache.dir</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Directory for local block cache on disk. If not configured,
memory cache is used instead.</td>
+ </tr>
+ <tr>
+ <td><h5>local-cache.enabled</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to enable local block cache for file reads. If
local-cache.dir is configured, disk cache is used; otherwise memory cache is
used.</td>
+ </tr>
+ <tr>
+ <td><h5>local-cache.max-size</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>MemorySize</td>
+ <td>Maximum total size of the local block cache. Unlimited by
default.</td>
+ </tr>
+ <tr>
+ <td><h5>local-cache.whitelist</h5></td>
+ <td style="word-wrap: break-word;">"meta,global-index"</td>
+ <td>String</td>
+ <td>Comma-separated list of file types to cache. Supported values:
meta, global-index, bucket-index, data, file-index.</td>
+ </tr>
<tr>
<td><h5>table.type</h5></td>
<td style="word-wrap: break-word;">managed</td>
diff --git
a/paimon-api/src/main/java/org/apache/paimon/options/CatalogOptions.java
b/paimon-api/src/main/java/org/apache/paimon/options/CatalogOptions.java
index b6f31dd550..f900603897 100644
--- a/paimon-api/src/main/java/org/apache/paimon/options/CatalogOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/options/CatalogOptions.java
@@ -23,6 +23,7 @@ import org.apache.paimon.table.CatalogTableType;
import java.time.Duration;
import static org.apache.paimon.options.ConfigOptions.key;
+import static org.apache.paimon.options.MemorySize.ofMebiBytes;
/** Options for catalog. */
public class CatalogOptions {
@@ -185,4 +186,41 @@ public class CatalogOptions {
"Whether to allow static cache in file io
implementation. If not allowed, this means that "
+ "there may be a large number of FileIO
instances generated, enabling caching can "
+ "lead to resource leakage.");
+
+ /** Catalog option "local-cache.enabled": boolean switch for the local block cache, default false. */
+ public static final ConfigOption<Boolean> LOCAL_CACHE_ENABLED =
+ key("local-cache.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to enable local block cache for file
reads. "
+ + "If local-cache.dir is configured, disk
cache is used; otherwise memory cache is used.");
+
+ /** Catalog option "local-cache.dir": string with no default value. */
+ public static final ConfigOption<String> LOCAL_CACHE_DIR =
+ key("local-cache.dir")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Directory for local block cache on disk. "
+ + "If not configured, memory cache is used
instead.");
+
+ /**
+ * Catalog option "local-cache.max-size": memory size with no default value; per the
+ * description, an unset value means the cache is unlimited.
+ */
+ public static final ConfigOption<MemorySize> LOCAL_CACHE_MAX_SIZE =
+ key("local-cache.max-size")
+ .memoryType()
+ .noDefaultValue()
+ .withDescription(
+ "Maximum total size of the local block cache.
Unlimited by default.");
+
+ /** Catalog option "local-cache.block-size": memory size, default 1 MiB (ofMebiBytes(1)). */
+ public static final ConfigOption<MemorySize> LOCAL_CACHE_BLOCK_SIZE =
+ key("local-cache.block-size")
+ .memoryType()
+ .defaultValue(ofMebiBytes(1))
+ .withDescription("Block size for local cache.");
+
+ /**
+ * Catalog option "local-cache.whitelist": comma-separated file type names, default
+ * "meta,global-index".
+ */
+ public static final ConfigOption<String> LOCAL_CACHE_WHITELIST =
+ key("local-cache.whitelist")
+ .stringType()
+ .defaultValue("meta,global-index")
+ .withDescription(
+ "Comma-separated list of file types to cache. "
+ + "Supported values: meta, global-index,
bucket-index, data, file-index.");
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fs/cache/CachingFileIO.java
b/paimon-common/src/main/java/org/apache/paimon/fs/cache/CachingFileIO.java
new file mode 100644
index 0000000000..e41d0f04cd
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/cache/CachingFileIO.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs.cache;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.utils.FileType;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A {@link FileIO} wrapper that serves reads of whitelisted file types through a block-level
+ * {@link LocalCacheManager}.
+ *
+ * <p>Only {@link #newInputStream(Path)} is intercepted; every other operation is forwarded to the
+ * delegate unchanged. A path is read through the cache only when its {@link FileType} is in the
+ * whitelist and {@link FileType#isMutable(Path)} returns false for it.
+ *
+ * <p>The cache reference is {@code transient}: after deserialization it is null and reads fall
+ * through to the delegate directly.
+ */
+public class CachingFileIO implements FileIO {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FileIO delegate;
+    private final Set<FileType> whitelist;
+
+    // Not serialized: a deserialized instance has no cache and therefore never caches.
+    private transient volatile LocalCacheManager cache;
+
+    /**
+     * Creates a caching wrapper.
+     *
+     * @param delegate the FileIO that performs all real I/O
+     * @param cache the cache manager used for whitelisted reads; when null, reads are not cached
+     * @param whitelist file types whose reads go through the cache; copied defensively
+     */
+    public CachingFileIO(FileIO delegate, LocalCacheManager cache, Set<FileType> whitelist) {
+        this.delegate = delegate;
+        this.cache = cache;
+        // EnumSet.copyOf throws IllegalArgumentException for an empty collection that is not
+        // itself an EnumSet, so an empty whitelist is handled explicitly.
+        this.whitelist =
+                whitelist.isEmpty() ? EnumSet.noneOf(FileType.class) : EnumSet.copyOf(whitelist);
+    }
+
+    /**
+     * Wraps the given {@link FileIO} with caching if local cache is enabled in the catalog
+     * context.
+     *
+     * @param fileIO the FileIO to potentially wrap
+     * @param context the catalog context containing cache configuration
+     * @param cache the cache manager instance (managed by the Catalog), or null if caching is off
+     * @return the original FileIO when it is already a CachingFileIO, when {@code cache} is null,
+     *     or when the configured whitelist is empty; otherwise a new CachingFileIO
+     */
+    public static FileIO wrapWithCachingIfNeeded(
+            FileIO fileIO, CatalogContext context, @Nullable LocalCacheManager cache) {
+        // Never double-wrap, and do nothing when there is no cache to use.
+        if (fileIO instanceof CachingFileIO || cache == null) {
+            return fileIO;
+        }
+        Options options = context.options();
+        Set<FileType> whitelist =
+                FileType.parseWhitelist(options.get(CatalogOptions.LOCAL_CACHE_WHITELIST));
+        if (whitelist.isEmpty()) {
+            return fileIO;
+        }
+        return new CachingFileIO(fileIO, cache, whitelist);
+    }
+
+    /**
+     * Creates a {@link LocalCacheManager} from the catalog context options, or returns null if
+     * caching is not enabled.
+     *
+     * <p>A {@link LocalDiskCacheManager} is created when {@code local-cache.dir} is set, otherwise
+     * a {@link LocalMemoryCacheManager}. An unset {@code local-cache.max-size} is treated as
+     * {@link Long#MAX_VALUE}.
+     *
+     * @throws IllegalArgumentException if {@code local-cache.block-size} is not positive or does
+     *     not fit in an {@code int}
+     */
+    @Nullable
+    public static LocalCacheManager createCacheManager(CatalogContext context) {
+        Options options = context.options();
+        if (!options.get(CatalogOptions.LOCAL_CACHE_ENABLED)) {
+            return null;
+        }
+
+        MemorySize maxSizeOpt = options.get(CatalogOptions.LOCAL_CACHE_MAX_SIZE);
+        long maxSize = maxSizeOpt == null ? Long.MAX_VALUE : maxSizeOpt.getBytes();
+
+        // Validate before narrowing: a blind (int) cast would silently wrap values above 2 GiB,
+        // and a zero block size would later cause a division by zero when computing block indexes.
+        long blockSizeBytes = options.get(CatalogOptions.LOCAL_CACHE_BLOCK_SIZE).getBytes();
+        if (blockSizeBytes <= 0 || blockSizeBytes > Integer.MAX_VALUE) {
+            throw new IllegalArgumentException(
+                    "local-cache.block-size must be between 1 and "
+                            + Integer.MAX_VALUE
+                            + " bytes, but was "
+                            + blockSizeBytes
+                            + " bytes.");
+        }
+        int blockSize = (int) blockSizeBytes;
+
+        String cacheDir = options.get(CatalogOptions.LOCAL_CACHE_DIR);
+        if (cacheDir != null) {
+            return new LocalDiskCacheManager(cacheDir, maxSize, blockSize);
+        } else {
+            return new LocalMemoryCacheManager(maxSize, blockSize);
+        }
+    }
+
+    /**
+     * Opens {@code path} for reading, through the cache when the file is eligible.
+     *
+     * <p>The delegate stream is returned directly when there is no cache (e.g. after
+     * deserialization), when the file type is not whitelisted, or when the path is mutable.
+     */
+    @Override
+    public SeekableInputStream newInputStream(Path path) throws IOException {
+        // Read the volatile field once so the null check and the use see the same value.
+        LocalCacheManager c = cache;
+        FileType fileType = FileType.classify(path);
+        if (c == null || !whitelist.contains(fileType) || FileType.isMutable(path)) {
+            return delegate.newInputStream(path);
+        }
+        return new CachingSeekableInputStream(delegate, path, c);
+    }
+
+    // ------------------------------------------------------------------------
+    //  Everything below is forwarded to the delegate unchanged.
+    // ------------------------------------------------------------------------
+
+    @Override
+    public PositionOutputStream newOutputStream(Path path, boolean overwrite) throws IOException {
+        return delegate.newOutputStream(path, overwrite);
+    }
+
+    @Override
+    public FileStatus getFileStatus(Path path) throws IOException {
+        return delegate.getFileStatus(path);
+    }
+
+    @Override
+    public FileStatus[] listStatus(Path path) throws IOException {
+        return delegate.listStatus(path);
+    }
+
+    @Override
+    public boolean exists(Path path) throws IOException {
+        return delegate.exists(path);
+    }
+
+    @Override
+    public boolean delete(Path path, boolean recursive) throws IOException {
+        return delegate.delete(path, recursive);
+    }
+
+    @Override
+    public boolean mkdirs(Path path) throws IOException {
+        return delegate.mkdirs(path);
+    }
+
+    @Override
+    public boolean rename(Path src, Path dst) throws IOException {
+        return delegate.rename(src, dst);
+    }
+
+    @Override
+    public boolean isObjectStore() {
+        return delegate.isObjectStore();
+    }
+
+    @Override
+    public void configure(CatalogContext context) {
+        delegate.configure(context);
+    }
+
+    @Override
+    public void setRuntimeContext(Map<String, String> options) {
+        delegate.setRuntimeContext(options);
+    }
+
+    /** Closes the delegate only; this class does not close the cache manager. */
+    @Override
+    public void close() throws IOException {
+        delegate.close();
+    }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fs/cache/CachingSeekableInputStream.java
b/paimon-common/src/main/java/org/apache/paimon/fs/cache/CachingSeekableInputStream.java
new file mode 100644
index 0000000000..b81e98b572
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/fs/cache/CachingSeekableInputStream.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs.cache;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.SeekableInputStream;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/** A {@link SeekableInputStream} that caches reads at block granularity on
local disk. */
+public class CachingSeekableInputStream extends SeekableInputStream {
+
+ private final FileIO fileIO;
+ private final Path path;
+ private final LocalCacheManager cache;
+ private long pos;
+ private long fileSize = -1;
+ @Nullable private SeekableInputStream remoteStream;
+
+ public CachingSeekableInputStream(FileIO fileIO, Path path,
LocalCacheManager cache) {
+ this.fileIO = fileIO;
+ this.path = path;
+ this.cache = cache;
+ this.pos = 0;
+ }
+
+ private long fileSize() throws IOException {
+ if (fileSize == -1) {
+ String pathStr = path.toString();
+ long cached = cache.getFileSize(pathStr);
+ if (cached >= 0) {
+ fileSize = cached;
+ } else {
+ fileSize = fileIO.getFileStatus(path).getLen();
+ cache.putFileSize(pathStr, fileSize);
+ }
+ }
+ return fileSize;
+ }
+
+ @Override
+ public void seek(long desired) throws IOException {
+ this.pos = Math.max(0, desired);
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return pos;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (pos >= fileSize()) {
+ return -1;
+ }
+ int blockSize = cache.blockSize();
+ int blockIndex = (int) (pos / blockSize);
+ byte[] blockData = readBlock(blockIndex);
+ int offsetInBlock = (int) (pos - (long) blockIndex * blockSize);
+ pos++;
+ return blockData[offsetInBlock] & 0xFF;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (len == 0) {
+ return 0;
+ }
+ if (pos >= fileSize()) {
+ return -1;
+ }
+
+ int blockSize = cache.blockSize();
+ long end = Math.min(pos + len, fileSize());
+ int totalRead = 0;
+
+ while (pos < end) {
+ int blockIndex = (int) (pos / blockSize);
+ byte[] blockData = readBlock(blockIndex);
+
+ long blockStart = (long) blockIndex * blockSize;
+ int startInBlock = (int) (pos - blockStart);
+ int endInBlock = (int) Math.min(end - blockStart,
blockData.length);
+ int bytesToCopy = endInBlock - startInBlock;
+
+ System.arraycopy(blockData, startInBlock, b, off + totalRead,
bytesToCopy);
+ totalRead += bytesToCopy;
+ pos += bytesToCopy;
+ }
+
+ return totalRead;
+ }
+
+ private byte[] readBlock(int blockIndex) throws IOException {
+ byte[] cached = cache.getBlock(path.toString(), blockIndex);
+ if (cached != null) {
+ return cached;
+ }
+
+ int blockSize = cache.blockSize();
+ long offset = (long) blockIndex * blockSize;
+ int readSize = (int) Math.min(blockSize, fileSize() - offset);
+
+ SeekableInputStream stream = getRemoteStream();
+ stream.seek(offset);
+ byte[] data = readFully(stream, readSize);
+
+ cache.putBlock(path.toString(), blockIndex, data);
+ return data;
+ }
+
+ private SeekableInputStream getRemoteStream() throws IOException {
+ if (remoteStream == null) {
+ remoteStream = fileIO.newInputStream(path);
+ }
+ return remoteStream;
+ }
+
+ private static byte[] readFully(SeekableInputStream in, int size) throws
IOException {
+ byte[] buf = new byte[size];
+ int remaining = size;
+ int off = 0;
+ while (remaining > 0) {
+ int n = in.read(buf, off, remaining);
+ if (n < 0) {
+ break;
+ }
+ off += n;
+ remaining -= n;
+ }
+ return buf;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (remoteStream != null) {
+ remoteStream.close();
+ remoteStream = null;
+ }
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fs/cache/LocalCacheManager.java
b/paimon-common/src/main/java/org/apache/paimon/fs/cache/LocalCacheManager.java
new file mode 100644
index 0000000000..4929bf5cd8
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/fs/cache/LocalCacheManager.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs.cache;
+
+import javax.annotation.Nullable;
+
+/**
+ * Block-level local cache manager interface for file reads.
+ *
+ * <p>Blocks are addressed by a file path string plus a block index; the file size is cached
+ * separately under the same file path string.
+ */
+public interface LocalCacheManager {
+
+ /** Returns the size of one cache block (presumably in bytes; confirm against implementations). */
+ int blockSize();
+
+ /**
+ * Returns the cached content of the given block, or null (presumably on a cache miss; confirm
+ * against implementations).
+ */
+ @Nullable
+ byte[] getBlock(String filePath, int blockIndex);
+
+ /** Stores the content of the given block. */
+ void putBlock(String filePath, int blockIndex, byte[] data);
+
+ /** Returns cached file size, or -1 if not cached. */
+ long getFileSize(String filePath);
+
+ /** Records the size of the given file so that later {@link #getFileSize} calls can return it. */
+ void putFileSize(String filePath, long size);
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fs/cache/LocalDiskCacheManager.java
b/paimon-common/src/main/java/org/apache/paimon/fs/cache/LocalDiskCacheManager.java
new file mode 100644
index 0000000000..6dfc710e3b
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/fs/cache/LocalDiskCacheManager.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs.cache;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Block-level local disk cache with LRU eviction. Thread-safe.
+ *
+ * <p>Every block is stored as one file whose name is the SHA-256 hex digest of {@code
+ * filePath + ":" + blockIndex}, placed in a sub-directory named after the first two hex
+ * characters. An in-memory, access-ordered index maps each block file to its size and drives
+ * eviction once the indexed total exceeds {@code maxSizeBytes}. File sizes recorded through
+ * {@link #putFileSize} are held in memory only.
+ */
+public class LocalDiskCacheManager implements LocalCacheManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LocalDiskCacheManager.class);
+
+    private final File cacheDir;
+    private final long maxSizeBytes;
+    private final int blockSize;
+    private final Object lock = new Object();
+    private final ConcurrentHashMap<String, Long> fileSizeCache = new ConcurrentHashMap<>();
+
+    // LRU-ordered index: cache file path -> size in bytes. Access order, so get() moves the
+    // entry to the tail and iteration starts at the least recently used entry. Guarded by lock.
+    private final LinkedHashMap<String, Long> entryIndex;
+    // Sum of all sizes held in entryIndex. Guarded by lock.
+    private long currentSize;
+
+    /**
+     * Creates the cache, creating {@code cacheDir} if needed and indexing block files that are
+     * already present in it.
+     *
+     * @param cacheDir directory holding the block files
+     * @param maxSizeBytes eviction threshold; {@link Long#MAX_VALUE} disables eviction
+     * @param blockSize block size reported by {@link #blockSize()}
+     */
+    public LocalDiskCacheManager(String cacheDir, long maxSizeBytes, int blockSize) {
+        this.cacheDir = new File(cacheDir);
+        this.maxSizeBytes = maxSizeBytes;
+        this.blockSize = blockSize;
+        this.entryIndex = new LinkedHashMap<>(64, 0.75f, true);
+        this.cacheDir.mkdirs();
+        this.currentSize = scanAndPopulateIndex();
+    }
+
+    @Override
+    public int blockSize() {
+        return blockSize;
+    }
+
+    /**
+     * Returns the cached bytes of the block, or {@code null} if the block is not indexed or its
+     * file cannot be read. An unreadable block is dropped from the index.
+     */
+    @Override
+    public byte[] getBlock(String filePath, int blockIndex) {
+        File path = cachePath(filePath, blockIndex);
+        String cacheKey = path.getPath();
+        synchronized (lock) {
+            // get() on the access-ordered map both checks presence and refreshes the LRU
+            // position; index values are never null.
+            if (entryIndex.get(cacheKey) == null) {
+                return null;
+            }
+        }
+        try {
+            // Read outside the lock so slow disk I/O does not block other cache users.
+            return Files.readAllBytes(path.toPath());
+        } catch (IOException e) {
+            LOG.debug("Failed to read cache block: {}", path, e);
+            synchronized (lock) {
+                Long size = entryIndex.remove(cacheKey);
+                if (size != null) {
+                    currentSize -= size;
+                }
+            }
+            return null;
+        }
+    }
+
+    /**
+     * Writes the block to disk and indexes it. Does nothing if the block is already indexed;
+     * write or rename failures are swallowed (the block is simply not cached).
+     */
+    @Override
+    public void putBlock(String filePath, int blockIndex, byte[] data) {
+        File path = cachePath(filePath, blockIndex);
+        String cacheKey = path.getPath();
+
+        synchronized (lock) {
+            if (entryIndex.containsKey(cacheKey)) {
+                return;
+            }
+        }
+
+        File subDir = path.getParentFile();
+        subDir.mkdirs();
+
+        // Write to a per-thread temp file first, then rename, so readers never see a partial
+        // block under the final name.
+        String tmpName = path.getName() + ".tmp." + Thread.currentThread().getId();
+        File tmpFile = new File(subDir, tmpName);
+        try {
+            try (FileOutputStream fos = new FileOutputStream(tmpFile)) {
+                fos.write(data);
+            }
+            if (!tmpFile.renameTo(path)) {
+                tmpFile.delete();
+                return;
+            }
+        } catch (IOException e) {
+            tmpFile.delete();
+            LOG.debug("Failed to write cache block: {}", path, e);
+            return;
+        }
+
+        boolean needEvict;
+        synchronized (lock) {
+            // Two threads can race past the containsKey check above and both publish the same
+            // block. Only add the size delta so currentSize stays equal to the index total.
+            Long previous = entryIndex.put(cacheKey, (long) data.length);
+            currentSize += data.length - (previous == null ? 0L : previous);
+            needEvict = maxSizeBytes < Long.MAX_VALUE && currentSize > maxSizeBytes;
+        }
+        if (needEvict) {
+            evict();
+        }
+    }
+
+    /**
+     * Removes least recently used blocks from the index until the total is within {@code
+     * maxSizeBytes}, then deletes their files outside the lock.
+     */
+    private void evict() {
+        List<Map.Entry<String, Long>> toDelete = new ArrayList<>();
+        synchronized (lock) {
+            Iterator<Map.Entry<String, Long>> it = entryIndex.entrySet().iterator();
+            while (it.hasNext() && currentSize > maxSizeBytes) {
+                // Copy the entry: a Map.Entry obtained from the iterator has undefined
+                // behavior once the backing map is modified by it.remove().
+                Map.Entry<String, Long> entry =
+                        new java.util.AbstractMap.SimpleImmutableEntry<>(it.next());
+                toDelete.add(entry);
+                currentSize -= entry.getValue();
+                it.remove();
+            }
+        }
+        for (Map.Entry<String, Long> entry : toDelete) {
+            File file = new File(entry.getKey());
+            // delete() also returns false when the file is already gone; only a file that is
+            // still on disk has to be put back into the index.
+            if (!file.delete() && file.exists()) {
+                synchronized (lock) {
+                    Long previous = entryIndex.put(entry.getKey(), entry.getValue());
+                    currentSize += entry.getValue() - (previous == null ? 0L : previous);
+                }
+            }
+        }
+    }
+
+    /**
+     * Walks the cache directory and indexes every file whose name does not contain {@code
+     * ".tmp."}. Called from the constructor only, before the instance is shared.
+     *
+     * @return the total size of all indexed files
+     */
+    private long scanAndPopulateIndex() {
+        Path root = cacheDir.toPath();
+        if (Files.exists(root)) {
+            try {
+                Files.walkFileTree(
+                        root,
+                        new SimpleFileVisitor<Path>() {
+                            @Override
+                            public FileVisitResult visitFile(
+                                    Path file, BasicFileAttributes attrs) {
+                                String name = file.getFileName().toString();
+                                if (!name.contains(".tmp.")) {
+                                    entryIndex.put(file.toFile().getPath(), attrs.size());
+                                }
+                                return FileVisitResult.CONTINUE;
+                            }
+                        });
+            } catch (IOException e) {
+                LOG.warn("Failed to scan cache directory", e);
+            }
+        }
+        // Sum after the walk, even if it failed half-way, so the returned total always matches
+        // what was actually put into the index.
+        long total = 0;
+        for (Long size : entryIndex.values()) {
+            total += size;
+        }
+        return total;
+    }
+
+    /** Maps a (file, block) pair to its cache file: {@code <cacheDir>/<hex[0..2]>/<hex>}. */
+    private File cachePath(String filePath, int blockIndex) {
+        String key = filePath + ":" + blockIndex;
+        String hex = sha256Hex(key);
+        String prefix = hex.substring(0, 2);
+        return new File(new File(cacheDir, prefix), hex);
+    }
+
+    // MessageDigest instances are not shared between threads; keep one per thread.
+    private static final ThreadLocal<MessageDigest> SHA256_DIGEST =
+            ThreadLocal.withInitial(
+                    () -> {
+                        try {
+                            return MessageDigest.getInstance("SHA-256");
+                        } catch (NoSuchAlgorithmException e) {
+                            throw new RuntimeException("SHA-256 not available", e);
+                        }
+                    });
+
+    private static final char[] HEX_CHARS = "0123456789abcdef".toCharArray();
+
+    /** Returns the lower-case hex SHA-256 digest of the UTF-8 bytes of {@code input}. */
+    private static String sha256Hex(String input) {
+        MessageDigest md = SHA256_DIGEST.get();
+        md.reset();
+        byte[] hash = md.digest(input.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+        char[] chars = new char[hash.length * 2];
+        for (int i = 0; i < hash.length; i++) {
+            chars[i * 2] = HEX_CHARS[(hash[i] >> 4) & 0x0f];
+            chars[i * 2 + 1] = HEX_CHARS[hash[i] & 0x0f];
+        }
+        return new String(chars);
+    }
+
+    @VisibleForTesting
+    long currentSize() {
+        synchronized (lock) {
+            return currentSize;
+        }
+    }
+
+    @Override
+    public long getFileSize(String filePath) {
+        Long size = fileSizeCache.get(filePath);
+        return size != null ? size : -1;
+    }
+
+    @Override
+    public void putFileSize(String filePath, long size) {
+        fileSizeCache.put(filePath, size);
+    }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/cache/LocalMemoryCacheManager.java b/paimon-common/src/main/java/org/apache/paimon/fs/cache/LocalMemoryCacheManager.java
new file mode 100644
index 0000000000..3e7baab21e
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/cache/LocalMemoryCacheManager.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs.cache;
+
+import javax.annotation.Nullable;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Block-level in-memory cache with LRU eviction. Thread-safe.
+ *
+ * <p>Blocks live in an access-ordered map guarded by a single lock; file sizes are tracked in a
+ * separate concurrent map.
+ */
+public class LocalMemoryCacheManager implements LocalCacheManager {
+
+    private final long maxSizeBytes;
+    private final int blockSize;
+    private final Object lock = new Object();
+    private final LinkedHashMap<BlockKey, byte[]> cache;
+    private final ConcurrentHashMap<String, Long> fileSizeCache = new ConcurrentHashMap<>();
+
+    // Total length of all cached blocks. Guarded by lock.
+    private long currentSize;
+
+    /**
+     * @param maxSizeBytes eviction threshold; {@link Long#MAX_VALUE} disables eviction
+     * @param blockSize block size reported by {@link #blockSize()}
+     */
+    public LocalMemoryCacheManager(long maxSizeBytes, int blockSize) {
+        this.cache = new LinkedHashMap<>(64, 0.75f, true);
+        this.currentSize = 0;
+        this.blockSize = blockSize;
+        this.maxSizeBytes = maxSizeBytes;
+    }
+
+    @Override
+    public int blockSize() {
+        return blockSize;
+    }
+
+    /** Returns the cached block (refreshing its LRU position) or {@code null} on a miss. */
+    @Nullable
+    @Override
+    public byte[] getBlock(String filePath, int blockIndex) {
+        final BlockKey wanted = new BlockKey(filePath, blockIndex);
+        synchronized (lock) {
+            return cache.get(wanted);
+        }
+    }
+
+    /** Caches the block unless it is already present, then evicts LRU blocks if over budget. */
+    @Override
+    public void putBlock(String filePath, int blockIndex, byte[] data) {
+        final BlockKey blockKey = new BlockKey(filePath, blockIndex);
+        synchronized (lock) {
+            if (!cache.containsKey(blockKey)) {
+                currentSize += data.length;
+                cache.put(blockKey, data);
+                if (maxSizeBytes != Long.MAX_VALUE) {
+                    // Iteration order of the access-ordered map starts at the eldest entry.
+                    Iterator<Map.Entry<BlockKey, byte[]>> lruOrder = cache.entrySet().iterator();
+                    while (currentSize > maxSizeBytes && lruOrder.hasNext()) {
+                        currentSize -= lruOrder.next().getValue().length;
+                        lruOrder.remove();
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public long getFileSize(String filePath) {
+        Long known = fileSizeCache.get(filePath);
+        if (known == null) {
+            return -1;
+        }
+        return known;
+    }
+
+    @Override
+    public void putFileSize(String filePath, long size) {
+        fileSizeCache.put(filePath, size);
+    }
+
+    /** Map key identifying one block of one file. */
+    private static class BlockKey {
+        final String filePath;
+        final int blockIndex;
+
+        BlockKey(String filePath, int blockIndex) {
+            this.filePath = filePath;
+            this.blockIndex = blockIndex;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == this) {
+                return true;
+            }
+            if (o instanceof BlockKey) {
+                BlockKey other = (BlockKey) o;
+                return other.blockIndex == blockIndex && Objects.equals(filePath, other.filePath);
+            }
+            return false;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(filePath, blockIndex);
+        }
+    }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/FileType.java b/paimon-common/src/main/java/org/apache/paimon/utils/FileType.java
index 7320b495b4..df71b0248a 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/FileType.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/FileType.java
@@ -20,6 +20,12 @@ package org.apache.paimon.utils;
import org.apache.paimon.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.EnumSet;
+import java.util.Set;
+
/**
* Classification of Paimon files.
*
@@ -39,6 +45,8 @@ public enum FileType {
GLOBAL_INDEX,
FILE_INDEX;
+ private static final Logger LOG = LoggerFactory.getLogger(FileType.class);
+
// keep in sync with SnapshotManager.SNAPSHOT_PREFIX
private static final String SNAPSHOT_PREFIX = "snapshot-";
// keep in sync with SchemaManager.SCHEMA_PREFIX
@@ -70,6 +78,46 @@ public enum FileType {
return this == BUCKET_INDEX || this == GLOBAL_INDEX || this ==
FILE_INDEX;
}
+    /**
+     * Parse a comma-separated whitelist string into a set of {@link FileType}s.
+     *
+     * <p>Entries are trimmed and matched case-sensitively against {@code meta}, {@code
+     * global-index}, {@code bucket-index}, {@code data} and {@code file-index}. Empty entries
+     * are ignored silently; any other unknown entry is logged as a warning and skipped.
+     */
+    public static Set<FileType> parseWhitelist(String whitelist) {
+        Set<FileType> types = EnumSet.noneOf(FileType.class);
+        for (String rawToken : whitelist.split(",")) {
+            String token = rawToken.trim();
+            if ("meta".equals(token)) {
+                types.add(META);
+            } else if ("global-index".equals(token)) {
+                types.add(GLOBAL_INDEX);
+            } else if ("bucket-index".equals(token)) {
+                types.add(BUCKET_INDEX);
+            } else if ("data".equals(token)) {
+                types.add(DATA);
+            } else if ("file-index".equals(token)) {
+                types.add(FILE_INDEX);
+            } else if (!token.isEmpty()) {
+                LOG.warn(
+                        "Unknown local-cache.whitelist value '{}'. "
+                                + "Supported values: meta, global-index, bucket-index, data, file-index.",
+                        token);
+            }
+        }
+        return types;
+    }
+
+    /**
+     * Returns {@code true} if the file is mutable and should not be cached.
+     *
+     * <p>Only the file name is inspected: {@code EARLIEST} and {@code LATEST} count as mutable.
+     */
+    public static boolean isMutable(Path filePath) {
+        final String fileName = filePath.getName();
+        if ("EARLIEST".equals(fileName)) {
+            return true;
+        }
+        return "LATEST".equals(fileName);
+    }
+
/**
* Classify a file based on its full path.
*
diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/cache/CachingFileIOTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/cache/CachingFileIOTest.java
new file mode 100644
index 0000000000..bb179ed47b
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/fs/cache/CachingFileIOTest.java
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs.cache;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.utils.FileType;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link CachingFileIO} and {@link CachingSeekableInputStream}. */
+class CachingFileIOTest {
+
+ @TempDir java.nio.file.Path tempDir;
+
+ private String cacheDir;
+
+ @BeforeEach
+ void setUp() {
+ cacheDir = tempDir.resolve("cache").toString();
+ }
+
+ // Builds the CachingFileIO under test. NOTE: blockSize is accepted but not used here; only
+ // delegate, cache and whitelist are passed on, so the block size in effect is presumably the
+ // one the LocalCacheManager was constructed with.
+ private CachingFileIO newCachingFileIO(
+ FileIO delegate, LocalCacheManager cache, EnumSet<FileType>
whitelist, int blockSize) {
+ return new CachingFileIO(delegate, cache, whitelist);
+ }
+
+ // Reads a snapshot file twice and checks that the delegate's getFileStatus ran only once.
+ @Test
+ void testMetaFileIsCached() throws IOException {
+ byte[] data = "snapshot data".getBytes();
+ MockFileIO delegate = new MockFileIO();
+ delegate.addFile("snapshot-1", data);
+
+ LocalDiskCacheManager cache = new LocalDiskCacheManager(cacheDir,
Long.MAX_VALUE, 64);
+ CachingFileIO cachingIO =
+ newCachingFileIO(
+ delegate, cache, EnumSet.of(FileType.META,
FileType.GLOBAL_INDEX), 64);
+
+ // first read
+ try (SeekableInputStream s = cachingIO.newInputStream(new
Path("snapshot-1"))) {
+ byte[] result = readAll(s, data.length);
+ assertThat(result).isEqualTo(data);
+ }
+
+ // second read should still work (cache hit)
+ try (SeekableInputStream s = cachingIO.newInputStream(new
Path("snapshot-1"))) {
+ byte[] result = readAll(s, data.length);
+ assertThat(result).isEqualTo(data);
+ }
+
+ assertThat(delegate.getFileStatusCallCount("snapshot-1")).isEqualTo(1);
+ }
+
+ // A manifest file read through the caching IO returns the original bytes.
+ @Test
+ void testManifestFileIsCached() throws IOException {
+ byte[] data = "manifest data".getBytes();
+ MockFileIO delegate = new MockFileIO();
+ delegate.addFile("manifest-abc", data);
+
+ LocalDiskCacheManager cache = new LocalDiskCacheManager(cacheDir,
Long.MAX_VALUE, 64);
+ CachingFileIO cachingIO =
+ newCachingFileIO(
+ delegate, cache, EnumSet.of(FileType.META,
FileType.GLOBAL_INDEX), 64);
+
+ try (SeekableInputStream s = cachingIO.newInputStream(new
Path("manifest-abc"))) {
+ assertThat(readAll(s, data.length)).isEqualTo(data);
+ }
+ }
+
+ // A global index file read through the caching IO returns the original bytes.
+ @Test
+ void testGlobalIndexFileIsCached() throws IOException {
+ byte[] data = "index data".getBytes();
+ MockFileIO delegate = new MockFileIO();
+ delegate.addFile("global-index-uuid.index", data);
+
+ LocalDiskCacheManager cache = new LocalDiskCacheManager(cacheDir,
Long.MAX_VALUE, 64);
+ CachingFileIO cachingIO =
+ newCachingFileIO(
+ delegate, cache, EnumSet.of(FileType.META,
FileType.GLOBAL_INDEX), 64);
+
+ try (SeekableInputStream s =
+ cachingIO.newInputStream(new Path("global-index-uuid.index")))
{
+ assertThat(readAll(s, data.length)).isEqualTo(data);
+ }
+ }
+
+ // Data files are outside the whitelist: the returned stream must not be the caching stream.
+ @Test
+ void testDataFileNotCached() throws IOException {
+ byte[] data = "data content".getBytes();
+ MockFileIO delegate = new MockFileIO();
+ delegate.addFile("data-abc.orc", data);
+
+ LocalDiskCacheManager cache = new LocalDiskCacheManager(cacheDir,
Long.MAX_VALUE, 64);
+ CachingFileIO cachingIO =
+ newCachingFileIO(
+ delegate, cache, EnumSet.of(FileType.META,
FileType.GLOBAL_INDEX), 64);
+
+ SeekableInputStream s = cachingIO.newInputStream(new
Path("data-abc.orc"));
+ assertThat(s).isNotInstanceOf(CachingSeekableInputStream.class);
+ byte[] result = readAll(s, data.length);
+ assertThat(result).isEqualTo(data);
+ s.close();
+ // getFileStatus should NOT be called for data files
+
assertThat(delegate.getFileStatusCallCount("data-abc.orc")).isEqualTo(0);
+ }
+
+ // File index files are outside the whitelist used here and must bypass the cache.
+ @Test
+ void testFileIndexNotCached() throws IOException {
+ byte[] data = "file index content".getBytes();
+ MockFileIO delegate = new MockFileIO();
+ delegate.addFile("data-abc.orc.index", data);
+
+ LocalDiskCacheManager cache = new LocalDiskCacheManager(cacheDir,
Long.MAX_VALUE, 64);
+ CachingFileIO cachingIO =
+ newCachingFileIO(
+ delegate, cache, EnumSet.of(FileType.META,
FileType.GLOBAL_INDEX), 64);
+
+ SeekableInputStream s = cachingIO.newInputStream(new
Path("data-abc.orc.index"));
+ assertThat(s).isNotInstanceOf(CachingSeekableInputStream.class);
+ s.close();
+ }
+
+ // With a META-only whitelist a snapshot is cached but a global index file is not.
+ @Test
+ void testCustomWhitelistMetaOnly() throws IOException {
+ MockFileIO delegate = new MockFileIO();
+ delegate.addFile("snapshot-1", "snap".getBytes());
+ delegate.addFile("global-index-uuid.index", "idx".getBytes());
+
+ LocalDiskCacheManager cache = new LocalDiskCacheManager(cacheDir,
Long.MAX_VALUE, 64);
+ CachingFileIO cachingIO = newCachingFileIO(delegate, cache,
EnumSet.of(FileType.META), 64);
+
+ SeekableInputStream s1 = cachingIO.newInputStream(new
Path("snapshot-1"));
+ assertThat(s1).isInstanceOf(CachingSeekableInputStream.class);
+ s1.close();
+
+ SeekableInputStream s2 = cachingIO.newInputStream(new
Path("global-index-uuid.index"));
+ assertThat(s2).isNotInstanceOf(CachingSeekableInputStream.class);
+ s2.close();
+ }
+
+ // Adding BUCKET_INDEX to the whitelist makes bucket index files go through the caching stream.
+ @Test
+ void testBucketIndexCachedWhenInWhitelist() throws IOException {
+ byte[] data = "bucket index".getBytes();
+ MockFileIO delegate = new MockFileIO();
+ delegate.addFile("index-uuid-0", data);
+
+ LocalDiskCacheManager cache = new LocalDiskCacheManager(cacheDir,
Long.MAX_VALUE, 64);
+ CachingFileIO cachingIO =
+ newCachingFileIO(
+ delegate,
+ cache,
+ EnumSet.of(FileType.META, FileType.GLOBAL_INDEX,
FileType.BUCKET_INDEX),
+ 64);
+
+ try (SeekableInputStream s = cachingIO.newInputStream(new
Path("index-uuid-0"))) {
+ assertThat(s).isInstanceOf(CachingSeekableInputStream.class);
+ assertThat(readAll(s, data.length)).isEqualTo(data);
+ }
+ }
+
+ // After one full read, a second full read must not open the delegate stream again.
+ @Test
+ void testCacheHitAvoidsRemoteRead() throws IOException {
+ byte[] data = "0123456789abcdef".getBytes();
+ MockFileIO delegate = new MockFileIO();
+ delegate.addFile("snapshot-1", data);
+
+ LocalDiskCacheManager cache = new LocalDiskCacheManager(cacheDir,
Long.MAX_VALUE, 8);
+ CachingFileIO cachingIO =
+ newCachingFileIO(
+ delegate, cache, EnumSet.of(FileType.META,
FileType.GLOBAL_INDEX), 64);
+
+ // first read populates cache
+ try (SeekableInputStream s = cachingIO.newInputStream(new
Path("snapshot-1"))) {
+ readAll(s, data.length);
+ }
+ int firstReadCount = delegate.newInputStreamCallCount("snapshot-1");
+ assertThat(firstReadCount).isEqualTo(1);
+
+ // second read should hit cache — delegate.newInputStream should NOT be called
+ // because the remote stream is lazily opened and all blocks are cached
+ try (SeekableInputStream s = cachingIO.newInputStream(new
Path("snapshot-1"))) {
+ byte[] result = readAll(s, data.length);
+ assertThat(result).isEqualTo(data);
+ }
+
assertThat(delegate.newInputStreamCallCount("snapshot-1")).isEqualTo(firstReadCount);
+ }
+
+ // EARLIEST/LATEST hint files must bypass the cache and must not trigger getFileStatus.
+ @Test
+ void testMutableFilesNotCached() throws IOException {
+ MockFileIO delegate = new MockFileIO();
+ delegate.addFile("EARLIEST", "1".getBytes());
+ delegate.addFile("LATEST", "42".getBytes());
+
+ LocalDiskCacheManager cache = new LocalDiskCacheManager(cacheDir,
Long.MAX_VALUE, 64);
+ CachingFileIO cachingIO =
+ newCachingFileIO(
+ delegate, cache, EnumSet.of(FileType.META,
FileType.GLOBAL_INDEX), 64);
+
+ // EARLIEST and LATEST are META but mutable — should not be cached
+ SeekableInputStream s1 = cachingIO.newInputStream(new
Path("EARLIEST"));
+ assertThat(s1).isNotInstanceOf(CachingSeekableInputStream.class);
+ s1.close();
+
+ SeekableInputStream s2 = cachingIO.newInputStream(new Path("LATEST"));
+ assertThat(s2).isNotInstanceOf(CachingSeekableInputStream.class);
+ s2.close();
+
+ assertThat(delegate.getFileStatusCallCount("EARLIEST")).isEqualTo(0);
+ assertThat(delegate.getFileStatusCallCount("LATEST")).isEqualTo(0);
+ }
+
+ // A 30-byte read starting at offset 90 crosses the boundary between block 0 and block 1.
+ @Test
+ void testReadSpanningMultipleBlocks() throws IOException {
+ byte[] data = new byte[1024];
+ for (int i = 0; i < data.length; i++) {
+ data[i] = (byte) (i % 256);
+ }
+ MockFileIO delegate = new MockFileIO();
+ delegate.addFile("snapshot-1", data);
+
+ LocalDiskCacheManager cache = new LocalDiskCacheManager(cacheDir,
Long.MAX_VALUE, 100);
+ CachingFileIO cachingIO =
+ newCachingFileIO(
+ delegate, cache, EnumSet.of(FileType.META,
FileType.GLOBAL_INDEX), 64);
+
+ try (SeekableInputStream s = cachingIO.newInputStream(new
Path("snapshot-1"))) {
+ // read across block boundary: block 0 ends at 100, block 1 starts at 100
+ s.seek(90);
+ byte[] result = new byte[30];
+ int read = readFully(s, result);
+ assertThat(read).isEqualTo(30);
+ byte[] expected = new byte[30];
+ System.arraycopy(data, 90, expected, 0, 30);
+ assertThat(result).isEqualTo(expected);
+ }
+ }
+
+ // Seeking to offset 10 and reading 6 bytes yields the tail of the 16-byte file.
+ @Test
+ void testSeekAndRead() throws IOException {
+ byte[] data = "0123456789abcdef".getBytes();
+ MockFileIO delegate = new MockFileIO();
+ delegate.addFile("snapshot-1", data);
+
+ LocalDiskCacheManager cache = new LocalDiskCacheManager(cacheDir,
Long.MAX_VALUE, 8);
+ CachingFileIO cachingIO =
+ newCachingFileIO(
+ delegate, cache, EnumSet.of(FileType.META,
FileType.GLOBAL_INDEX), 64);
+
+ try (SeekableInputStream s = cachingIO.newInputStream(new
Path("snapshot-1"))) {
+ s.seek(10);
+ byte[] result = new byte[6];
+ readFully(s, result);
+ assertThat(new String(result)).isEqualTo("abcdef");
+ }
+ }
+
+ // exists() and isObjectStore() must return what the delegate returns.
+ @Test
+ void testDelegateMethodsForwarded() throws IOException {
+ MockFileIO delegate = new MockFileIO();
+ delegate.addFile("snapshot-1", "data".getBytes());
+
+ LocalDiskCacheManager cache = new LocalDiskCacheManager(cacheDir,
Long.MAX_VALUE, 64);
+ CachingFileIO cachingIO =
+ newCachingFileIO(
+ delegate, cache, EnumSet.of(FileType.META,
FileType.GLOBAL_INDEX), 64);
+
+ assertThat(cachingIO.exists(new Path("snapshot-1"))).isTrue();
+ assertThat(cachingIO.exists(new Path("nonexistent"))).isFalse();
+ assertThat(cachingIO.isObjectStore()).isFalse();
+ }
+
+ // Reads up to size bytes, looping over short reads; on early EOF the tail stays zero-filled.
+ private byte[] readAll(SeekableInputStream s, int size) throws IOException
{
+ byte[] buf = new byte[size];
+ int off = 0;
+ while (off < size) {
+ int n = s.read(buf, off, size - off);
+ if (n < 0) {
+ break;
+ }
+ off += n;
+ }
+ return buf;
+ }
+
+ // Fills buf as far as the stream allows and returns the number of bytes actually read.
+ private int readFully(SeekableInputStream s, byte[] buf) throws
IOException {
+ int off = 0;
+ while (off < buf.length) {
+ int n = s.read(buf, off, buf.length - off);
+ if (n < 0) {
+ break;
+ }
+ off += n;
+ }
+ return off;
+ }
+
+ /** Simple in-memory FileIO for testing. */
+ private static class MockFileIO implements FileIO {
+
+ // Files are keyed by Path.getName(), i.e. the last path component only.
+ private final Map<String, byte[]> files = new HashMap<>();
+ // Per-file call counters used by the tests to detect remote access.
+ private final Map<String, Integer> fileStatusCalls = new HashMap<>();
+ private final Map<String, Integer> newInputStreamCalls = new
HashMap<>();
+
+ void addFile(String name, byte[] data) {
+ files.put(name, data);
+ }
+
+ int getFileStatusCallCount(String name) {
+ return fileStatusCalls.getOrDefault(name, 0);
+ }
+
+ int newInputStreamCallCount(String name) {
+ return newInputStreamCalls.getOrDefault(name, 0);
+ }
+
+ // Counts the call before the lookup, so attempts on missing files are counted too.
+ @Override
+ public SeekableInputStream newInputStream(Path path) throws
IOException {
+ String name = path.getName();
+ newInputStreamCalls.merge(name, 1, Integer::sum);
+ byte[] data = files.get(name);
+ if (data == null) {
+ throw new IOException("File not found: " + name);
+ }
+ return new ByteArraySeekableInputStream(data);
+ }
+
+ @Override
+ public PositionOutputStream newOutputStream(Path path, boolean
overwrite) {
+ throw new UnsupportedOperationException();
+ }
+
+ // Counts the call, then reports a non-directory status with the stored length.
+ @Override
+ public FileStatus getFileStatus(Path path) throws IOException {
+ String name = path.getName();
+ fileStatusCalls.merge(name, 1, Integer::sum);
+ byte[] data = files.get(name);
+ if (data == null) {
+ throw new IOException("File not found: " + name);
+ }
+ return new FileStatus() {
+ @Override
+ public long getLen() {
+ return data.length;
+ }
+
+ @Override
+ public boolean isDir() {
+ return false;
+ }
+
+ @Override
+ public Path getPath() {
+ return path;
+ }
+
+ @Override
+ public long getModificationTime() {
+ return 0;
+ }
+ };
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path path) {
+ return new FileStatus[0];
+ }
+
+ @Override
+ public boolean exists(Path path) {
+ return files.containsKey(path.getName());
+ }
+
+ @Override
+ public boolean delete(Path path, boolean recursive) {
+ return files.remove(path.getName()) != null;
+ }
+
+ @Override
+ public boolean mkdirs(Path path) {
+ return true;
+ }
+
+ @Override
+ public boolean rename(Path src, Path dst) {
+ return false;
+ }
+
+ @Override
+ public boolean isObjectStore() {
+ return false;
+ }
+
+ @Override
+ public void configure(CatalogContext context) {}
+ }
+
+ /** SeekableInputStream backed by a byte array. */
+ private static class ByteArraySeekableInputStream extends
SeekableInputStream {
+
+ private final byte[] data;
+ private int pos;
+
+ ByteArraySeekableInputStream(byte[] data) {
+ this.data = data;
+ this.pos = 0;
+ }
+
+ // Clamps the requested position into [0, data.length] instead of failing.
+ @Override
+ public void seek(long desired) {
+ this.pos = (int) Math.max(0, Math.min(desired, data.length));
+ }
+
+ @Override
+ public long getPos() {
+ return pos;
+ }
+
+ @Override
+ public int read() {
+ if (pos >= data.length) {
+ return -1;
+ }
+ return data[pos++] & 0xFF;
+ }
+
+ // Returns -1 at end of data, otherwise copies at most len bytes and advances pos.
+ @Override
+ public int read(byte[] b, int off, int len) {
+ if (pos >= data.length) {
+ return -1;
+ }
+ int toRead = Math.min(len, data.length - pos);
+ System.arraycopy(data, pos, b, off, toRead);
+ pos += toRead;
+ return toRead;
+ }
+
+ @Override
+ public void close() {}
+ }
+}
diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/cache/LocalDiskCacheManagerTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/cache/LocalDiskCacheManagerTest.java
new file mode 100644
index 0000000000..d6fb136cf5
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/fs/cache/LocalDiskCacheManagerTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs.cache;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link LocalDiskCacheManager}. */
+class LocalDiskCacheManagerTest {
+
+ @TempDir Path tempDir;
+
+ private String cacheDir;
+
+ @BeforeEach
+ void setUp() {
+ cacheDir = tempDir.resolve("cache").toString();
+ }
+
+ // A block that was put can be read back unchanged.
+ @Test
+ void testPutAndGet() {
+ LocalDiskCacheManager cache = new LocalDiskCacheManager(cacheDir,
Long.MAX_VALUE, 64);
+ byte[] data = "hello world block".getBytes();
+ cache.putBlock("file1.index", 0, data);
+ assertThat(cache.getBlock("file1.index", 0)).isEqualTo(data);
+ }
+
+ // Looking up a block that was never put yields null.
+ @Test
+ void testCacheMiss() {
+ LocalDiskCacheManager cache = new LocalDiskCacheManager(cacheDir,
Long.MAX_VALUE, 64);
+ assertThat(cache.getBlock("nonexistent", 0)).isNull();
+ }
+
+ // Blocks are keyed by both file path and block index.
+ @Test
+ void testDifferentKeys() {
+ LocalDiskCacheManager cache = new LocalDiskCacheManager(cacheDir,
Long.MAX_VALUE, 64);
+ cache.putBlock("file1", 0, "block0".getBytes());
+ cache.putBlock("file1", 1, "block1".getBytes());
+ cache.putBlock("file2", 0, "other0".getBytes());
+
+ assertThat(cache.getBlock("file1", 0)).isEqualTo("block0".getBytes());
+ assertThat(cache.getBlock("file1", 1)).isEqualTo("block1".getBytes());
+ assertThat(cache.getBlock("file2", 0)).isEqualTo("other0".getBytes());
+ }
+
+ // A second put for the same key must not overwrite the first value.
+ @Test
+ void testDuplicatePutIsNoop() {
+ LocalDiskCacheManager cache = new LocalDiskCacheManager(cacheDir,
Long.MAX_VALUE, 64);
+ cache.putBlock("file1", 0, "original".getBytes());
+ cache.putBlock("file1", 0, "duplicate".getBytes());
+ assertThat(cache.getBlock("file1",
0)).isEqualTo("original".getBytes());
+ }
+
+ // Two 60-byte blocks exceed the 100-byte limit, so at least one must be gone afterwards.
+ @Test
+ void testEviction() {
+ LocalDiskCacheManager cache = new LocalDiskCacheManager(cacheDir, 100,
64);
+ byte[] block0 = new byte[60];
+ byte[] block1 = new byte[60];
+ java.util.Arrays.fill(block0, (byte) 'a');
+ java.util.Arrays.fill(block1, (byte) 'b');
+ cache.putBlock("f", 0, block0);
+ cache.putBlock("f", 1, block1);
+ // total 120 > 100, at least one block should be evicted
+ byte[] remaining0 = cache.getBlock("f", 0);
+ byte[] remaining1 = cache.getBlock("f", 1);
+ assertThat(remaining0 == null || remaining1 == null).isTrue();
+ }
+
+ // A new manager on the same directory re-indexes the existing blocks and their total size.
+ @Test
+ void testScanSizeOnRestart() {
+ LocalDiskCacheManager cache1 = new LocalDiskCacheManager(cacheDir,
Long.MAX_VALUE, 64);
+ byte[] data0 = new byte[100];
+ byte[] data1 = new byte[200];
+ java.util.Arrays.fill(data0, (byte) 'x');
+ java.util.Arrays.fill(data1, (byte) 'y');
+ cache1.putBlock("f", 0, data0);
+ cache1.putBlock("f", 1, data1);
+
+ LocalDiskCacheManager cache2 = new LocalDiskCacheManager(cacheDir,
Long.MAX_VALUE, 64);
+ assertThat(cache2.currentSize()).isEqualTo(300);
+ assertThat(cache2.getBlock("f", 0)).isEqualTo(data0);
+ assertThat(cache2.getBlock("f", 1)).isEqualTo(data1);
+ }
+
+ // The constructor creates missing parent directories of the cache directory.
+ @Test
+ void testCacheDirCreated() {
+ String deepDir = tempDir.resolve("sub").resolve("deep").toString();
+ new LocalDiskCacheManager(deepDir, Long.MAX_VALUE, 64);
+ assertThat(new java.io.File(deepDir).isDirectory()).isTrue();
+ }
+
+ // 20 writer and 20 reader threads run together; a reader may see a miss (null) but never
+ // wrong content, and no thread may throw. Each writer uses its own block index, so
+ // same-key concurrent puts are not exercised here.
+ @Test
+ void testConcurrentPutGet() throws InterruptedException {
+ LocalDiskCacheManager cache = new LocalDiskCacheManager(cacheDir,
Long.MAX_VALUE, 64);
+ List<Throwable> errors = new CopyOnWriteArrayList<>();
+
+ List<Thread> threads = new ArrayList<>();
+ for (int i = 0; i < 20; i++) {
+ final int idx = i;
+ threads.add(
+ new Thread(
+ () -> {
+ try {
+ byte[] data = new byte[100];
+ java.util.Arrays.fill(data, (byte) (idx %
256));
+ cache.putBlock("concurrent", idx, data);
+ } catch (Throwable e) {
+ errors.add(e);
+ }
+ }));
+ threads.add(
+ new Thread(
+ () -> {
+ try {
+ byte[] result =
cache.getBlock("concurrent", idx);
+ if (result != null) {
+ byte[] expected = new byte[100];
+ java.util.Arrays.fill(expected, (byte)
(idx % 256));
+ assertThat(result).isEqualTo(expected);
+ }
+ } catch (Throwable e) {
+ errors.add(e);
+ }
+ }));
+ }
+ for (Thread t : threads) {
+ t.start();
+ }
+ for (Thread t : threads) {
+ t.join();
+ }
+ assertThat(errors).isEmpty();
+ }
+
+ // With Long.MAX_VALUE as the limit, all 50 blocks must still be present.
+ @Test
+ void testUnlimitedCacheSkipsEviction() {
+ LocalDiskCacheManager cache = new LocalDiskCacheManager(cacheDir,
Long.MAX_VALUE, 64);
+ for (int i = 0; i < 50; i++) {
+ byte[] data = new byte[100];
+ java.util.Arrays.fill(data, (byte) 'x');
+ cache.putBlock("f", i, data);
+ }
+ for (int i = 0; i < 50; i++) {
+ assertThat(cache.getBlock("f", i)).isNotNull();
+ }
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 4130c8d575..9512e2f767 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -25,6 +25,8 @@ import org.apache.paimon.factories.FactoryUtil;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.cache.CachingFileIO;
+import org.apache.paimon.fs.cache.LocalCacheManager;
import org.apache.paimon.function.Function;
import org.apache.paimon.function.FunctionChange;
import org.apache.paimon.options.Options;
@@ -83,15 +85,18 @@ public abstract class AbstractCatalog implements Catalog {
protected final FileIO fileIO;
protected final Map<String, String> tableDefaultOptions;
protected final CatalogContext context;
+ protected final @Nullable LocalCacheManager cacheManager;
protected AbstractCatalog(FileIO fileIO) {
this.fileIO = fileIO;
this.tableDefaultOptions = new HashMap<>();
this.context = CatalogContext.create(new Options());
+ this.cacheManager = null;
}
protected AbstractCatalog(FileIO fileIO, CatalogContext context) {
- this.fileIO = fileIO;
+ this.cacheManager = CachingFileIO.createCacheManager(context);
+ this.fileIO = CachingFileIO.wrapWithCachingIfNeeded(fileIO, context,
cacheManager);
this.tableDefaultOptions =
CatalogUtils.tableDefaultOptions(context.options().toMap());
this.context = context;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 155d6a9893..94442d5c30 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -35,6 +35,8 @@ import org.apache.paimon.consumer.ConsumerInfo;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.ResolvingFileIO;
+import org.apache.paimon.fs.cache.CachingFileIO;
+import org.apache.paimon.fs.cache.LocalCacheManager;
import org.apache.paimon.function.Function;
import org.apache.paimon.function.FunctionChange;
import org.apache.paimon.options.Options;
@@ -100,6 +102,7 @@ public class RESTCatalog implements Catalog {
private final CatalogContext context;
private final boolean dataTokenEnabled;
protected final Map<String, String> tableDefaultOptions;
+ private final @Nullable LocalCacheManager cacheManager;
public RESTCatalog(CatalogContext context) {
this(context, true);
@@ -115,6 +118,7 @@ public class RESTCatalog implements Catalog {
context.fallbackIO());
this.dataTokenEnabled =
api.options().get(RESTTokenFileIO.DATA_TOKEN_ENABLED);
this.tableDefaultOptions =
CatalogUtils.tableDefaultOptions(this.context.options().toMap());
+ this.cacheManager = CachingFileIO.createCacheManager(this.context);
}
@Override
@@ -1192,9 +1196,12 @@ public class RESTCatalog implements Catalog {
}
private FileIO fileIOForData(Path path, Identifier identifier) {
- return dataTokenEnabled
- ? new RESTTokenFileIO(context, api, identifier, path)
- : fileIOFromOptions(path);
+ return CachingFileIO.wrapWithCachingIfNeeded(
+ dataTokenEnabled
+ ? new RESTTokenFileIO(context, api, identifier, path)
+ : fileIOFromOptions(path),
+ context,
+ cacheManager);
}
private FileIO fileIOFromOptions(Path path) {
diff --git a/paimon-python/pypaimon/catalog/filesystem_catalog.py
b/paimon-python/pypaimon/catalog/filesystem_catalog.py
index 478c3233a8..56fe394483 100644
--- a/paimon-python/pypaimon/catalog/filesystem_catalog.py
+++ b/paimon-python/pypaimon/catalog/filesystem_catalog.py
@@ -19,6 +19,7 @@ from typing import List, Optional, Union
from pypaimon.api.api_response import GetTagResponse, PagedList
from pypaimon.catalog.catalog import Catalog
+from pypaimon.catalog.catalog_context import CatalogContext
from pypaimon.catalog.catalog_environment import CatalogEnvironment
from pypaimon.catalog.catalog_exception import (
BranchAlreadyExistException,
@@ -36,6 +37,7 @@ from pypaimon.common.options.config import CatalogOptions
from pypaimon.common.options.core_options import CoreOptions
from pypaimon.common.file_io import FileIO
from pypaimon.common.identifier import Identifier
+from pypaimon.filesystem.caching_file_io import CachingFileIO
from pypaimon.schema.schema_change import SchemaChange
from pypaimon.schema.schema_manager import SchemaManager
from pypaimon.snapshot.snapshot import Snapshot
@@ -50,7 +52,11 @@ class FileSystemCatalog(Catalog):
raise ValueError(f"Paimon '{CatalogOptions.WAREHOUSE.key()}' path
must be set")
self.warehouse = catalog_options.get(CatalogOptions.WAREHOUSE)
self.catalog_options = catalog_options
- self.file_io = FileIO.get(self.warehouse, self.catalog_options)
+ self.catalog_context =
CatalogContext.create_from_options(catalog_options)
+ self._cache_manager =
CachingFileIO.create_cache_manager(self.catalog_options)
+ self.file_io = CachingFileIO.wrap_with_caching_if_needed(
+ FileIO.get(self.warehouse, self.catalog_options),
self.catalog_options,
+ self._cache_manager)
def list_databases(self) -> list:
statuses = self.file_io.list_status(self.warehouse)
diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py
b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
index 45e3f18b44..a3d6a665ff 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
@@ -41,6 +41,7 @@ from pypaimon.catalog.rest.table_metadata import TableMetadata
from pypaimon.common.options.config import CatalogOptions, FuseOptions
from pypaimon.common.options.core_options import CoreOptions
from pypaimon.common.file_io import FileIO
+from pypaimon.filesystem.caching_file_io import CachingFileIO
from pypaimon.common.identifier import Identifier
from pypaimon.schema.schema import Schema
from pypaimon.schema.schema_change import SchemaChange
@@ -66,6 +67,7 @@ class RESTCatalog(Catalog):
self.context = CatalogContext.create(self.rest_api.options,
context.hadoop_conf,
context.prefer_io_loader,
context.fallback_io_loader)
self.data_token_enabled =
self.rest_api.options.get(CatalogOptions.DATA_TOKEN_ENABLED)
+ self._cache_manager =
CachingFileIO.create_cache_manager(self.context.options)
# FUSE support (lazy import only when enabled)
self.fuse_enabled = self.context.options.get(FuseOptions.FUSE_ENABLED,
False)
@@ -614,7 +616,9 @@ class RESTCatalog(Catalog):
)
def file_io_from_options(self, table_path: str) -> FileIO:
- return FileIO.get(table_path, self.context.options)
+ return CachingFileIO.wrap_with_caching_if_needed(
+ FileIO.get(table_path, self.context.options), self.context.options,
+ self._cache_manager)
def file_io_for_data(self, table_path: str, identifier: Identifier):
"""
diff --git a/paimon-python/pypaimon/common/options/core_options.py
b/paimon-python/pypaimon/common/options/core_options.py
index 52a5737002..7d9a227e4a 100644
--- a/paimon-python/pypaimon/common/options/core_options.py
+++ b/paimon-python/pypaimon/common/options/core_options.py
@@ -398,6 +398,50 @@ class CoreOptions:
)
)
+ LOCAL_CACHE_ENABLED: ConfigOption[bool] = (
+ ConfigOptions.key("local-cache.enabled")
+ .boolean_type()
+ .default_value(False)
+ .with_description(
+ "Whether to enable local block cache for file reads. "
+ "If local-cache.dir is configured, disk cache is used; otherwise
memory cache is used."
+ )
+ )
+
+ LOCAL_CACHE_DIR: ConfigOption[str] = (
+ ConfigOptions.key("local-cache.dir")
+ .string_type()
+ .no_default_value()
+ .with_description(
+ "Directory for local block cache on disk. "
+ "If not configured, memory cache is used instead."
+ )
+ )
+
+ LOCAL_CACHE_MAX_SIZE: ConfigOption[MemorySize] = (
+ ConfigOptions.key("local-cache.max-size")
+ .memory_type()
+ .no_default_value()
+ .with_description("Maximum total size of the local block cache.
Unlimited by default.")
+ )
+
+ LOCAL_CACHE_BLOCK_SIZE: ConfigOption[MemorySize] = (
+ ConfigOptions.key("local-cache.block-size")
+ .memory_type()
+ .default_value(MemorySize.of_mebi_bytes(1))
+ .with_description("Block size for local cache.")
+ )
+
+ LOCAL_CACHE_WHITELIST: ConfigOption[str] = (
+ ConfigOptions.key("local-cache.whitelist")
+ .string_type()
+ .default_value("meta,global-index")
+ .with_description(
+ "Comma-separated list of file types to cache. "
+ "Supported values: meta, global-index, bucket-index, data,
file-index."
+ )
+ )
+
READ_BATCH_SIZE: ConfigOption[int] = (
ConfigOptions.key("read.batch-size")
.int_type()
@@ -640,6 +684,21 @@ class CoreOptions:
def global_index_thread_num(self) -> Optional[int]:
return self.options.get(CoreOptions.GLOBAL_INDEX_THREAD_NUM)
+ def local_cache_enabled(self) -> bool:
+ return self.options.get(CoreOptions.LOCAL_CACHE_ENABLED)
+
+ def local_cache_dir(self) -> Optional[str]:
+ return self.options.get(CoreOptions.LOCAL_CACHE_DIR)
+
+ def local_cache_max_size(self) -> Optional[MemorySize]:
+ return self.options.get(CoreOptions.LOCAL_CACHE_MAX_SIZE)
+
+ def local_cache_block_size(self) -> MemorySize:
+ return self.options.get(CoreOptions.LOCAL_CACHE_BLOCK_SIZE)
+
+ def local_cache_whitelist(self) -> str:
+ return self.options.get(CoreOptions.LOCAL_CACHE_WHITELIST)
+
def read_batch_size(self, default=None) -> int:
return self.options.get(CoreOptions.READ_BATCH_SIZE, default or 1024)
diff --git a/paimon-python/pypaimon/filesystem/caching_file_io.py
b/paimon-python/pypaimon/filesystem/caching_file_io.py
new file mode 100644
index 0000000000..f5141a76e3
--- /dev/null
+++ b/paimon-python/pypaimon/filesystem/caching_file_io.py
@@ -0,0 +1,389 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Block-level local cache for Paimon files.
+
+Provides a CachingFileIO wrapper that transparently caches remote file reads
+at block granularity. If a cache directory is configured, disk cache is used;
+otherwise an in-memory LRU cache is used. Files are classified by FileType and
+only cacheable types in the whitelist are cached; others are read directly from
+the delegate FileIO.
+"""
+
+import hashlib
+import os
+import threading
+from collections import OrderedDict
+from typing import Optional
+
+from pypaimon.common.file_io import FileIO
+from pypaimon.utils.file_type import FileType
+
+
class LocalMemoryCacheManager:
    """Block-level in-memory cache with LRU eviction.

    Blocks are keyed by ``(file_path, block_index)`` and stored in an
    ``OrderedDict`` kept in access order (least recently used first). When the
    total number of cached bytes exceeds ``max_size_bytes``, the least
    recently used blocks are dropped until the cache fits again.

    A ``max_size_bytes`` of ``2 ** 63 - 1`` means "unlimited": eviction is
    skipped entirely.
    """

    # Value used by callers to express "no size limit".
    _UNLIMITED = 2 ** 63 - 1

    def __init__(self, max_size_bytes: int, block_size: int = 1 * 1024 * 1024):
        """
        Args:
            max_size_bytes: upper bound for the sum of cached block sizes.
            block_size: granularity (in bytes) at which readers split files.
        """
        self._max_size_bytes = max_size_bytes
        self._block_size = block_size
        # Guards _cache, _current_size and _file_size_cache.
        self._lock = threading.Lock()
        self._current_size = 0
        self._cache: OrderedDict = OrderedDict()
        self._file_size_cache: dict = {}

    @property
    def block_size(self) -> int:
        """Block size in bytes used to split files."""
        return self._block_size

    def get_block(self, file_path: str, block_index: int) -> Optional[bytes]:
        """Return the cached block, or ``None`` on a miss.

        A hit marks the block as most recently used.
        """
        key = (file_path, block_index)
        with self._lock:
            data = self._cache.get(key)
            if data is not None:
                self._cache.move_to_end(key)
            return data

    def put_block(self, file_path: str, block_index: int, data: bytes) -> None:
        """Store a block unless it is already cached.

        A block that is larger than the whole cache can never be retained, so
        it is ignored up front; inserting it would only flush every other
        entry before the block itself gets evicted as well.
        """
        key = (file_path, block_index)
        size = len(data)
        with self._lock:
            if key in self._cache:
                return
            if size > self._max_size_bytes:
                return
            self._cache[key] = data
            self._current_size += size
            if self._max_size_bytes >= self._UNLIMITED:
                return
            # Drop least recently used blocks until we fit again.
            while self._cache and self._current_size > self._max_size_bytes:
                _, evicted = self._cache.popitem(last=False)
                self._current_size -= len(evicted)

    def get_file_size(self, file_path: str) -> int:
        """Return the remembered size of ``file_path``, or ``-1`` if unknown."""
        with self._lock:
            return self._file_size_cache.get(file_path, -1)

    def put_file_size(self, file_path: str, size: int) -> None:
        """Remember the size of ``file_path`` for later streams."""
        with self._lock:
            self._file_size_cache[file_path] = size
+
+
class LocalDiskCacheManager:
    """Block-level local disk cache with LRU eviction.

    Every block is stored as one file under ``cache_dir``. The file name is
    the SHA-256 of ``"<file_path>:<block_index>"`` and files are fanned out
    into sub-directories named after the first two hex digits of that hash.

    An in-memory ``OrderedDict`` (cache path -> size) tracks the entries in
    access order so the least recently used blocks can be evicted once the
    total size exceeds ``max_size_bytes``. A ``max_size_bytes`` of
    ``2 ** 63 - 1`` means "unlimited". On construction the directory is
    scanned so blocks written by a previous instance are reused.

    All cache writes are best effort: an I/O failure while storing a block is
    swallowed and simply results in a later cache miss.
    """

    # Value used by callers to express "no size limit".
    _UNLIMITED = 2 ** 63 - 1

    def __init__(self, cache_dir: str, max_size_bytes: int,
                 block_size: int = 1 * 1024 * 1024):
        """
        Args:
            cache_dir: directory holding the block files (created if missing).
            max_size_bytes: upper bound for the sum of cached block sizes.
            block_size: granularity (in bytes) at which readers split files.
        """
        self._cache_dir = cache_dir
        self._max_size_bytes = max_size_bytes
        self._block_size = block_size
        # Guards _entry_index and _current_size. File I/O is done outside it.
        self._lock = threading.Lock()
        self._file_size_cache: dict = {}
        # LRU-ordered index: cache_path -> size (oldest entry first).
        self._entry_index: OrderedDict = OrderedDict()
        os.makedirs(cache_dir, exist_ok=True)
        self._current_size = self._scan_and_populate_index()

    @property
    def block_size(self) -> int:
        """Block size in bytes used to split files."""
        return self._block_size

    def _cache_path(self, file_path: str, block_index: int) -> str:
        """Return the on-disk location for the given block."""
        key = f"{file_path}:{block_index}"
        digest = hashlib.sha256(key.encode('utf-8')).hexdigest()
        return os.path.join(self._cache_dir, digest[:2], digest)

    def get_block(self, file_path: str, block_index: int) -> Optional[bytes]:
        """Return the cached block, or ``None`` on a miss.

        A hit marks the block as most recently used. If the index knows the
        block but its file cannot be read (for example it was removed
        externally), the stale index entry is dropped and a miss is reported.
        """
        path = self._cache_path(file_path, block_index)
        with self._lock:
            if path not in self._entry_index:
                return None
            self._entry_index.move_to_end(path)
        try:
            with open(path, 'rb') as f:
                return f.read()
        except OSError:
            with self._lock:
                size = self._entry_index.pop(path, None)
                if size is not None:
                    self._current_size -= size
            return None

    def put_block(self, file_path: str, block_index: int, data: bytes) -> None:
        """Store a block on disk unless it is already cached.

        The data is written to a temporary file and atomically moved into
        place, so readers never observe a partially written block.
        """
        path = self._cache_path(file_path, block_index)
        size = len(data)

        # A block bigger than the whole cache would be written and then
        # evicted right away together with every other entry.
        if size > self._max_size_bytes:
            return

        with self._lock:
            if path in self._entry_index:
                return

        tmp_path = f"{path}.tmp.{os.getpid()}.{threading.get_ident()}"
        try:
            os.makedirs(os.path.dirname(path), exist_ok=True)
            with open(tmp_path, 'wb') as f:
                f.write(data)
            # os.replace overwrites atomically on every platform, whereas
            # os.rename fails on Windows when the target already exists.
            os.replace(tmp_path, path)
        except OSError:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
            return

        with self._lock:
            # Another thread may have stored the same block while we were
            # writing; account only for the size delta so the block is not
            # counted twice.
            previous = self._entry_index.pop(path, 0)
            self._entry_index[path] = size
            self._current_size += size - previous
            need_evict = (self._max_size_bytes < self._UNLIMITED
                          and self._current_size > self._max_size_bytes)
        if need_evict:
            self._evict()

    def _evict(self) -> None:
        """Remove least recently used blocks until the cache fits its limit."""
        victims = []
        with self._lock:
            while (self._entry_index
                   and self._current_size > self._max_size_bytes):
                path, size = self._entry_index.popitem(last=False)
                self._current_size -= size
                victims.append((path, size))

        for path, size in victims:
            try:
                os.unlink(path)
            except FileNotFoundError:
                # Already gone: the space is free, nothing to restore.
                continue
            except OSError:
                # The file is still on disk; keep tracking it and retry it
                # first on the next eviction round.
                with self._lock:
                    if path not in self._entry_index:
                        self._entry_index[path] = size
                        self._entry_index.move_to_end(path, last=False)
                        self._current_size += size

    def _scan_and_populate_index(self) -> int:
        """Index the block files already on disk and return their total size.

        Leftover temporary files (``.tmp.`` in the name) are ignored.
        """
        total = 0
        for dirpath, _, filenames in os.walk(self._cache_dir):
            for filename in filenames:
                if '.tmp.' in filename:
                    continue
                full_path = os.path.join(dirpath, filename)
                try:
                    size = os.path.getsize(full_path)
                except OSError:
                    continue
                self._entry_index[full_path] = size
                total += size
        return total

    def get_file_size(self, file_path: str) -> int:
        """Return the remembered size of ``file_path``, or ``-1`` if unknown."""
        return self._file_size_cache.get(file_path, -1)

    def put_file_size(self, file_path: str, size: int) -> None:
        """Remember the size of ``file_path`` for later streams."""
        self._file_size_cache[file_path] = size
+
+
class CachingInputStream:
    """Seekable, read-only stream that serves data through a block cache.

    The file is split into fixed-size blocks (``cache.block_size``). Each
    block is looked up in ``cache`` first; on a miss it is fetched from the
    remote file via ``file_io`` and stored in the cache. The remote stream is
    opened lazily, so a stream that is served entirely from the cache never
    touches ``file_io.new_input_stream``.
    """

    def __init__(self, file_io, file_path: str, cache):
        """
        Args:
            file_io: object providing ``new_input_stream(path)`` and
                ``get_file_size(path)`` for the remote file.
            file_path: path of the remote file; also used as the cache key.
            cache: cache manager providing ``block_size``, ``get_block``,
                ``put_block``, ``get_file_size`` and ``put_file_size``.
        """
        self._file_io = file_io
        self._stream = None
        self._file_path = file_path
        self._file_size = -1
        self._cache = cache
        self._pos = 0

    def _get_file_size(self) -> int:
        """Return the file size, asking the cache before the remote side."""
        if self._file_size == -1:
            cached = self._cache.get_file_size(self._file_path)
            if cached >= 0:
                self._file_size = cached
            else:
                self._file_size = self._file_io.get_file_size(self._file_path)
                self._cache.put_file_size(self._file_path, self._file_size)
        return self._file_size

    def seek(self, offset, whence=0):
        """Move the read position and return the new absolute position.

        ``whence`` follows the usual convention: 0 = from start, 1 = relative
        to the current position, 2 = relative to the end. Positions before
        the start of the file are clamped to 0.

        Raises:
            ValueError: if ``whence`` is not 0, 1 or 2.
        """
        if whence == 0:
            target = offset
        elif whence == 1:
            target = self._pos + offset
        elif whence == 2:
            target = self._get_file_size() + offset
        else:
            raise ValueError(f"invalid whence ({whence}, should be 0, 1 or 2)")
        self._pos = max(0, target)
        return self._pos

    def tell(self) -> int:
        """Return the current read position."""
        return self._pos

    def read(self, size=-1) -> bytes:
        """Read up to ``size`` bytes from the current position.

        ``None`` or any negative ``size`` reads until end of file. Returns
        ``b''`` at end of file.

        Raises:
            OSError: if the remote file delivers fewer bytes than its
                reported size implies.
        """
        file_size = self._get_file_size()
        remaining = file_size - self._pos
        if size is None or size < 0:
            size = remaining
        if size <= 0 or remaining <= 0:
            return b''

        end = min(self._pos + size, file_size)
        block_size = self._cache.block_size
        first_block = self._pos // block_size
        last_block = (end - 1) // block_size

        result = bytearray()
        for block_index in range(first_block, last_block + 1):
            block_data = self._read_block(block_index)
            block_start = block_index * block_size
            # Clip the block to the requested [pos, end) window.
            start_in_block = max(self._pos - block_start, 0)
            end_in_block = min(end - block_start, len(block_data))
            result.extend(block_data[start_in_block:end_in_block])

        self._pos = end
        return bytes(result)

    def _read_block(self, block_index: int) -> bytes:
        """Return one block, loading it from the remote file on a cache miss."""
        cached = self._cache.get_block(self._file_path, block_index)
        if cached is not None:
            return cached

        block_size = self._cache.block_size
        offset = block_index * block_size
        expected = min(block_size, self._get_file_size() - offset)

        stream = self._get_remote_stream()
        stream.seek(offset)
        data = self._read_fully(stream, expected)
        if len(data) != expected:
            # Never cache a truncated block: it would keep serving corrupt
            # data to every later reader of this file.
            raise OSError(
                f"Unexpected end of file while reading {self._file_path}: "
                f"expected {expected} bytes at offset {offset}, "
                f"got {len(data)}")

        self._cache.put_block(self._file_path, block_index, data)
        return data

    def _read_fully(self, stream, size: int) -> bytes:
        """Read ``size`` bytes from ``stream``, stopping early only at EOF."""
        buf = bytearray()
        remaining = size
        while remaining > 0:
            chunk = stream.read(remaining)
            if not chunk:
                break
            buf.extend(chunk)
            remaining -= len(chunk)
        return bytes(buf)

    def _get_remote_stream(self):
        """Open the remote stream on first use and reuse it afterwards."""
        if self._stream is None:
            self._stream = self._file_io.new_input_stream(self._file_path)
        return self._stream

    def close(self):
        """Close the remote stream if one was opened."""
        stream, self._stream = self._stream, None
        if stream is not None:
            stream.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
        return False
+
+
class CachingFileIO(FileIO):
    """FileIO wrapper that caches reads at block granularity.

    Only file types in the whitelist are cached. Others are read directly
    from the delegate. The cache manager is process-local state and is never
    pickled: after pickling/unpickling, the cache is None and reads fall
    through to the delegate directly.
    """

    def __init__(self, delegate: FileIO, cache, whitelist=None):
        """
        Args:
            delegate: the FileIO performing the actual I/O.
            cache: cache manager instance, or None to disable caching.
            whitelist: set of FileType values eligible for caching; defaults
                to META and GLOBAL_INDEX when None.
        """
        self._delegate = delegate
        self._cache = cache
        if whitelist is None:
            self._whitelist = {FileType.META, FileType.GLOBAL_INDEX}
        else:
            self._whitelist = whitelist

    @staticmethod
    def create_cache_manager(options):
        """Creates a cache manager from options, or returns None if caching is not enabled.

        A disk cache is created when a cache directory is configured,
        otherwise an in-memory cache. Without a configured max size the cache
        is unlimited (2 ** 63 - 1).
        """
        # Imported lazily, as in the rest of this module's option handling.
        from pypaimon.common.options.core_options import CoreOptions
        opts = CoreOptions(options)
        if not opts.local_cache_enabled():
            return None
        cache_dir = opts.local_cache_dir()
        max_size_opt = opts.local_cache_max_size()
        max_size = max_size_opt.get_bytes() if max_size_opt is not None else (2 ** 63 - 1)
        block_size = opts.local_cache_block_size().get_bytes()
        if cache_dir is not None:
            return LocalDiskCacheManager(cache_dir, max_size, block_size)
        return LocalMemoryCacheManager(max_size, block_size)

    @staticmethod
    def wrap_with_caching_if_needed(file_io, options, cache=None):
        """Wraps the given FileIO with caching if local cache is enabled.

        Args:
            file_io: the FileIO to potentially wrap
            options: an Options object containing cache configuration
            cache: the cache manager instance (managed by the caller)

        Returns:
            a CachingFileIO if caching is enabled and configured, otherwise the original FileIO
        """
        # Never wrap twice, and do nothing when no cache manager was created.
        if isinstance(file_io, CachingFileIO) or cache is None:
            return file_io
        from pypaimon.common.options.core_options import CoreOptions
        opts = CoreOptions(options)
        whitelist = FileType.parse_whitelist(opts.local_cache_whitelist())
        if not whitelist:
            return file_io
        return CachingFileIO(file_io, cache, whitelist)

    def new_input_stream(self, path: str):
        """Open ``path`` for reading, through the cache when it is eligible.

        A file is eligible when a cache is present, its FileType is in the
        whitelist and FileType does not report it as mutable.
        """
        file_type = FileType.classify(path)
        cacheable = (self._cache is not None
                     and file_type in self._whitelist
                     and not FileType.is_mutable(path))
        if not cacheable:
            return self._delegate.new_input_stream(path)
        return CachingInputStream(self._delegate, path, self._cache)

    # --- Everything below is forwarded to the delegate unchanged. ---

    def new_output_stream(self, path: str):
        return self._delegate.new_output_stream(path)

    def get_file_status(self, path: str):
        return self._delegate.get_file_status(path)

    def list_status(self, path: str):
        return self._delegate.list_status(path)

    def exists(self, path: str) -> bool:
        return self._delegate.exists(path)

    def delete(self, path: str, recursive: bool = False) -> bool:
        return self._delegate.delete(path, recursive)

    def mkdirs(self, path: str) -> bool:
        return self._delegate.mkdirs(path)

    def rename(self, src: str, dst: str) -> bool:
        return self._delegate.rename(src, dst)

    def get_file_size(self, path: str) -> int:
        return self._delegate.get_file_size(path)

    def is_dir(self, path: str) -> bool:
        return self._delegate.is_dir(path)

    def __getstate__(self):
        """Return the picklable state: everything except the cache manager.

        The cache managers hold a ``threading.Lock`` and process-local state,
        so the cache is replaced by None in the pickled form.
        """
        state = self.__dict__.copy()
        state['_cache'] = None
        return state

    def __setstate__(self, state):
        """Restore the state produced by ``__getstate__``."""
        self.__dict__.update(state)

    def __getattr__(self, name):
        """Forward attributes not defined on the wrapper to the delegate.

        ``__getattr__`` is also called on half-constructed instances (during
        unpickling or copying, before ``_delegate`` exists). Reading
        ``self._delegate`` there would re-enter this method forever, so the
        instance dict is consulted directly.
        """
        delegate = self.__dict__.get('_delegate')
        if delegate is None:
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}")
        return getattr(delegate, name)

    def close(self):
        self._delegate.close()
diff --git a/paimon-python/pypaimon/tests/caching_file_io_test.py
b/paimon-python/pypaimon/tests/caching_file_io_test.py
new file mode 100644
index 0000000000..048f70589d
--- /dev/null
+++ b/paimon-python/pypaimon/tests/caching_file_io_test.py
@@ -0,0 +1,421 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Tests for LocalDiskCacheManager, CachingInputStream, and CachingFileIO."""
+
+import io
+import os
+import shutil
+import tempfile
+import threading
+import unittest
+from unittest.mock import MagicMock
+
+from pypaimon.filesystem.caching_file_io import (
+ LocalDiskCacheManager,
+ CachingFileIO,
+ CachingInputStream,
+)
+
+
class LocalDiskCacheManagerTest(unittest.TestCase):
    """Unit tests for LocalDiskCacheManager backed by a throwaway directory."""

    def setUp(self):
        self.cache_dir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.cache_dir, ignore_errors=True)

    def _unlimited_cache(self):
        """Build a cache without a size limit and a 64-byte block size."""
        return LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=64)

    def test_put_and_get(self):
        cache = self._unlimited_cache()
        payload = b"hello world block"
        cache.put_block("file1.index", 0, payload)
        self.assertEqual(payload, cache.get_block("file1.index", 0))

    def test_cache_miss(self):
        cache = self._unlimited_cache()
        self.assertIsNone(cache.get_block("nonexistent", 0))

    def test_different_keys(self):
        cache = self._unlimited_cache()
        stored = [
            ("file1", 0, b"block0"),
            ("file1", 1, b"block1"),
            ("file2", 0, b"other0"),
        ]
        for path, index, payload in stored:
            cache.put_block(path, index, payload)

        for path, index, payload in stored:
            self.assertEqual(payload, cache.get_block(path, index))

    def test_duplicate_put_is_noop(self):
        cache = self._unlimited_cache()
        cache.put_block("file1", 0, b"original")
        # The second put for the same key must not overwrite the first.
        cache.put_block("file1", 0, b"duplicate")
        self.assertEqual(b"original", cache.get_block("file1", 0))

    def test_eviction(self):
        cache = LocalDiskCacheManager(self.cache_dir, max_size_bytes=100, block_size=64)
        cache.put_block("f", 0, b"a" * 60)
        cache.put_block("f", 1, b"b" * 60)
        # Total 120 > 100, at least one block should be evicted
        survivors = (cache.get_block("f", 0), cache.get_block("f", 1))
        self.assertIn(None, survivors)

    def test_scan_size_on_restart(self):
        first = self._unlimited_cache()
        first.put_block("f", 0, b"x" * 100)
        first.put_block("f", 1, b"y" * 200)

        # Simulate restart: new cache instance on same directory
        second = self._unlimited_cache()
        self.assertEqual(300, second._current_size)
        self.assertEqual(b"x" * 100, second.get_block("f", 0))
        self.assertEqual(b"y" * 200, second.get_block("f", 1))

    def test_cache_dir_created(self):
        nested = os.path.join(self.cache_dir, "sub", "deep")
        LocalDiskCacheManager(nested, 2 ** 63 - 1, block_size=64)
        self.assertTrue(os.path.isdir(nested))

    def test_concurrent_put_get(self):
        cache = self._unlimited_cache()
        errors = []

        def writer(idx):
            try:
                cache.put_block("concurrent", idx, bytes([idx % 256]) * 100)
            except Exception as e:
                errors.append(e)

        def reader(idx):
            try:
                result = cache.get_block("concurrent", idx)
                # A miss is fine: the matching writer may not have run yet.
                if result is not None:
                    expected = bytes([idx % 256]) * 100
                    assert result == expected, f"Mismatch at {idx}"
            except Exception as e:
                errors.append(e)

        # One writer followed by one reader per block index.
        workers = [
            threading.Thread(target=job, args=(i,))
            for i in range(20)
            for job in (writer, reader)
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        self.assertEqual([], errors)

    def test_unlimited_cache_skips_eviction(self):
        cache = LocalDiskCacheManager(self.cache_dir, max_size_bytes=2 ** 63 - 1, block_size=64)
        block_count = 50
        for i in range(block_count):
            cache.put_block("f", i, b"x" * 100)
        # All 50 blocks should still be present
        for i in range(block_count):
            self.assertIsNotNone(cache.get_block("f", i))
+
+
class CachingInputStreamTest(unittest.TestCase):
    """Unit tests for CachingInputStream on top of a disk cache."""

    def setUp(self):
        self.cache_dir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.cache_dir, ignore_errors=True)

    def _make_file_io(self, data, path="test"):
        """Return a mocked remote FileIO that serves ``data`` for any path."""
        def open_stream(_path):
            return io.BytesIO(data)

        def size_of(_path):
            return len(data)

        remote = MagicMock()
        remote.new_input_stream.side_effect = open_stream
        remote.get_file_size.side_effect = size_of
        return remote

    def _new_cache(self, block_size=8):
        """Build an unlimited disk cache with the given block size."""
        return LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1, block_size=block_size)

    def _open(self, data, cache=None):
        """Open a caching stream named "test" over ``data``."""
        if cache is None:
            cache = self._new_cache()
        return CachingInputStream(self._make_file_io(data), "test", cache)

    def test_read_entire_file(self):
        data = b"abcdefghijklmnop"
        self.assertEqual(data, self._open(data).read())

    def test_read_with_size(self):
        stream = self._open(b"abcdefghijklmnop")
        for requested, expected in ((5, b"abcde"), (5, b"fghij"), (10, b"klmnop")):
            self.assertEqual(expected, stream.read(requested))

    def test_seek_and_read(self):
        stream = self._open(b"0123456789abcdef")
        stream.seek(10)
        self.assertEqual(b"abcdef", stream.read())

    def test_seek_whence_1(self):
        stream = self._open(b"0123456789")
        stream.read(3)
        stream.seek(2, 1)  # relative
        self.assertEqual(5, stream.tell())
        self.assertEqual(b"56789", stream.read())

    def test_seek_whence_2(self):
        stream = self._open(b"0123456789")
        stream.seek(-3, 2)  # from end
        self.assertEqual(b"789", stream.read())

    def test_read_spanning_multiple_blocks(self):
        data = bytes(range(256)) * 4  # 1024 bytes
        stream = self._open(data, self._new_cache(block_size=100))
        # Read across block boundary: block 0 ends at 100, block 1 starts at 100
        stream.seek(90)
        chunk = stream.read(30)  # 90..120, spans blocks 0 and 1
        self.assertEqual(data[90:120], chunk)

    def test_cache_hit_avoids_remote_read(self):
        data = b"0123456789abcdef"
        cache = self._new_cache()

        # First read populates cache
        self._open(data, cache).read()

        # Second stream: remote should not be called because all blocks are cached
        remote = MagicMock()
        remote.get_file_size.side_effect = lambda p: len(data)
        second = CachingInputStream(remote, "test", cache)
        self.assertEqual(data, second.read())
        remote.new_input_stream.assert_not_called()

    def test_read_empty(self):
        self.assertEqual(b"", self._open(b"").read())

    def test_read_at_eof(self):
        stream = self._open(b"abc")
        stream.read()
        self.assertEqual(b"", stream.read(10))

    def test_context_manager(self):
        # Entering and leaving the context without reading must not fail.
        with self._open(b"test"):
            pass

    def test_partial_last_block(self):
        data = b"abcdefghij"  # 10 bytes, block_size=8 -> block0=8, block1=2
        cache = self._new_cache()
        self.assertEqual(data, self._open(data, cache).read())
        # Verify block 1 is only 2 bytes
        self.assertEqual(b"ij", cache.get_block("test", 1))
+
+
class CachingFileIOTest(unittest.TestCase):
    """Tests which paths CachingFileIO serves through a CachingInputStream.

    Each test builds a mocked delegate file IO backed by an in-memory
    ``{path: bytes}`` mapping and a LocalDiskCacheManager rooted in a
    temporary directory that is removed again in ``tearDown``.
    """

    def setUp(self):
        # Fresh cache directory per test so cached blocks never leak across tests.
        self.cache_dir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.cache_dir, ignore_errors=True)

    def _make_delegate(self, path_to_data):
        """Return a MagicMock delegate serving ``path_to_data`` from memory.

        Unknown paths behave like empty files (``b""`` content, size 0).
        """
        delegate = MagicMock()

        def new_input_stream(path):
            return io.BytesIO(path_to_data.get(path, b""))

        def get_file_size(path):
            return len(path_to_data.get(path, b""))

        delegate.new_input_stream.side_effect = new_input_stream
        delegate.get_file_size.side_effect = get_file_size
        return delegate

    def _make_caching_io(self, delegate, whitelist=None):
        """Wrap ``delegate`` in a CachingFileIO using this test's cache dir.

        When ``whitelist`` is None the CachingFileIO default whitelist is used.
        """
        cache = LocalDiskCacheManager(self.cache_dir, 2 ** 63 - 1,
                                      block_size=64)
        if whitelist is None:
            return CachingFileIO(delegate, cache)
        return CachingFileIO(delegate, cache, whitelist)

    def test_meta_file_is_cached(self):
        """A snapshot file reads correctly twice with one size lookup."""
        data = b"snapshot data"
        delegate = self._make_delegate({"snapshot-1": data})
        caching_io = self._make_caching_io(delegate)

        # First read
        with caching_io.new_input_stream("snapshot-1") as s:
            result1 = s.read()
        self.assertEqual(data, result1)

        # Second read should hit cache (returns CachingInputStream)
        with caching_io.new_input_stream("snapshot-1") as s:
            result2 = s.read()
        self.assertEqual(data, result2)
        # get_file_size called only once — second stream gets size from cache
        self.assertEqual(1, delegate.get_file_size.call_count)

    def test_manifest_file_is_cached(self):
        """A manifest file is served through a CachingInputStream."""
        data = b"manifest data"
        delegate = self._make_delegate({"manifest-abc": data})
        caching_io = self._make_caching_io(delegate)

        with caching_io.new_input_stream("manifest-abc") as s:
            # Without this check the test would pass even if caching was bypassed.
            self.assertIsInstance(s, CachingInputStream)
            self.assertEqual(data, s.read())

    def test_global_index_file_is_cached(self):
        """A global index file is served through a CachingInputStream."""
        data = b"index data"
        delegate = self._make_delegate({"global-index-uuid.index": data})
        caching_io = self._make_caching_io(delegate)

        with caching_io.new_input_stream("global-index-uuid.index") as s:
            self.assertIsInstance(s, CachingInputStream)
            self.assertEqual(data, s.read())

    def test_bucket_index_file_not_cached_by_default(self):
        """With the default whitelist a bucket index read bypasses the cache."""
        data = b"bucket index"
        delegate = self._make_delegate({"index-uuid-0": data})
        caching_io = self._make_caching_io(delegate)

        with caching_io.new_input_stream("index-uuid-0") as result:
            self.assertNotIsInstance(result, CachingInputStream)
            self.assertEqual(data, result.read())
        delegate.get_file_size.assert_not_called()

    def test_bucket_index_cached_when_in_whitelist(self):
        """Adding BUCKET_INDEX to the whitelist routes it through the cache."""
        from pypaimon.utils.file_type import FileType
        data = b"bucket index"
        delegate = self._make_delegate({"index-uuid-0": data})
        whitelist = {FileType.META, FileType.GLOBAL_INDEX,
                     FileType.BUCKET_INDEX}
        caching_io = self._make_caching_io(delegate, whitelist)

        with caching_io.new_input_stream("index-uuid-0") as s:
            self.assertIsInstance(s, CachingInputStream)
            self.assertEqual(data, s.read())

    def test_custom_whitelist_meta_only(self):
        """A META-only whitelist caches snapshots but not global index files."""
        from pypaimon.utils.file_type import FileType
        delegate = self._make_delegate({
            "snapshot-1": b"snap",
            "global-index-uuid.index": b"idx",
        })
        caching_io = self._make_caching_io(delegate, {FileType.META})

        with caching_io.new_input_stream("snapshot-1") as s:
            self.assertIsInstance(s, CachingInputStream)

        with caching_io.new_input_stream("global-index-uuid.index") as result:
            self.assertNotIsInstance(result, CachingInputStream)

    def test_data_file_not_cached(self):
        """Data files are read straight from the delegate stream."""
        data = b"data content"
        delegate = self._make_delegate({"data-abc.orc": data})
        caching_io = self._make_caching_io(delegate)

        with caching_io.new_input_stream("data-abc.orc") as result:
            # Should be raw BytesIO, not CachingInputStream
            self.assertNotIsInstance(result, CachingInputStream)
            self.assertEqual(data, result.read())
        # get_file_size should NOT be called for data files
        delegate.get_file_size.assert_not_called()

    def test_file_index_not_cached(self):
        """Data-file index files are read straight from the delegate stream."""
        data = b"file index content"
        delegate = self._make_delegate({"data-abc.orc.index": data})
        caching_io = self._make_caching_io(delegate)

        with caching_io.new_input_stream("data-abc.orc.index") as result:
            self.assertNotIsInstance(result, CachingInputStream)
            self.assertEqual(data, result.read())
        delegate.get_file_size.assert_not_called()

    def test_delegate_methods_forwarded(self):
        """Non-read operations are forwarded to the delegate with their arguments."""
        delegate = MagicMock()
        caching_io = self._make_caching_io(delegate)

        caching_io.exists("/some/path")
        delegate.exists.assert_called_once_with("/some/path")

        caching_io.mkdirs("/some/dir")
        delegate.mkdirs.assert_called_once_with("/some/dir")

        # Called with a keyword here; the delegate is expected to receive it positionally.
        caching_io.delete("/some/file", recursive=True)
        delegate.delete.assert_called_once_with("/some/file", True)

        caching_io.rename("/src", "/dst")
        delegate.rename.assert_called_once_with("/src", "/dst")

        caching_io.get_file_status("/path")
        delegate.get_file_status.assert_called_once_with("/path")

        caching_io.list_status("/dir")
        delegate.list_status.assert_called_once_with("/dir")

        caching_io.new_output_stream("/out")
        delegate.new_output_stream.assert_called_once_with("/out")
+
+
class ConfigOptionsTest(unittest.TestCase):
    """Checks the ``local-cache.*`` accessors exposed by CoreOptions."""

    def test_local_cache_options_defaults(self):
        """With no options set, every local-cache accessor yields its default."""
        from pypaimon.common.options import Options
        from pypaimon.common.options.core_options import CoreOptions

        core_options = CoreOptions(Options({}))
        default_block_bytes = 1 * 1024 * 1024

        self.assertFalse(core_options.local_cache_enabled())
        self.assertIsNone(core_options.local_cache_dir())
        self.assertIsNone(core_options.local_cache_max_size())
        self.assertEqual(default_block_bytes,
                         core_options.local_cache_block_size().get_bytes())
        self.assertEqual("meta,global-index",
                         core_options.local_cache_whitelist())

    def test_local_cache_options_custom(self):
        """Explicitly configured local-cache options are returned as parsed values."""
        from pypaimon.common.options import Options
        from pypaimon.common.options.core_options import CoreOptions

        settings = {
            "local-cache.enabled": "true",
            "local-cache.dir": "/custom/cache",
            "local-cache.max-size": "2gb",
            "local-cache.block-size": "4mb",
            "local-cache.whitelist": "meta,global-index,bucket-index",
        }
        core_options = CoreOptions(Options(settings))
        two_gib = 2 * 1024 * 1024 * 1024
        four_mib = 4 * 1024 * 1024

        self.assertTrue(core_options.local_cache_enabled())
        self.assertEqual("/custom/cache", core_options.local_cache_dir())
        self.assertEqual(two_gib,
                         core_options.local_cache_max_size().get_bytes())
        self.assertEqual(four_mib,
                         core_options.local_cache_block_size().get_bytes())
        self.assertEqual("meta,global-index,bucket-index",
                         core_options.local_cache_whitelist())
+
+
# Run the test cases in this module when it is executed as a script.
if __name__ == "__main__":
    unittest.main()
diff --git a/paimon-python/pypaimon/tests/file_type_test.py b/paimon-python/pypaimon/tests/file_type_test.py
new file mode 100644
index 0000000000..b82604eaed
--- /dev/null
+++ b/paimon-python/pypaimon/tests/file_type_test.py
@@ -0,0 +1,109 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Tests for FileType classification."""
+
+import unittest
+
+from pypaimon.utils.file_type import FileType
+
+
class FileTypeClassifyTest(unittest.TestCase):
    """Checks FileType.classify against representative names and paths."""

    def _assert_classified(self, expected, *paths):
        """Assert that every path in ``paths`` classifies as ``expected``."""
        for path in paths:
            self.assertEqual(expected, FileType.classify(path))

    def test_snapshot(self):
        self._assert_classified(
            FileType.META,
            "/warehouse/db/t/snapshot/snapshot-1",
            "snapshot-42",
        )

    def test_schema(self):
        self._assert_classified(FileType.META,
                                "/warehouse/db/t/schema/schema-0")

    def test_statistics(self):
        self._assert_classified(FileType.META, "stat-abc123")

    def test_tag(self):
        self._assert_classified(FileType.META, "tag-v1.0")

    def test_consumer(self):
        self._assert_classified(FileType.META, "consumer-group1")

    def test_service(self):
        self._assert_classified(FileType.META, "service-abc")

    def test_manifest(self):
        self._assert_classified(
            FileType.META,
            "manifest-abc123",
            "manifest-list-abc",
            "index-manifest-abc",
        )

    def test_hint_files(self):
        self._assert_classified(FileType.META, "EARLIEST", "LATEST")

    def test_success_files(self):
        self._assert_classified(FileType.META, "_SUCCESS", "part-0_SUCCESS")

    def test_changelog_meta(self):
        self._assert_classified(FileType.META,
                                "/warehouse/db/t/changelog/changelog-1")

    def test_changelog_not_in_changelog_dir(self):
        # Same file name, but the parent directory is not "changelog".
        self._assert_classified(FileType.DATA,
                                "/warehouse/db/t/other/changelog-1")

    def test_global_index(self):
        self._assert_classified(
            FileType.GLOBAL_INDEX,
            "global-index-abc-123.index",
            "/warehouse/db/t/global-index/global-index-uuid.index",
        )

    def test_file_index(self):
        self._assert_classified(
            FileType.FILE_INDEX,
            "data-abc.orc.index",
            "some-file.index",
        )

    def test_bucket_index(self):
        self._assert_classified(FileType.BUCKET_INDEX,
                                "index-abc-0", "index-uuid-1")

    def test_data_files(self):
        self._assert_classified(
            FileType.DATA,
            "data-abc.orc",
            "data-abc.parquet",
            "unknown-file",
        )

    def test_temp_file_unwrap(self):
        # .{originalName}.{UUID}.tmp -> originalName
        uuid = "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
        self._assert_classified(FileType.META, f".snapshot-1.{uuid}.tmp")
        self._assert_classified(FileType.DATA, f".data-abc.orc.{uuid}.tmp")

    def test_temp_file_not_matching_pattern(self):
        self._assert_classified(
            FileType.DATA,
            ".short.tmp",
            "no-leading-dot.uuid-here.tmp",
        )

    def test_is_index(self):
        index_types = (FileType.BUCKET_INDEX, FileType.GLOBAL_INDEX,
                       FileType.FILE_INDEX)
        for file_type in index_types:
            self.assertTrue(file_type.is_index())
        for file_type in (FileType.META, FileType.DATA):
            self.assertFalse(file_type.is_index())
+
+
# Run the test cases in this module when it is executed as a script.
if __name__ == "__main__":
    unittest.main()
diff --git a/paimon-python/pypaimon/utils/file_type.py b/paimon-python/pypaimon/utils/file_type.py
new file mode 100644
index 0000000000..384f927da7
--- /dev/null
+++ b/paimon-python/pypaimon/utils/file_type.py
@@ -0,0 +1,120 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Classification of Paimon files."""
+
+import logging
+import os
+from enum import Enum
+
+logger = logging.getLogger(__name__)
+
+
class FileType(Enum):
    """Classification of Paimon files.

    - META: snapshot, schema, manifest, statistics, tag, changelog metadata,
      hint files, _SUCCESS, consumer, service files
    - DATA: data files and any unrecognized files (default)
    - BUCKET_INDEX: bucket level index files (Hash, DV)
    - GLOBAL_INDEX: table level global index files (btree, bitmap, lumina,
      tantivy)
    - FILE_INDEX: data-file index files (bloom filter, bitmap, etc.)
    """
    META = "META"
    DATA = "DATA"
    BUCKET_INDEX = "BUCKET_INDEX"
    GLOBAL_INDEX = "GLOBAL_INDEX"
    FILE_INDEX = "FILE_INDEX"

    def is_index(self) -> bool:
        """Return True for BUCKET_INDEX, GLOBAL_INDEX and FILE_INDEX."""
        return self in (FileType.BUCKET_INDEX, FileType.GLOBAL_INDEX,
                        FileType.FILE_INDEX)

    @staticmethod
    def is_mutable(file_path: str) -> bool:
        """Return True when the base name of ``file_path`` is EARLIEST or LATEST."""
        return os.path.basename(file_path) in ("EARLIEST", "LATEST")

    @staticmethod
    def classify(file_path: str) -> 'FileType':
        """Classify ``file_path`` by its base name.

        A temp-file wrapper (``.{originalName}.{UUID}.tmp``) is stripped first
        so the temp file gets the type of the file it will become. The checks
        below are order-sensitive: the first matching rule wins, and anything
        unmatched is DATA.
        """
        name = FileType._unwrap_temp_file_name(os.path.basename(file_path))

        meta_prefixes = ("snapshot-", "schema-", "stat-", "tag-",
                         "consumer-", "service-")
        if name.startswith(meta_prefixes):
            return FileType.META

        # ".index" is tested before "manifest"/"index-" so that index files
        # are never mistaken for manifests or bucket indexes.
        if name.endswith(".index"):
            if "global-index-" in name:
                return FileType.GLOBAL_INDEX
            return FileType.FILE_INDEX

        # Covers manifest-*, manifest-list-* and index-manifest-*; must run
        # before the "index-" prefix check below.
        if "manifest" in name:
            return FileType.META

        if name.startswith("index-"):
            return FileType.BUCKET_INDEX

        if name in ("EARLIEST", "LATEST"):
            return FileType.META

        # endswith() also matches the bare "_SUCCESS" name.
        if name.endswith("_SUCCESS"):
            return FileType.META

        # "changelog-*" is only metadata when it sits directly in a
        # "changelog" directory; elsewhere it falls through to DATA.
        if name.startswith("changelog-"):
            parent = os.path.basename(os.path.dirname(file_path))
            if parent == "changelog":
                return FileType.META

        return FileType.DATA

    @staticmethod
    def parse_whitelist(whitelist_str: str) -> set:
        """Parse a comma-separated whitelist into a set of FileType members.

        Entries are stripped of surrounding whitespace; empty entries are
        ignored and unknown entries are skipped with a warning. ``None`` or an
        empty string yields an empty set.
        """
        # None used to raise AttributeError on .split(); treat it like "".
        if not whitelist_str:
            return set()

        mapping = {
            "meta": FileType.META,
            "global-index": FileType.GLOBAL_INDEX,
            "bucket-index": FileType.BUCKET_INDEX,
            "data": FileType.DATA,
            "file-index": FileType.FILE_INDEX,
        }
        result = set()
        for raw_name in whitelist_str.split(","):
            name = raw_name.strip()
            if name in mapping:
                result.add(mapping[name])
            elif name:
                logger.warning(
                    "Unknown local-cache.whitelist value '%s'. "
                    "Supported values: meta, global-index, bucket-index, "
                    "data, file-index.",
                    name,
                )
        return result

    @staticmethod
    def _unwrap_temp_file_name(name: str) -> str:
        """Return the original name hidden in ``.{originalName}.{UUID}.tmp``.

        Names that do not fit that layout are returned unchanged. Only the
        layout (leading dot, ".tmp" suffix, dot 41 chars from the end) is
        checked; the UUID characters themselves are not validated.
        """
        # format: .{originalName}.{UUID}.tmp
        # suffix ".{UUID}.tmp" is 41 chars: 1(dot) + 36(UUID) + 4(.tmp)
        # 43 = leading dot + at least one name char + the 41-char suffix.
        if len(name) < 43 or name[0] != '.' or not name.endswith('.tmp'):
            return name
        dot_before_uuid = len(name) - 41
        if name[dot_before_uuid] != '.':
            return name
        return name[1:dot_before_uuid]