This is an automated email from the ASF dual-hosted git repository.

leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 8cb8c61  feat: add IO cache (LRUCache, CacheManager, CacheInputStream) 
and MemorySlice utilities (#40)
8cb8c61 is described below

commit 8cb8c61a66e14607628fe7a8144fa15baa09c603
Author: lxy <[email protected]>
AuthorDate: Tue Jun 2 16:34:44 2026 +0800

    feat: add IO cache (LRUCache, CacheManager, CacheInputStream) and 
MemorySlice utilities (#40)
---
 src/paimon/common/io/cache/cache.h               |  81 +++++
 src/paimon/common/io/cache/cache_key.cpp         |  58 +++
 src/paimon/common/io/cache/cache_key.h           |  85 +++++
 src/paimon/common/io/cache/cache_manager.cpp     |  45 +++
 src/paimon/common/io/cache/cache_manager.h       |  93 +++++
 src/paimon/common/io/cache/lru_cache.cpp         |  70 ++++
 src/paimon/common/io/cache/lru_cache.h           |  68 ++++
 src/paimon/common/io/cache/lru_cache_test.cpp    | 389 ++++++++++++++++++++
 src/paimon/common/io/cache_input_stream.h        |  92 +++++
 src/paimon/common/io/cache_input_stream_test.cpp | 234 +++++++++++++
 src/paimon/common/memory/memory_slice.cpp        |  89 +++++
 src/paimon/common/memory/memory_slice.h          |  78 +++++
 src/paimon/common/memory/memory_slice_input.h    | 128 +++++++
 src/paimon/common/memory/memory_slice_output.cpp | 118 +++++++
 src/paimon/common/memory/memory_slice_output.h   |  69 ++++
 src/paimon/common/memory/memory_slice_test.cpp   | 429 +++++++++++++++++++++++
 16 files changed, 2126 insertions(+)

diff --git a/src/paimon/common/io/cache/cache.h 
b/src/paimon/common/io/cache/cache.h
new file mode 100644
index 0000000..d2cfeab
--- /dev/null
+++ b/src/paimon/common/io/cache/cache.h
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <string>
+
+#include "paimon/common/io/cache/cache_key.h"
+#include "paimon/common/memory/memory_segment.h"
+#include "paimon/result.h"
+
+namespace paimon {
+
+class CacheValue;
+
+/// Callback invoked when a cache entry is evicted by the LRU policy.
+using CacheCallback = std::function<void(const std::shared_ptr<CacheKey>&)>;
+
+class PAIMON_EXPORT Cache {  // Abstract cache interface mapping CacheKey to CacheValue.
+ public:
+    virtual ~Cache() = default;
+    virtual Result<std::shared_ptr<CacheValue>> Get(  // Returns the value cached for `key`.
+        const std::shared_ptr<CacheKey>& key,
+        std::function<Result<std::shared_ptr<CacheValue>>(const 
std::shared_ptr<CacheKey>&)>
+            supplier) = 0;  // LruCache calls `supplier` only on a miss and propagates its error.
+    /// Stores `value` under `key`.
+    virtual Status Put(const std::shared_ptr<CacheKey>& key,
+                       const std::shared_ptr<CacheValue>& value) = 0;
+    /// Removes the entry for `key`, if present.
+    virtual void Invalidate(const std::shared_ptr<CacheKey>& key) = 0;
+    /// Removes every entry.
+    virtual void InvalidateAll() = 0;
+    /// Number of entries currently cached (an entry count, not a byte count).
+    virtual size_t Size() const = 0;
+};
+
+class CacheValue {  // A cached MemorySegment paired with an optional removal callback.
+ public:
+    CacheValue(const MemorySegment& segment, CacheCallback callback)
+        : segment_(segment), callback_(std::move(callback)) {}
+    /// Returns the wrapped segment.
+    const MemorySegment& GetSegment() const {
+        return segment_;
+    }
+
+    /// Invoke the registered callback, if any. LruCache calls this for every removal it reports.
+    void OnEvict(const std::shared_ptr<CacheKey>& key) const {
+        if (callback_) {
+            callback_(key);
+        }
+    }
+    /// Equality compares the segments only; the callbacks are ignored.
+    bool operator==(const CacheValue& other) const {
+        if (this == &other) {
+            return true;
+        }
+        return segment_ == other.segment_;
+    }
+
+ private:
+    MemorySegment segment_;
+    CacheCallback callback_;
+};
+}  // namespace paimon
diff --git a/src/paimon/common/io/cache/cache_key.cpp 
b/src/paimon/common/io/cache/cache_key.cpp
new file mode 100644
index 0000000..383bb20
--- /dev/null
+++ b/src/paimon/common/io/cache/cache_key.cpp
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/io/cache/cache_key.h"
+
+namespace paimon {
+
+std::shared_ptr<CacheKey> CacheKey::ForPosition(const std::string& file_path, 
int64_t position,
+                                                int32_t length, bool is_index) 
{  // Factory: wraps the four fields in a shared PositionCacheKey.
+    return std::make_shared<PositionCacheKey>(file_path, position, length, 
is_index);
+}
+
+bool PositionCacheKey::IsIndex() const {  // Returns the is_index flag given at construction.
+    return is_index_;
+}
+
+int64_t PositionCacheKey::Position() const {  // Returns the position given at construction.
+    return position_;
+}
+
+int32_t PositionCacheKey::Length() const {  // Returns the length given at construction.
+    return length_;
+}
+
+bool PositionCacheKey::Equals(const CacheKey& other) const {  // Field-wise equality.
+    const auto* rhs = dynamic_cast<const PositionCacheKey*>(&other);
+    if (!rhs) {  // `other` is some other CacheKey subclass: never equal
+        return false;
+    }
+    return file_path_ == rhs->file_path_ && position_ == rhs->position_ &&
+           length_ == rhs->length_ && is_index_ == rhs->is_index_;
+}
+
+size_t PositionCacheKey::HashCode() const {  // Hashes the same four fields that Equals compares.
+    size_t seed = 0;  // each step folds in: hash(field) + HASH_CONSTANT + (seed << 6) + (seed >> 2)
+    seed ^= std::hash<std::string>{}(file_path_) + HASH_CONSTANT + (seed << 6) 
+ (seed >> 2);
+    seed ^= std::hash<int64_t>{}(position_) + HASH_CONSTANT + (seed << 6) + 
(seed >> 2);
+    seed ^= std::hash<int32_t>{}(length_) + HASH_CONSTANT + (seed << 6) + 
(seed >> 2);
+    seed ^= std::hash<bool>{}(is_index_) + HASH_CONSTANT + (seed << 6) + (seed 
>> 2);
+    return seed;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/io/cache/cache_key.h 
b/src/paimon/common/io/cache/cache_key.h
new file mode 100644
index 0000000..75f8cb8
--- /dev/null
+++ b/src/paimon/common/io/cache/cache_key.h
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <unordered_map>
+
+#include "paimon/visibility.h"
+
+namespace paimon {
+
+class CacheValue;
+
+class PAIMON_EXPORT CacheKey {  // Abstract key type used by the caches in this directory.
+ public:
+    static std::shared_ptr<CacheKey> ForPosition(const std::string& file_path, 
int64_t position,
+                                                 int32_t length, bool 
is_index);
+
+ public:
+    virtual ~CacheKey() = default;
+    /// CacheManager uses this flag to choose between its index cache and its data cache.
+    virtual bool IsIndex() const = 0;
+    /// Value equality with another key; used by CacheKeyEqual.
+    virtual bool Equals(const CacheKey& other) const = 0;
+    /// Hash of the key; used by CacheKeyHash.
+    virtual size_t HashCode() const = 0;
+};
+
+class PositionCacheKey : public CacheKey {  // Key made of (file_path, position, length, is_index).
+ public:
+    PositionCacheKey(const std::string& file_path, int64_t position, int32_t 
length, bool is_index)
+        : file_path_(file_path), position_(position), length_(length), 
is_index_(is_index) {}
+
+    bool IsIndex() const override;
+    size_t HashCode() const override;
+    bool Equals(const CacheKey& other) const override;  // false for any non-PositionCacheKey
+    int64_t Position() const;
+    int32_t Length() const;
+
+ private:
+    static constexpr uint64_t HASH_CONSTANT = 0x9e3779b97f4a7c15ULL;  // added in each HashCode step
+    // All fields are const: a key never changes after construction.
+    const std::string file_path_;
+    const int64_t position_;
+    const int32_t length_;
+    const bool is_index_;
+};
+
+struct CacheKeyHash {  // Hash functor for shared_ptr<CacheKey>; a null pointer hashes to 0.
+    size_t operator()(const std::shared_ptr<CacheKey>& key) const {
+        return key ? key->HashCode() : 0;
+    }
+};
+
+struct CacheKeyEqual {  // Equality functor for shared_ptr<CacheKey>; compares the pointed-to keys.
+    bool operator()(const std::shared_ptr<CacheKey>& lhs,
+                    const std::shared_ptr<CacheKey>& rhs) const {
+        if (lhs == rhs) {  // same object, or both null
+            return true;
+        }
+        if (!lhs || !rhs) {  // exactly one side is null
+            return false;
+        }
+        return lhs->Equals(*rhs);
+    }
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/io/cache/cache_manager.cpp 
b/src/paimon/common/io/cache/cache_manager.cpp
new file mode 100644
index 0000000..3e5c049
--- /dev/null
+++ b/src/paimon/common/io/cache/cache_manager.cpp
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/io/cache/cache_manager.h"
+
+namespace paimon {
+
+Result<MemorySegment> CacheManager::GetPage(  // Returns the segment cached for `key`.
+    std::shared_ptr<CacheKey>& key,
+    std::function<Result<MemorySegment>(const std::shared_ptr<CacheKey>&)> reader,
+    CacheCallback eviction_callback) {
+    auto& cache = key->IsIndex() ? index_cache_ : data_cache_;  // index keys use the index cache
+    auto supplier =  // loader handed to the cache; `miss_key` avoids shadowing the outer `key`
+        [&](const std::shared_ptr<CacheKey>& miss_key) -> Result<std::shared_ptr<CacheValue>> {
+        PAIMON_ASSIGN_OR_RAISE(MemorySegment segment, reader(miss_key));  // reader errors propagate
+        return std::make_shared<CacheValue>(segment, std::move(eviction_callback));
+    };
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<CacheValue> cache_value, cache->Get(key, supplier));
+    return cache_value->GetSegment();
+}
+
+void CacheManager::InvalidPage(const std::shared_ptr<CacheKey>& key) {  // Drops `key` from its cache.
+    if (key->IsIndex()) {  // NOTE(review): `key` is dereferenced without a null check
+        index_cache_->Invalidate(key);
+    } else {
+        data_cache_->Invalidate(key);
+    }
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/io/cache/cache_manager.h 
b/src/paimon/common/io/cache/cache_manager.h
new file mode 100644
index 0000000..bad3f68
--- /dev/null
+++ b/src/paimon/common/io/cache/cache_manager.h
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <string>
+
+#include "paimon/common/io/cache/cache.h"
+#include "paimon/common/io/cache/cache_key.h"
+#include "paimon/common/io/cache/lru_cache.h"
+#include "paimon/common/memory/memory_segment.h"
+#include "paimon/result.h"
+
+namespace paimon {
+class PAIMON_EXPORT CacheManager {  // Owns a data cache and an index cache and routes keys to them.
+ public:
+    /// Refreshing the LRU order has a cost, so it is not done on every visit to 
the CacheManager, but
+    /// only once every REFRESH_COUNT (10) visits.
+    static constexpr int32_t REFRESH_COUNT = 10;  // NOTE(review): not referenced in the code visible here
+
+    /// Container that wraps a MemorySegment with an access counter for 
refresh.
+    class SegmentContainer {
+     public:
+        explicit SegmentContainer(const MemorySegment& segment) : 
segment_(segment) {}
+        /// Returns the segment and bumps the access counter.
+        const MemorySegment& Access() {
+            access_count_++;
+            return segment_;
+        }
+        /// Number of Access() calls made so far.
+        int32_t GetAccessCount() const {
+            return access_count_;
+        }
+
+     private:
+        MemorySegment segment_;
+        int32_t access_count_ = 0;
+    };
+
+    /// Constructs a CacheManager with LRU caching.
+    /// @param max_memory_bytes Total cache capacity in bytes.
+    /// @param high_priority_pool_ratio Ratio of capacity reserved for index 
cache [0.0, 1.0).
+    ///        If exactly 0, index and data share one LruCache sized to the full capacity.
+    CacheManager(int64_t max_memory_bytes, double high_priority_pool_ratio) {  // ratio is not validated
+        auto index_cache_bytes = static_cast<int64_t>(max_memory_bytes * 
high_priority_pool_ratio);
+        auto data_cache_bytes =
+            static_cast<int64_t>(max_memory_bytes * (1.0 - 
high_priority_pool_ratio));
+        data_cache_ = std::make_shared<LruCache>(data_cache_bytes);
+        if (high_priority_pool_ratio == 0.0) {
+            index_cache_ = data_cache_;  // same object: both accessors return the shared cache
+        } else {
+            index_cache_ = std::make_shared<LruCache>(index_cache_bytes);
+        }
+    }
+    /// Returns the segment for `key` from the cache selected by key->IsIndex(), using `reader` to load it.
+    Result<MemorySegment> GetPage(
+        std::shared_ptr<CacheKey>& key,
+        std::function<Result<MemorySegment>(const std::shared_ptr<CacheKey>&)> 
reader,
+        CacheCallback eviction_callback);
+    /// Invalidates `key` in the cache selected by key->IsIndex().
+    void InvalidPage(const std::shared_ptr<CacheKey>& key);
+
+    const std::shared_ptr<Cache>& DataCache() const {
+        return data_cache_;
+    }
+
+    const std::shared_ptr<Cache>& IndexCache() const {
+        return index_cache_;
+    }
+
+ private:
+    std::shared_ptr<Cache> data_cache_;
+    std::shared_ptr<Cache> index_cache_;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/io/cache/lru_cache.cpp 
b/src/paimon/common/io/cache/lru_cache.cpp
new file mode 100644
index 0000000..fbfca83
--- /dev/null
+++ b/src/paimon/common/io/cache/lru_cache.cpp
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/io/cache/lru_cache.h"
+
+namespace paimon {
+
+LruCache::LruCache(int64_t max_weight)  // Builds the inner cache with a byte-weighted capacity.
+    : inner_cache_(InnerCache::Options{
+          .max_weight = max_weight,
+          .expire_after_access_ms = -1,  // NOTE(review): presumably disables expiry - confirm
+          .weigh_func = [](const std::shared_ptr<CacheKey>& /*key*/,
+                           const std::shared_ptr<CacheValue>& value) -> int64_t {
+              return value ? value->GetSegment().Size() : 0;  // weight = segment size; null weighs 0
+          },
+          .removal_callback =  // forwards every reported removal, whatever its cause
+              [](const std::shared_ptr<CacheKey>& key, const std::shared_ptr<CacheValue>& value,
+                 auto /*cause*/) {
+                  if (value) {
+                      value->OnEvict(key);
+                  }
+              }}) {}
+
+Result<std::shared_ptr<CacheValue>> LruCache::Get(  // Forwards to the inner GenericLruCache.
+    const std::shared_ptr<CacheKey>& key,
+    std::function<Result<std::shared_ptr<CacheValue>>(const 
std::shared_ptr<CacheKey>&)> supplier) {
+    return inner_cache_.Get(key, std::move(supplier));  // `supplier` was taken by value; move it on
+}
+
+Status LruCache::Put(const std::shared_ptr<CacheKey>& key,  // Forwards to the inner cache.
+                     const std::shared_ptr<CacheValue>& value) {
+    return inner_cache_.Put(key, value);
+}
+
+void LruCache::Invalidate(const std::shared_ptr<CacheKey>& key) {  // Forwards to the inner cache.
+    inner_cache_.Invalidate(key);
+}
+
+void LruCache::InvalidateAll() {  // Forwards to the inner cache.
+    inner_cache_.InvalidateAll();
+}
+
+size_t LruCache::Size() const {  // Entry count reported by the inner cache.
+    return inner_cache_.Size();
+}
+
+int64_t LruCache::GetCurrentWeight() const {  // Current total weight reported by the inner cache.
+    return inner_cache_.GetCurrentWeight();
+}
+
+int64_t LruCache::GetMaxWeight() const {  // Maximum weight reported by the inner cache.
+    return inner_cache_.GetMaxWeight();
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/io/cache/lru_cache.h 
b/src/paimon/common/io/cache/lru_cache.h
new file mode 100644
index 0000000..07d6038
--- /dev/null
+++ b/src/paimon/common/io/cache/lru_cache.h
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <functional>
+#include <memory>
+
+#include "paimon/common/io/cache/cache.h"
+#include "paimon/common/io/cache/cache_key.h"
+#include "paimon/common/utils/generic_lru_cache.h"
+#include "paimon/result.h"
+
+namespace paimon {
+
+/// LRU Cache implementation with weight-based eviction for block cache.
+///
+/// Wraps GenericLruCache with CacheKey/CacheValue types. Capacity is measured
+/// in bytes (sum of MemorySegment sizes). When an entry is removed (evicted or
+/// invalidated), its CacheCallback is invoked to notify the upper layer.
+///
+/// @note Thread-safe: all public methods are protected by the underlying 
GenericLruCache lock.
+class PAIMON_EXPORT LruCache : public Cache {
+ public:
+    explicit LruCache(int64_t max_weight);  // max_weight: capacity in bytes
+
+    Result<std::shared_ptr<CacheValue>> Get(
+        const std::shared_ptr<CacheKey>& key,
+        std::function<Result<std::shared_ptr<CacheValue>>(const 
std::shared_ptr<CacheKey>&)>
+            supplier) override;
+
+    Status Put(const std::shared_ptr<CacheKey>& key,
+               const std::shared_ptr<CacheValue>& value) override;
+
+    void Invalidate(const std::shared_ptr<CacheKey>& key) override;
+
+    void InvalidateAll() override;
+
+    size_t Size() const override;
+    /// Sum of the weights (segment sizes in bytes) of the entries currently cached.
+    int64_t GetCurrentWeight() const;
+    /// The capacity passed to the constructor.
+    int64_t GetMaxWeight() const;
+
+ private:
+    using InnerCache = GenericLruCache<std::shared_ptr<CacheKey>, 
std::shared_ptr<CacheValue>,
+                                       CacheKeyHash, CacheKeyEqual>;
+
+    InnerCache inner_cache_;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/io/cache/lru_cache_test.cpp 
b/src/paimon/common/io/cache/lru_cache_test.cpp
new file mode 100644
index 0000000..9e8ae44
--- /dev/null
+++ b/src/paimon/common/io/cache/lru_cache_test.cpp
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/io/cache/lru_cache.h"
+
+#include <atomic>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/common/io/cache/cache.h"
+#include "paimon/common/io/cache/cache_key.h"
+#include "paimon/common/memory/memory_segment.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+class LruCacheTest : public ::testing::Test {  // Fixture with key/value/supplier factories.
+ public:
+    void SetUp() override {
+        pool_ = GetDefaultPool();
+    }
+    /// Key for "test_file" at `position`; the key length is fixed at 64, independent of value size.
+    std::shared_ptr<CacheKey> MakeKey(int64_t position, bool is_index = false) 
const {
+        return CacheKey::ForPosition("test_file", position, 64, is_index);
+    }
+    /// Value holding a `size`-byte segment filled with `fill_byte`, with an optional callback.
+    std::shared_ptr<CacheValue> MakeValue(int32_t size, char fill_byte = 0,
+                                          CacheCallback callback = {}) const {
+        auto segment = MemorySegment::AllocateHeapMemory(size, pool_.get());
+        std::memset(segment.MutableData(), fill_byte, size);
+        return std::make_shared<CacheValue>(segment, std::move(callback));
+    }
+    /// Supplier that ignores its key and returns MakeValue(size, fill_byte, callback).
+    auto MakeSupplier(int32_t size, char fill_byte = 0, CacheCallback callback 
= {}) const {
+        return [this, size, fill_byte, callback = std::move(callback)](
+                   const std::shared_ptr<CacheKey>&) -> 
Result<std::shared_ptr<CacheValue>> {
+            return MakeValue(size, fill_byte, callback);
+        };
+    }
+
+ private:
+    std::shared_ptr<MemoryPool> pool_;
+};
+
+/// Verifies basic Get with supplier: cache miss loads via supplier, cache hit 
returns existing.
+TEST_F(LruCacheTest, TestGetCacheHitAndMiss) {
+    LruCache cache(1024);
+
+    auto key = MakeKey(0);
+    int32_t supplier_call_count = 0;
+    auto supplier = [&](const std::shared_ptr<CacheKey>&) -> 
Result<std::shared_ptr<CacheValue>> {
+        supplier_call_count++;
+        return MakeValue(64, 'A');
+    };
+
+    // First Get: cache miss, supplier should be called once and its value stored
+    ASSERT_OK_AND_ASSIGN(auto value1, cache.Get(key, supplier));
+    ASSERT_EQ(supplier_call_count, 1);
+    ASSERT_EQ(value1->GetSegment().Size(), 64);
+    ASSERT_EQ(value1->GetSegment().Get(0), 'A');
+    ASSERT_EQ(cache.Size(), 1);
+
+    // Second Get with same key: cache hit, supplier should NOT be called
+    ASSERT_OK_AND_ASSIGN(auto value2, cache.Get(key, supplier));
+    ASSERT_EQ(supplier_call_count, 1);
+    ASSERT_EQ(value2->GetSegment().Get(0), 'A');
+    ASSERT_EQ(cache.Size(), 1);
+    ASSERT_EQ(cache.GetMaxWeight(), 1024);  // capacity equals the constructor argument
+}
+
+/// Verifies Put inserts new entries and updates existing ones.
+TEST_F(LruCacheTest, TestPutInsertAndUpdate) {
+    LruCache cache(1024);
+
+    auto key = MakeKey(0);
+    auto value_a = MakeValue(64, 'A');
+    auto value_b = MakeValue(128, 'B');
+
+    // Insert
+    ASSERT_OK(cache.Put(key, value_a));
+    ASSERT_EQ(cache.Size(), 1);
+    ASSERT_EQ(cache.GetCurrentWeight(), 64);
+
+    // Update with larger value: still one entry, weight follows the new value
+    ASSERT_OK(cache.Put(key, value_b));
+    ASSERT_EQ(cache.Size(), 1);
+    ASSERT_EQ(cache.GetCurrentWeight(), 128);
+
+    // Verify the updated value is returned (the 0-byte supplier value must not appear)
+    ASSERT_OK_AND_ASSIGN(auto result, cache.Get(key, MakeSupplier(0)));
+    ASSERT_EQ(result->GetSegment().Size(), 128);
+    ASSERT_EQ(result->GetSegment().Get(0), 'B');
+}
+
+/// Verifies weight-based eviction: when total weight exceeds max, LRU entries 
are evicted.
+TEST_F(LruCacheTest, TestWeightBasedEviction) {
+    // Cache can hold at most 200 bytes
+    LruCache cache(200);
+
+    auto key0 = MakeKey(0);
+    auto key1 = MakeKey(1);
+    auto key2 = MakeKey(2);
+
+    // Insert 2 entries of 100 bytes each (total 200, at capacity)
+    ASSERT_OK(cache.Put(key0, MakeValue(100, 'A')));
+    ASSERT_OK(cache.Put(key1, MakeValue(100, 'B')));
+    ASSERT_EQ(cache.Size(), 2);
+    ASSERT_EQ(cache.GetCurrentWeight(), 200);
+
+    // Insert a 3rd entry: should evict key0 (LRU, inserted first), keeping the weight at 200
+    ASSERT_OK(cache.Put(key2, MakeValue(100, 'C')));
+    ASSERT_EQ(cache.Size(), 2);
+    ASSERT_EQ(cache.GetCurrentWeight(), 200);
+
+    // key0 should be evicted, Get should call supplier
+    int32_t supplier_called = 0;
+    auto supplier = [&](const std::shared_ptr<CacheKey>&) -> 
Result<std::shared_ptr<CacheValue>> {
+        supplier_called++;
+        return MakeValue(100, 'X');
+    };
+    ASSERT_OK_AND_ASSIGN(auto result, cache.Get(key0, supplier));
+    ASSERT_EQ(supplier_called, 1);
+    ASSERT_EQ(result->GetSegment().Get(0), 'X');  // the reloaded value, not the original 'A'
+}
+
+/// Verifies that eviction invokes the CacheCallback on evicted entries.
+TEST_F(LruCacheTest, TestEvictionCallback) {
+    LruCache cache(200);
+
+    std::vector<int64_t> evicted_positions;
+    auto make_callback = [&evicted_positions](int64_t position) -> 
CacheCallback {
+        return [&evicted_positions, position](const 
std::shared_ptr<CacheKey>&) {
+            evicted_positions.push_back(position);
+        };
+    };
+
+    auto key0 = MakeKey(0);
+    auto key1 = MakeKey(1);
+    auto key2 = MakeKey(2);
+
+    ASSERT_OK(cache.Put(key0, MakeValue(100, 'A', make_callback(0))));
+    ASSERT_OK(cache.Put(key1, MakeValue(100, 'B', make_callback(1))));
+    ASSERT_TRUE(evicted_positions.empty());  // plain inserts within capacity fire nothing
+
+    // Insert key2: should evict key0 and trigger only key0's callback, exactly once
+    ASSERT_OK(cache.Put(key2, MakeValue(100, 'C', make_callback(2))));
+    ASSERT_EQ(evicted_positions.size(), 1);
+    ASSERT_EQ(evicted_positions[0], 0);
+}
+
+/// Verifies LRU ordering: accessing an entry moves it to the front, 
preventing eviction.
+TEST_F(LruCacheTest, TestLruOrdering) {
+    LruCache cache(200);
+
+    auto key0 = MakeKey(0);
+    auto key1 = MakeKey(1);
+    auto key2 = MakeKey(2);
+
+    ASSERT_OK(cache.Put(key0, MakeValue(100, 'A')));
+    ASSERT_OK(cache.Put(key1, MakeValue(100, 'B')));
+
+    // Access key0 via Get to move it to front (most recently used)
+    ASSERT_OK_AND_ASSIGN(auto val, cache.Get(key0, MakeSupplier(0)));
+    ASSERT_EQ(val->GetSegment().Get(0), 'A');
+
+    // Recorders that note which key's callback fires (key2 is inserted further below)
+    std::vector<int64_t> evicted;
+    auto callback0 = [&evicted](const std::shared_ptr<CacheKey>&) { 
evicted.push_back(0); };
+    auto callback1 = [&evicted](const std::shared_ptr<CacheKey>&) { 
evicted.push_back(1); };
+
+    // Start over: the first two values carry no callback, so clearing them records nothing
+    cache.InvalidateAll();
+    ASSERT_OK(cache.Put(key0, MakeValue(100, 'A', callback0)));
+    ASSERT_OK(cache.Put(key1, MakeValue(100, 'B', callback1)));
+
+    // Access key0 to move it to front
+    ASSERT_OK_AND_ASSIGN(val, cache.Get(key0, MakeSupplier(0)));
+
+    // Insert key2: key1 should be evicted (it's at the back), NOT key0
+    ASSERT_OK(cache.Put(key2, MakeValue(100, 'C')));
+    ASSERT_EQ(evicted.size(), 1);
+    ASSERT_EQ(evicted[0], 1);
+    ASSERT_EQ(cache.Size(), 2);
+}
+
+/// Verifies Invalidate removes a specific entry, adjusts weight and fires its callback.
+TEST_F(LruCacheTest, TestInvalidate) {
+    LruCache cache(1024);
+
+    auto key0 = MakeKey(0);
+    auto key1 = MakeKey(1);
+
+    std::vector<int64_t> evicted;
+    auto callback0 = [&evicted](const std::shared_ptr<CacheKey>&) { 
evicted.push_back(0); };
+    auto callback1 = [&evicted](const std::shared_ptr<CacheKey>&) { 
evicted.push_back(1); };
+    ASSERT_OK(cache.Put(key0, MakeValue(100, 'A', callback0)));
+    ASSERT_OK(cache.Put(key1, MakeValue(200, 'B', callback1)));
+    ASSERT_EQ(cache.Size(), 2);
+    ASSERT_EQ(cache.GetCurrentWeight(), 300);
+    ASSERT_TRUE(evicted.empty());
+
+    // Invalidate key0: its callback fires even though this is not an LRU eviction
+    cache.Invalidate(key0);
+    ASSERT_EQ(cache.Size(), 1);
+    ASSERT_EQ(cache.GetCurrentWeight(), 200);
+    ASSERT_EQ(evicted, std::vector<int64_t>({0}));
+
+    // Invalidating a non-existent key is a no-op
+    cache.Invalidate(MakeKey(999));
+    ASSERT_EQ(cache.Size(), 1);
+    ASSERT_EQ(cache.GetCurrentWeight(), 200);
+    ASSERT_EQ(evicted, std::vector<int64_t>({0}));
+}
+
+/// Verifies InvalidateAll clears all entries, resets weight and fires every callback.
+TEST_F(LruCacheTest, TestInvalidateAll) {
+    LruCache cache(1024);
+
+    std::vector<int64_t> evicted;
+    for (int32_t i = 0; i < 5; i++) {
+        auto callback = [&evicted, id = i](const std::shared_ptr<CacheKey>&) {
+            evicted.push_back(id);
+        };
+        ASSERT_OK(cache.Put(MakeKey(i), MakeValue(50, 'A', callback)));
+    }
+    ASSERT_EQ(cache.Size(), 5);
+    ASSERT_EQ(cache.GetCurrentWeight(), 250);
+    ASSERT_TRUE(evicted.empty());
+
+    cache.InvalidateAll();
+    ASSERT_EQ(cache.Size(), 0);
+    ASSERT_EQ(cache.GetCurrentWeight(), 0);
+    ASSERT_EQ(evicted, std::vector<int64_t>({0, 1, 2, 3, 4}));  // callbacks run in insertion order
+}
+
+/// Verifies weight tracking is accurate across Put, Update, Invalidate, and 
Eviction.
+TEST_F(LruCacheTest, TestWeightTracking) {
+    LruCache cache(500);
+
+    auto key0 = MakeKey(0);
+    auto key1 = MakeKey(1);
+
+    // Put 100 bytes
+    ASSERT_OK(cache.Put(key0, MakeValue(100)));
+    ASSERT_EQ(cache.GetCurrentWeight(), 100);
+
+    // Put 200 bytes
+    ASSERT_OK(cache.Put(key1, MakeValue(200)));
+    ASSERT_EQ(cache.GetCurrentWeight(), 300);
+
+    // Update key0 from 100 to 150 bytes
+    ASSERT_OK(cache.Put(key0, MakeValue(150)));
+    ASSERT_EQ(cache.GetCurrentWeight(), 350);
+
+    // Invalidate key1 (200 bytes)
+    cache.Invalidate(key1);
+    ASSERT_EQ(cache.GetCurrentWeight(), 150);
+
+    // Put 200 bytes under a new key (position 2)
+    ASSERT_OK(cache.Put(MakeKey(2), MakeValue(200)));
+    ASSERT_EQ(cache.GetCurrentWeight(), 350);
+
+    // Add: total would be 550 > 500, should evict key0 (150 bytes)
+    ASSERT_OK(cache.Put(MakeKey(3), MakeValue(200)));
+    ASSERT_EQ(cache.GetCurrentWeight(), 400);
+    ASSERT_EQ(cache.Size(), 2);  // positions 2 and 3 remain
+}
+
+/// Verifies that Get via supplier correctly handles the case where the 
supplier returns an error.
+TEST_F(LruCacheTest, TestSupplierError) {
+    LruCache cache(1024);
+
+    auto key = MakeKey(0);
+    auto error_supplier =
+        [](const std::shared_ptr<CacheKey>&) -> 
Result<std::shared_ptr<CacheValue>> {
+        return Status::IOError("simulated read failure");
+    };
+
+    // Get should propagate the supplier error
+    ASSERT_NOK_WITH_MSG(cache.Get(key, error_supplier), "simulated read 
failure");
+
+    // Cache should remain empty after failed supplier: a failed load inserts nothing
+    ASSERT_EQ(cache.Size(), 0);
+    ASSERT_EQ(cache.GetCurrentWeight(), 0);
+}
+
+/// Verifies thread safety: concurrent Get/Put operations should not corrupt 
internal state.
+TEST_F(LruCacheTest, TestConcurrentAccess) {
+    LruCache cache(10000);
+    constexpr int32_t num_threads = 8;
+    constexpr int32_t ops_per_thread = 100;
+
+    std::atomic<int32_t> supplier_calls{0};
+    auto supplier = [&](const std::shared_ptr<CacheKey>&) -> 
Result<std::shared_ptr<CacheValue>> {
+        supplier_calls++;
+        return MakeValue(10, 'X');
+    };
+
+    std::vector<std::thread> threads;
+    for (int32_t t = 0; t < num_threads; t++) {
+        threads.emplace_back([&, t]() {
+            for (int32_t i = 0; i < ops_per_thread; i++) {
+                auto key = MakeKey(t * ops_per_thread + i);  // positions are disjoint across threads
+                auto result = cache.Get(key, supplier);
+                ASSERT_TRUE(result.ok());  // on failure this only returns from the thread's lambda
+            }
+        });
+    }
+
+    for (auto& thread : threads) {
+        thread.join();
+    }
+
+    // All entries should be in cache (800 entries x 10 bytes = 8000 <= 10000 capacity)
+    ASSERT_EQ(cache.Size(), num_threads * ops_per_thread);
+    ASSERT_EQ(supplier_calls.load(), num_threads * ops_per_thread);
+}
+
+/// Verifies that an entry added by Put moves to the front of the LRU list when read via Get.
+TEST_F(LruCacheTest, TestPutMovesToFront) {  // NOTE(review): despite the name, no re-Put is exercised
+    LruCache cache(200);
+
+    auto key0 = MakeKey(0);
+    auto key1 = MakeKey(1);
+    auto key2 = MakeKey(2);
+
+    std::vector<int64_t> evicted;
+    auto make_callback = [&evicted](int64_t pos) -> CacheCallback {
+        return [&evicted, pos](const std::shared_ptr<CacheKey>&) { 
evicted.push_back(pos); };
+    };
+
+    ASSERT_OK(cache.Put(key0, MakeValue(100, 'A', make_callback(0))));
+    ASSERT_OK(cache.Put(key1, MakeValue(100, 'B', make_callback(1))));
+
+    // Get key0 (should move it to front); a hit is expected, so the null supplier is not called
+    ASSERT_OK(cache.Get(key0, /*supplier=*/nullptr));
+
+    // Insert key2: should evict key1 (now at back), not key0
+    ASSERT_OK(cache.Put(key2, MakeValue(100, 'C', make_callback(2))));
+    ASSERT_EQ(evicted.size(), 1);
+    ASSERT_EQ(evicted[0], 1);
+}
+
+/// Verifies that multiple evictions happen when a single large entry is 
inserted.
+TEST_F(LruCacheTest, TestMultipleEvictions) {
+    LruCache cache(300);
+
+    std::vector<int64_t> evicted;
+    auto make_callback = [&evicted](int64_t pos) -> CacheCallback {
+        return [&evicted, pos](const std::shared_ptr<CacheKey>&) { 
evicted.push_back(pos); };
+    };
+
+    // Insert 3 entries of 100 bytes each
+    ASSERT_OK(cache.Put(MakeKey(0), MakeValue(100, 'A', make_callback(0))));
+    ASSERT_OK(cache.Put(MakeKey(1), MakeValue(100, 'B', make_callback(1))));
+    ASSERT_OK(cache.Put(MakeKey(2), MakeValue(100, 'C', make_callback(2))));
+    ASSERT_EQ(cache.Size(), 3);
+    ASSERT_EQ(cache.GetCurrentWeight(), 300);
+
+    // Insert a 250-byte entry: evicts key0, key1 and key2 (550 -> 450 -> 350 -> 250 <= 300).
+    ASSERT_OK(cache.Put(MakeKey(3), MakeValue(250, 'D')));
+    ASSERT_EQ(cache.Size(), 1);
+    ASSERT_EQ(cache.GetCurrentWeight(), 250);
+
+    ASSERT_EQ(evicted, std::vector<int64_t>({0, 1, 2}));  // oldest first
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/io/cache_input_stream.h 
b/src/paimon/common/io/cache_input_stream.h
new file mode 100644
index 0000000..3c48df3
--- /dev/null
+++ b/src/paimon/common/io/cache_input_stream.h
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstring>
+#include <memory>
+#include <string>
+
+#include "paimon/fs/file_system.h"
+#include "paimon/utils/read_ahead_cache.h"
+
+namespace paimon {
+
+/// InputStream wrapper that tries a ReadAheadCache first for positional reads.
+///
+/// Seek, GetPos, the sequential Read, Close, GetUri and Length are forwarded unchanged to
+/// the wrapped stream. The offset-based Read and ReadAsync ask the cache first (when one
+/// is attached) and fall back to the wrapped stream if the cache returns a slice that
+/// carries no buffer.
+class CacheInputStream : public InputStream {
+ public:
+    /// @param input_stream stream every forwarded call goes to; ownership is taken.
+    /// @param cache cache consulted by the positional reads; may be null.
+    CacheInputStream(std::unique_ptr<InputStream> input_stream,
+                     const std::shared_ptr<ReadAheadCache>& cache)
+        : cache_(cache), input_stream_(std::move(input_stream)) {}
+
+    /// Forwards to the wrapped stream.
+    Status Seek(int64_t offset, SeekOrigin origin) override {
+        return input_stream_->Seek(offset, origin);
+    }
+
+    /// Forwards to the wrapped stream.
+    Result<int64_t> GetPos() const override {
+        return input_stream_->GetPos();
+    }
+
+    /// Sequential read; forwarded to the wrapped stream without consulting the cache.
+    Result<int32_t> Read(char* buffer, uint32_t size) override {
+        return input_stream_->Read(buffer, size);
+    }
+
+    /// Positional read of `size` bytes at `offset` into `buffer`.
+    ///
+    /// If a cache is attached and it yields a slice with a buffer, the slice is copied
+    /// into `buffer` and its length returned. A cache error is propagated. Otherwise the
+    /// wrapped stream performs the read.
+    Result<int32_t> Read(char* buffer, uint32_t size, uint64_t offset) override {
+        if (cache_ != nullptr) {
+            ByteRange wanted{offset, static_cast<uint64_t>(size)};
+            PAIMON_ASSIGN_OR_RAISE(ByteSlice cached, cache_->Read(wanted));
+            if (cached.buffer) {
+                const auto* from = cached.buffer->data() + cached.offset;
+                std::memcpy(buffer, from, cached.length);
+                return cached.length;
+            }
+        }
+        return input_stream_->Read(buffer, size, offset);
+    }
+
+    /// Asynchronous counterpart of the positional Read.
+    ///
+    /// On a cache error or a cache hit the callback is invoked before this method
+    /// returns; otherwise the request (and the callback) is handed to the wrapped stream.
+    void ReadAsync(char* buffer, uint32_t size, uint64_t offset,
+                   std::function<void(Status)>&& callback) override {
+        if (cache_ != nullptr) {
+            ByteRange wanted{offset, static_cast<uint64_t>(size)};
+            Result<ByteSlice> lookup = cache_->Read(wanted);
+            if (!lookup.ok()) {
+                callback(lookup.status());
+                return;
+            }
+            const ByteSlice& cached = lookup.value();
+            if (cached.buffer) {
+                const auto* from = cached.buffer->data() + cached.offset;
+                std::memcpy(buffer, from, cached.length);
+                callback(Status::OK());
+                return;
+            }
+        }
+        input_stream_->ReadAsync(buffer, size, offset, std::move(callback));
+    }
+
+    /// Forwards to the wrapped stream.
+    Status Close() override {
+        return input_stream_->Close();
+    }
+
+    /// Forwards to the wrapped stream.
+    Result<std::string> GetUri() const override {
+        return input_stream_->GetUri();
+    }
+
+    /// Forwards to the wrapped stream.
+    Result<uint64_t> Length() const override {
+        return input_stream_->Length();
+    }
+
+ private:
+    std::shared_ptr<ReadAheadCache> cache_;      // May be null: then every read is forwarded.
+    std::unique_ptr<InputStream> input_stream_;  // Target of all forwarded calls.
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/io/cache_input_stream_test.cpp 
b/src/paimon/common/io/cache_input_stream_test.cpp
new file mode 100644
index 0000000..2c2e36c
--- /dev/null
+++ b/src/paimon/common/io/cache_input_stream_test.cpp
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/io/cache_input_stream.h"
+
+#include <fstream>
+#include <functional>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/common/factories/io_hook.h"
+#include "paimon/common/utils/scope_guard.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/fs/file_system_factory.h"
+#include "paimon/io/byte_array_input_stream.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/io_exception_helper.h"
+#include "paimon/testing/utils/testharness.h"
+#include "paimon/utils/read_ahead_cache.h"
+
+namespace paimon::test {
+
+/// Fixture that writes a small, known file into a unique test directory before each test
+/// and provides helpers to open that file and to build a ReadAheadCache over it.
+class CacheInputStreamTest : public ::testing::Test {
+ public:
+    /// Creates the test directory and writes `content_` to `<dir>/test_data`.
+    void SetUp() override {
+        content_ = "abcdefghijklmnopqrstuvwxyz0123456789";
+        pool_ = GetDefaultPool();
+
+        test_dir_ = UniqueTestDirectory::Create();
+        ASSERT_TRUE(test_dir_);
+        file_path_ = test_dir_->Str() + "/test_data";
+
+        std::ofstream out(file_path_, std::ios::binary);
+        ASSERT_TRUE(out.is_open());
+        out.write(content_.data(), content_.size());
+        out.close();
+    }
+
+    /// Opens the test file through the "local" file system and returns the stream.
+    std::unique_ptr<InputStream> OpenFile() const {
+        EXPECT_OK_AND_ASSIGN(std::shared_ptr<FileSystem> file_system,
+                             FileSystemFactory::Get("local", file_path_, {}));
+        EXPECT_OK_AND_ASSIGN(std::unique_ptr<InputStream> opened, file_system->Open(file_path_));
+        return opened;
+    }
+
+    /// Builds a ReadAheadCache over a fresh stream of the test file and initialises it
+    /// with `ranges`.
+    std::shared_ptr<ReadAheadCache> CreateCache(std::vector<ByteRange> ranges) {
+        auto source = OpenFile();
+        CacheConfig config(/*buffer_size_limit=*/1024 * 1024, /*range_size_limit=*/1024,
+                           /*hole_size_limit=*/0, /*pre_buffer_limit=*/1024 * 1024);
+        auto read_ahead = std::make_shared<ReadAheadCache>(std::move(source), config, pool_);
+        EXPECT_OK(read_ahead->Init(std::move(ranges)));
+        return read_ahead;
+    }
+
+ protected:
+    std::shared_ptr<MemoryPool> pool_;
+    std::unique_ptr<UniqueTestDirectory> test_dir_;
+    std::string content_;    // Exact bytes written to the test file.
+    std::string file_path_;  // Location of the test file inside test_dir_.
+};
+
+// With no cache attached, Length, GetUri, Seek, GetPos, the sequential Read and Close
+// must all behave like the wrapped stream.
+TEST_F(CacheInputStreamTest, TestProxyMethods) {
+    CacheInputStream stream(OpenFile(), /*cache=*/nullptr);
+
+    // Length matches the number of bytes written by the fixture.
+    ASSERT_OK_AND_ASSIGN(uint64_t total_length, stream.Length());
+    ASSERT_EQ(total_length, content_.size());
+
+    // GetUri yields something non-empty.
+    ASSERT_OK_AND_ASSIGN(std::string stream_uri, stream.GetUri());
+    ASSERT_FALSE(stream_uri.empty());
+
+    // Seek moves the position that GetPos reports.
+    ASSERT_OK(stream.Seek(5, SeekOrigin::FS_SEEK_SET));
+    ASSERT_OK_AND_ASSIGN(int64_t current_pos, stream.GetPos());
+    ASSERT_EQ(current_pos, 5);
+
+    // The sequential read continues from that position.
+    std::string out(3, '\0');
+    ASSERT_OK_AND_ASSIGN(int32_t read_count, stream.Read(out.data(), 3));
+    ASSERT_EQ(read_count, 3);
+    ASSERT_EQ(out, "fgh");
+
+    ASSERT_OK(stream.Close());
+}
+
+// Positional Read without a cache goes straight to the wrapped stream.
+TEST_F(CacheInputStreamTest, TestReadWithOffsetNullCache) {
+    CacheInputStream stream(OpenFile(), /*cache=*/nullptr);
+
+    std::string out(5, '\0');
+    ASSERT_OK_AND_ASSIGN(int32_t read_count, stream.Read(out.data(), 5, /*offset=*/2));
+    ASSERT_EQ(read_count, 5);
+    ASSERT_EQ(out, "cdefg");
+}
+
+// Positional Read of a range the cache was initialised with: data is copied from the cache.
+TEST_F(CacheInputStreamTest, TestReadWithOffsetCacheHit) {
+    // Cached range is offset=2, length=5, i.e. bytes [2, 7).
+    auto read_ahead = CreateCache({{2, 5}});
+    CacheInputStream stream(OpenFile(), read_ahead);
+
+    std::string out(5, '\0');
+    ASSERT_OK_AND_ASSIGN(int32_t read_count, stream.Read(out.data(), 5, /*offset=*/2));
+    ASSERT_EQ(read_count, 5);
+    ASSERT_EQ(out, "cdefg");
+}
+
+// Positional Read outside the cached range: the wrapped stream serves the request.
+TEST_F(CacheInputStreamTest, TestReadWithOffsetCacheMiss) {
+    // Cached range is offset=2, length=5 (bytes [2, 7)); offset 10 lies outside of it.
+    auto read_ahead = CreateCache({{2, 5}});
+    CacheInputStream stream(OpenFile(), read_ahead);
+
+    std::string out(3, '\0');
+    ASSERT_OK_AND_ASSIGN(int32_t read_count, stream.Read(out.data(), 3, /*offset=*/10));
+    ASSERT_EQ(read_count, 3);
+    ASSERT_EQ(out, "klm");
+}
+
+// ReadAsync without a cache is handed to input_stream->ReadAsync.
+TEST_F(CacheInputStreamTest, TestReadAsyncNullCache) {
+    CacheInputStream stream(OpenFile(), /*cache=*/nullptr);
+
+    std::string out(4, '\0');
+    bool invoked = false;
+    // Starts as an error so that a callback that never runs cannot look like success.
+    Status reported = Status::Invalid("not called");
+    auto on_done = [&invoked, &reported](Status status) {
+        invoked = true;
+        reported = status;
+    };
+    stream.ReadAsync(out.data(), 4, /*offset=*/0, on_done);
+
+    ASSERT_TRUE(invoked);
+    ASSERT_OK(reported);
+    ASSERT_EQ(out, "abcd");
+}
+
+// ReadAsync inside the cached range: bytes are copied from the cache and the callback
+// receives OK.
+TEST_F(CacheInputStreamTest, TestReadAsyncCacheHit) {
+    auto read_ahead = CreateCache({{0, 10}});
+    CacheInputStream stream(OpenFile(), read_ahead);
+
+    std::string out(4, '\0');
+    bool invoked = false;
+    // Starts as an error so that a callback that never runs cannot look like success.
+    Status reported = Status::Invalid("not called");
+    auto on_done = [&invoked, &reported](Status status) {
+        invoked = true;
+        reported = status;
+    };
+    stream.ReadAsync(out.data(), 4, /*offset=*/3, on_done);
+
+    ASSERT_TRUE(invoked);
+    ASSERT_OK(reported);
+    ASSERT_EQ(out, "defg");
+}
+
+// ReadAsync outside the cached range is handed to input_stream->ReadAsync.
+TEST_F(CacheInputStreamTest, TestReadAsyncCacheMiss) {
+    // Cached range is [0, 5); offset 20 is not covered by it.
+    auto read_ahead = CreateCache({{0, 5}});
+    CacheInputStream stream(OpenFile(), read_ahead);
+
+    std::string out(4, '\0');
+    bool invoked = false;
+    // Starts as an error so that a callback that never runs cannot look like success.
+    Status reported = Status::Invalid("not called");
+    auto on_done = [&invoked, &reported](Status status) {
+        invoked = true;
+        reported = status;
+    };
+    stream.ReadAsync(out.data(), 4, /*offset=*/20, on_done);
+
+    ASSERT_TRUE(invoked);
+    ASSERT_OK(reported);
+    ASSERT_EQ(out, "uvwx");
+}
+
+// Test ReadAsync when cache_->Read() returns error status.
+// Uses IOHook to inject IO error during ReadAheadCache's prefetch, causing 
cache_->Read() to fail.
+TEST_F(CacheInputStreamTest, TestReadAsyncCacheReadError) {
+    auto io_hook = paimon::IOHook::GetInstance();
+    bool error_triggered = false;
+
+    for (size_t i = 0; i < 20; i++) {
+        // Open the cache stream and underlying stream BEFORE activating IOHook
+        ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFactory::Get("local", 
file_path_, {}));
+        ASSERT_OK_AND_ASSIGN(auto cache_stream, fs->Open(file_path_));
+        ASSERT_OK_AND_ASSIGN(auto underlying, fs->Open(file_path_));
+        CacheConfig config(/*buffer_size_limit=*/1024 * 1024, 
/*range_size_limit=*/1024,
+                           /*hole_size_limit=*/0, /*pre_buffer_limit=*/1024 * 
1024);
+        auto cache = std::make_shared<ReadAheadCache>(std::move(cache_stream), 
config, pool_);
+        ASSERT_OK(cache->Init(std::vector<ByteRange>{{0, 10}}));
+
+        // Now activate IOHook so that the prefetch IO (triggered by 
cache_->Read -> PreBuffer)
+        // will fail at the i-th IO operation
+        paimon::ScopeGuard guard([&io_hook]() { io_hook->Clear(); });
+        io_hook->Reset(i, paimon::IOHook::Mode::RETURN_ERROR);
+        CacheInputStream stream(std::move(underlying), cache);
+
+        std::string buffer(5, '\0');
+        bool callback_called = false;
+        Status callback_status = Status::OK();
+        stream.ReadAsync(buffer.data(), 5, /*offset=*/0, [&](Status status) {
+            callback_called = true;
+            callback_status = status;
+        });
+        ASSERT_TRUE(callback_called);
+        CHECK_HOOK_STATUS(callback_status, i);
+        ASSERT_EQ(buffer, "abcde");
+        error_triggered = true;
+        break;
+    }
+    ASSERT_TRUE(error_triggered);
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/memory/memory_slice.cpp 
b/src/paimon/common/memory/memory_slice.cpp
new file mode 100644
index 0000000..903652b
--- /dev/null
+++ b/src/paimon/common/memory/memory_slice.cpp
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/memory/memory_slice.h"
+
+#include "paimon/common/memory/memory_slice_input.h"
+
+namespace paimon {
+/// Wraps `bytes` in a MemorySegment and returns a slice that spans the whole segment.
+MemorySlice MemorySlice::Wrap(const std::shared_ptr<Bytes>& bytes) {
+    auto whole = MemorySegment::Wrap(bytes);
+    const int32_t whole_length = whole.Size();
+    return MemorySlice(whole, 0, whole_length);
+}
+
+/// Returns a slice that starts at offset 0 and spans all of `segment`.
+MemorySlice MemorySlice::Wrap(const MemorySegment& segment) {
+    const int32_t whole_length = segment.Size();
+    return MemorySlice(segment, 0, whole_length);
+}
+
+/// Stores a copy of `segment` together with the slice's offset and length.
+/// No validation of `offset`/`length` against the segment is performed here.
+MemorySlice::MemorySlice(const MemorySegment& segment, int32_t offset, int32_t length)
+    : segment_(segment),
+      offset_(offset),
+      length_(length) {}
+
+/// Returns the sub-slice that starts `index` bytes into this slice and is `length` long.
+/// Asking for the full extent (index 0, same length) returns a copy of this slice.
+MemorySlice MemorySlice::Slice(int32_t index, int32_t length) const {
+    const bool covers_everything = (index == 0) && (length == length_);
+    if (!covers_everything) {
+        const int32_t sub_offset = offset_ + index;
+        return MemorySlice(segment_, sub_offset, length);
+    }
+    return *this;
+}
+
+/// Number of bytes covered by this slice.
+int32_t MemorySlice::Length() const {
+    return this->length_;
+}
+
+/// Offset of this slice's first byte inside the underlying segment.
+int32_t MemorySlice::Offset() const {
+    return this->offset_;
+}
+
+/// Delegates to the underlying segment's GetOrCreateHeapMemory with the given pool.
+/// The call is made on the whole segment; this slice's offset and length are not applied.
+std::shared_ptr<Bytes> MemorySlice::GetOrCreateHeapMemory(MemoryPool* pool) const {
+    std::shared_ptr<Bytes> heap_memory = segment_.GetOrCreateHeapMemory(pool);
+    return heap_memory;
+}
+
+/// Read-only access to the segment this slice points into.
+const MemorySegment& MemorySlice::GetSegment() const {
+    return this->segment_;
+}
+
+/// Reads an int8_t located `position` bytes after the start of this slice.
+int8_t MemorySlice::ReadByte(int32_t position) const {
+    const int32_t absolute = offset_ + position;
+    return segment_.GetValue<int8_t>(absolute);
+}
+
+/// Reads an int32_t located `position` bytes after the start of this slice.
+int32_t MemorySlice::ReadInt(int32_t position) const {
+    const int32_t absolute = offset_ + position;
+    return segment_.GetValue<int32_t>(absolute);
+}
+
+/// Reads an int16_t located `position` bytes after the start of this slice.
+int16_t MemorySlice::ReadShort(int32_t position) const {
+    const int32_t absolute = offset_ + position;
+    return segment_.GetValue<int16_t>(absolute);
+}
+
+/// Reads an int64_t located `position` bytes after the start of this slice.
+int64_t MemorySlice::ReadLong(int32_t position) const {
+    const int32_t absolute = offset_ + position;
+    return segment_.GetValue<int64_t>(absolute);
+}
+
+/// Returns a non-owning view of this slice's bytes (Data(), length_).
+/// The view does not copy, so it is only usable while the underlying memory is alive.
+std::string_view MemorySlice::ReadStringView() const {
+    const auto byte_count = static_cast<size_t>(length_);
+    return std::string_view(Data(), byte_count);
+}
+
+/// Allocates a new Bytes of length_ from `pool` and copies this slice's bytes into it.
+std::shared_ptr<Bytes> MemorySlice::CopyBytes(MemoryPool* pool) const {
+    auto copy = std::make_shared<Bytes>(length_, pool);
+    // The cast drops constness from the freshly created buffer so that it can be filled.
+    char* destination = const_cast<char*>(copy->data());
+    std::memcpy(destination, Data(), length_);
+    return copy;
+}
+
+/// Creates a MemorySliceInput reading from this slice.
+MemorySliceInput MemorySlice::ToInput() const {
+    MemorySliceInput reader(*this);
+    return reader;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/memory/memory_slice.h 
b/src/paimon/common/memory/memory_slice.h
new file mode 100644
index 0000000..2f45683
--- /dev/null
+++ b/src/paimon/common/memory/memory_slice.h
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cassert>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <type_traits>
+
+#include "paimon/common/memory/memory_segment.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/result.h"
+#include "paimon/visibility.h"
+namespace paimon {
+class MemoryPool;
+class MemorySliceInput;
+
+/// A window onto a MemorySegment.
+///
+/// Covers the byte range [offset_, offset_ + length_) of the underlying segment.
+/// The MemorySegment may be owning (with shared_ptr<Bytes>) or non-owning (view).
+class PAIMON_EXPORT MemorySlice {
+ public:
+    /// Callable that compares two slices and yields a Result-wrapped int32_t.
+    using SliceComparator = std::function<Result<int32_t>(const MemorySlice&, const MemorySlice&)>;
+
+    /// Wraps `bytes` in a segment and returns a slice spanning all of it.
+    static MemorySlice Wrap(const std::shared_ptr<Bytes>& bytes);
+    /// Returns a slice spanning all of `segment`.
+    static MemorySlice Wrap(const MemorySegment& segment);
+
+    /// Builds a slice over `segment` that starts at `offset` and is `length` bytes long.
+    MemorySlice(const MemorySegment& segment, int32_t offset, int32_t length);
+
+    /// Sub-slice starting `index` bytes into this slice, `length` bytes long.
+    MemorySlice Slice(int32_t index, int32_t length) const;
+
+    /// Number of bytes in this slice.
+    int32_t Length() const;
+    /// Offset of this slice inside its segment.
+    int32_t Offset() const;
+    /// Forwards to the segment's GetOrCreateHeapMemory.
+    std::shared_ptr<Bytes> GetOrCreateHeapMemory(MemoryPool* pool) const;
+    /// The segment this slice points into.
+    const MemorySegment& GetSegment() const;
+
+    /// Raw pointer to the first byte of this slice.
+    inline const char* Data() const {
+        const char* segment_begin = segment_.Data();
+        return segment_begin + offset_;
+    }
+
+    /// Fixed-width reads; `position` is relative to the start of this slice.
+    int8_t ReadByte(int32_t position) const;
+    int32_t ReadInt(int32_t position) const;
+    int16_t ReadShort(int32_t position) const;
+    int64_t ReadLong(int32_t position) const;
+    /// Non-owning view over all bytes of this slice.
+    std::string_view ReadStringView() const;
+
+    /// Copies this slice's bytes into a newly allocated Bytes.
+    std::shared_ptr<Bytes> CopyBytes(MemoryPool* pool) const;
+
+    /// Creates a MemorySliceInput over this slice.
+    MemorySliceInput ToInput() const;
+
+ private:
+    MemorySegment segment_;
+    int32_t offset_;  // Start of the slice inside segment_.
+    int32_t length_;  // Number of bytes in the slice.
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/memory/memory_slice_input.h 
b/src/paimon/common/memory/memory_slice_input.h
new file mode 100644
index 0000000..db6fce0
--- /dev/null
+++ b/src/paimon/common/memory/memory_slice_input.h
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cassert>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <type_traits>
+
+#include "fmt/format.h"
+#include "paimon/common/memory/memory_slice.h"
+#include "paimon/common/utils/math.h"
+#include "paimon/common/utils/var_length_int_utils.h"
+#include "paimon/io/byte_order.h"
+#include "paimon/status.h"
+#include "paimon/visibility.h"
+namespace paimon {
+class MemoryPool;
+
+/// Cursor-style reader over a MemorySlice; the hot-path methods are defined inline.
+///
+/// The fixed-width readers and ReadSliceView advance the cursor without checking it
+/// against the slice length; only SetPosition validates its argument.
+class PAIMON_EXPORT MemorySliceInput {
+ public:
+    /// Keeps a copy of `slice` and caches its data pointer and length.
+    explicit MemorySliceInput(const MemorySlice& slice)
+        : slice_(slice), data_(slice.Data()), length_(slice.Length()) {}
+
+    /// Current cursor, in bytes from the start of the slice.
+    inline int32_t Position() const {
+        return position_;
+    }
+
+    /// Moves the cursor to `position`; IndexError unless 0 <= position <= length.
+    inline Status SetPosition(int32_t position) {
+        const bool in_range = (position >= 0) && (position <= length_);
+        if (!in_range) {
+            return Status::IndexError(fmt::format("position {} index out of bounds", position));
+        }
+        position_ = position;
+        return Status::OK();
+    }
+
+    /// True while the cursor is before the end of the slice.
+    inline bool IsReadable() const {
+        return length_ > position_;
+    }
+
+    /// Bytes between the cursor and the end of the slice.
+    inline int32_t Available() const {
+        return length_ - position_;
+    }
+
+    /// Reads one byte as int8_t and advances the cursor by 1.
+    inline int8_t ReadByte() {
+        int8_t result;
+        std::memcpy(&result, data_ + position_, 1);
+        ++position_;
+        return result;
+    }
+
+    /// Reads one byte and advances the cursor by 1.
+    /// NOTE(review): the return type is int8_t, so bytes >= 0x80 still come back as
+    /// negative values; confirm whether a wider/unsigned return type was intended.
+    inline int8_t ReadUnsignedByte() {
+        const auto raw = static_cast<uint8_t>(data_[position_]);
+        ++position_;
+        return static_cast<int8_t>(raw);
+    }
+
+    /// Reads 4 bytes as int32_t (byte-swapped if the configured order differs from the
+    /// system order) and advances the cursor by 4.
+    inline int32_t ReadInt() {
+        int32_t raw;
+        std::memcpy(&raw, data_ + position_, sizeof(raw));
+        position_ += 4;
+        if (!NeedSwap()) {
+            return raw;
+        }
+        return EndianSwapValue(raw);
+    }
+
+    /// Reads 8 bytes as int64_t (byte-swapped if the configured order differs from the
+    /// system order) and advances the cursor by 8.
+    inline int64_t ReadLong() {
+        int64_t raw;
+        std::memcpy(&raw, data_ + position_, sizeof(raw));
+        position_ += 8;
+        if (!NeedSwap()) {
+            return raw;
+        }
+        return EndianSwapValue(raw);
+    }
+
+    /// Reads a varint32 from the current position.
+    /// The decoder receives a pointer to the cursor (presumably to advance it).
+    inline Result<int32_t> ReadVarLenInt() {
+        return VarLengthIntUtils::DecodeInt(data_, &position_);
+    }
+
+    /// Reads a varint64 from the current position.
+    /// The decoder receives a pointer to the cursor (presumably to advance it).
+    inline Result<int64_t> ReadVarLenLong() {
+        return VarLengthIntUtils::DecodeLong(data_, &position_);
+    }
+
+    /// Returns a slice over a view segment of the next `length` bytes and advances the
+    /// cursor by `length`.
+    inline MemorySlice ReadSliceView(int32_t length) {
+        const char* view_begin = data_ + position_;
+        auto view_segment = MemorySegment::WrapView(view_begin, length);
+        position_ += length;
+        return MemorySlice(view_segment, 0, length);
+    }
+
+    /// Sets the byte order assumed for ReadInt/ReadLong.
+    void SetOrder(ByteOrder order) {
+        byte_order_ = order;
+    }
+
+ private:
+    /// True when the configured byte order differs from the system's.
+    inline bool NeedSwap() const {
+        return byte_order_ != SystemByteOrder();
+    }
+
+    MemorySlice slice_;
+    const char* data_;  // Cached raw pointer for fast access.
+    int32_t length_;    // Cached length.
+    int32_t position_ = 0;
+
+    ByteOrder byte_order_ = SystemByteOrder();
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/memory/memory_slice_output.cpp 
b/src/paimon/common/memory/memory_slice_output.cpp
new file mode 100644
index 0000000..87807ad
--- /dev/null
+++ b/src/paimon/common/memory/memory_slice_output.cpp
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/memory/memory_slice_output.h"
+
+#include "fmt/format.h"
+#include "paimon/common/utils/math.h"
+#include "paimon/common/utils/var_length_int_utils.h"
+namespace paimon {
+
+/// Creates an empty output whose initial buffer holds `estimated_size` bytes allocated
+/// from `pool`; the same pool is used again whenever the buffer has to grow.
+MemorySliceOutput::MemorySliceOutput(int32_t estimated_size, MemoryPool* pool)
+    : pool_(pool),
+      segment_(MemorySegment::Wrap(Bytes::AllocateBytes(estimated_size, pool))),
+      size_(0) {}
+
+/// Number of bytes written so far.
+int32_t MemorySliceOutput::Size() const {
+    return this->size_;
+}
+
+/// Rewinds the write position to 0; the backing segment is kept as is.
+void MemorySliceOutput::Reset() {
+    this->size_ = 0;
+}
+
+/// Returns a slice over the written part of the buffer, i.e. bytes [0, Size()).
+MemorySlice MemorySliceOutput::ToSlice() {
+    MemorySlice written(segment_, 0, size_);
+    return written;
+}
+
+/// Appends `value` using sizeof(T) bytes, growing the buffer first if required.
+/// The value is byte-swapped when the configured order differs from the system order.
+template <typename T>
+void MemorySliceOutput::WriteValue(T value) {
+    const int32_t width = sizeof(T);
+    EnsureSize(size_ + width);
+
+    T encoded = value;
+    if (NeedSwap()) {
+        encoded = EndianSwapValue(value);
+    }
+
+    segment_.PutValue(size_, encoded);
+    size_ += width;
+}
+
+/// Appends `value` in variable-length encoding.
+/// Room for the maximum encoded size is reserved up front; the write position then
+/// advances by the number of bytes the encoder reports. Encoder errors are returned.
+Status MemorySliceOutput::WriteVarLenInt(int32_t value) {
+    EnsureSize(size_ + VarLengthIntUtils::kMaxVarIntSize);
+    auto* write_at = segment_.MutableData() + size_;
+    PAIMON_ASSIGN_OR_RAISE(int32_t encoded_length, VarLengthIntUtils::EncodeInt(value, write_at));
+    size_ += encoded_length;
+    return Status::OK();
+}
+
+/// Appends `value` in variable-length encoding.
+/// Room for the maximum encoded size is reserved up front; the write position then
+/// advances by the number of bytes the encoder reports. Encoder errors are returned.
+Status MemorySliceOutput::WriteVarLenLong(int64_t value) {
+    EnsureSize(size_ + VarLengthIntUtils::kMaxVarLongSize);
+    auto* write_at = segment_.MutableData() + size_;
+    PAIMON_ASSIGN_OR_RAISE(int32_t encoded_length, VarLengthIntUtils::EncodeLong(value, write_at));
+    size_ += encoded_length;
+    return Status::OK();
+}
+
+/// Appends every byte of `source`.
+void MemorySliceOutput::WriteBytes(const std::shared_ptr<Bytes>& source) {
+    const int32_t whole_length = source->size();
+    WriteBytes(source, 0, whole_length);
+}
+
+/// Appends `length` bytes of `source`, starting at `source_index`, growing the buffer
+/// first if required. `source_index` and `length` are not validated here.
+void MemorySliceOutput::WriteBytes(const std::shared_ptr<Bytes>& source, int32_t source_index,
+                                   int32_t length) {
+    EnsureSize(size_ + length);
+    const std::string_view whole_source(source->data(), source->size());
+    segment_.Put(size_, whole_source, source_index, length);
+    size_ += length;
+}
+
+/// Selects the byte order that WriteValue uses for subsequently written values.
+void MemorySliceOutput::SetOrder(ByteOrder order) {
+    this->byte_order_ = order;
+}
+
+/// True when the configured byte order differs from the system's byte order.
+bool MemorySliceOutput::NeedSwap() const {
+    return byte_order_ != SystemByteOrder();
+}
+
+/// Makes sure the backing segment can hold at least `size` bytes.
+///
+/// If the segment is already large enough nothing happens. Otherwise the capacity is
+/// doubled until it reaches the current capacity plus `size`, a new segment of that
+/// capacity is allocated from `pool_`, the old contents are copied into it and it
+/// replaces the old segment.
+///
+/// @param size total number of bytes the buffer must be able to hold.
+void MemorySliceOutput::EnsureSize(int32_t size) {
+    const int32_t current_capacity = segment_.Size();
+    if (size <= current_capacity) {
+        return;
+    }
+
+    // Computed in 64 bits so that neither the sum nor the doubling can overflow int32_t.
+    const int64_t min_capacity = static_cast<int64_t>(current_capacity) + size;
+    // Start from at least 1: doubling a capacity of 0 never grows and would loop forever.
+    int64_t capacity = current_capacity > 0 ? current_capacity : 1;
+    while (capacity < min_capacity) {
+        capacity <<= 1;
+    }
+    // Clamp to what an int32_t can express; this is still >= `size`, itself an int32_t.
+    if (capacity > INT32_MAX) {
+        capacity = INT32_MAX;
+    }
+
+    auto bytes = std::make_shared<Bytes>(static_cast<int32_t>(capacity), pool_);
+    MemorySegment new_segment = MemorySegment::Wrap(bytes);
+
+    // Nothing to carry over when the old segment was empty.
+    if (current_capacity > 0) {
+        segment_.CopyTo(0, &new_segment, 0, current_capacity);
+    }
+    segment_ = new_segment;
+}
+
+// Explicit instantiations: WriteValue is a template whose body is defined above in this
+// source file, so the specialisations listed here are the ones emitted by this file.
+template void MemorySliceOutput::WriteValue(bool);
+template void MemorySliceOutput::WriteValue(char);
+template void MemorySliceOutput::WriteValue(int8_t);
+template void MemorySliceOutput::WriteValue(int16_t);
+template void MemorySliceOutput::WriteValue(int32_t);
+template void MemorySliceOutput::WriteValue(int64_t);
+
+}  // namespace paimon
diff --git a/src/paimon/common/memory/memory_slice_output.h 
b/src/paimon/common/memory/memory_slice_output.h
new file mode 100644
index 0000000..cbfca98
--- /dev/null
+++ b/src/paimon/common/memory/memory_slice_output.h
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cassert>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <type_traits>
+
+#include "paimon/common/memory/memory_slice.h"
+#include "paimon/io/byte_order.h"
+#include "paimon/status.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+class MemoryPool;
+
+/// Append-only writer that accumulates values in a MemorySegment.
+///
+/// Values are appended at the current write position (`Size()`); the backing segment is
+/// replaced by a larger one whenever a write needs more room. `ToSlice()` exposes the
+/// bytes written so far.
+class PAIMON_EXPORT MemorySliceOutput {
+ public:
+    /// Creates an output with no pool, a default-constructed segment and size 0.
+    MemorySliceOutput() = default;
+
+    /// Creates an output whose initial buffer holds `estimated_size` bytes from `pool`.
+    MemorySliceOutput(int32_t estimated_size, MemoryPool* pool);
+
+    /// Number of bytes written so far.
+    int32_t Size() const;
+    /// Rewinds the write position to 0.
+    void Reset();
+    /// Slice over the written bytes [0, Size()).
+    MemorySlice ToSlice();
+
+    /// Appends a fixed-width value; defined and explicitly instantiated in the .cpp file.
+    template <typename T>
+    void WriteValue(T value);
+
+    /// Append `value` in variable-length encoding.
+    Status WriteVarLenInt(int32_t value);
+    Status WriteVarLenLong(int64_t value);
+
+    /// Appends all of `source`.
+    void WriteBytes(const std::shared_ptr<Bytes>& source);
+    /// Appends `length` bytes of `source`, starting at `source_index`.
+    void WriteBytes(const std::shared_ptr<Bytes>& source, int32_t source_index, int32_t length);
+
+    /// Sets the byte order used by WriteValue.
+    void SetOrder(ByteOrder order);
+
+ private:
+    /// Grows the backing segment so that it can hold at least `bytes` bytes.
+    void EnsureSize(int32_t bytes);
+    /// True when the configured byte order differs from the system's.
+    bool NeedSwap() const;
+
+ private:
+    // Default member initializers keep a default-constructed object in a defined state
+    // (previously pool_ and size_ were left indeterminate by the defaulted constructor).
+    MemoryPool* pool_ = nullptr;
+    MemorySegment segment_;
+    int32_t size_ = 0;
+
+    ByteOrder byte_order_ = SystemByteOrder();
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/memory/memory_slice_test.cpp 
b/src/paimon/common/memory/memory_slice_test.cpp
new file mode 100644
index 0000000..2d550f4
--- /dev/null
+++ b/src/paimon/common/memory/memory_slice_test.cpp
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/memory/memory_slice.h"
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include "gtest/gtest.h"
+#include "paimon/common/memory/memory_slice_input.h"
+#include "paimon/common/memory/memory_slice_output.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+// Fixture that holds the pool returned by GetDefaultPool() for use by each test.
+class MemorySliceTest : public ::testing::Test {
+ protected:
+    std::shared_ptr<MemoryPool> pool_{GetDefaultPool()};
+};
+
+// ==================== MemorySliceOutput Tests ====================
+
+// Writes one value of every fixed-width type and verifies the total size, the
+// slice bounds and each value at its byte offset.
+TEST_F(MemorySliceTest, TestOutputWriteValueAndToSlice) {
+    MemorySliceOutput output(64, pool_.get());
+    ASSERT_EQ(0, output.Size());
+
+    output.WriteValue<int32_t>(42);                       // offset 0, 4 bytes
+    output.WriteValue<int64_t>(123456789LL);              // offset 4, 8 bytes
+    output.WriteValue<int16_t>(static_cast<int16_t>(7));  // offset 12, 2 bytes
+    output.WriteValue<int8_t>(static_cast<int8_t>(-1));   // offset 14, 1 byte
+    output.WriteValue<bool>(true);                        // offset 15, 1 byte
+
+    // 4 + 8 + 2 + 1 + 1 = 16
+    ASSERT_EQ(16, output.Size());
+
+    MemorySlice slice = output.ToSlice();
+    ASSERT_EQ(16, slice.Length());
+    ASSERT_EQ(0, slice.Offset());
+
+    ASSERT_EQ(42, slice.ReadInt(0));
+    ASSERT_EQ(123456789LL, slice.ReadLong(4));
+    ASSERT_EQ(7, slice.ReadShort(12));
+    ASSERT_EQ(-1, slice.ReadByte(14));
+    // The bool is the last byte written; `true` is expected to be stored as 1.
+    ASSERT_EQ(1, slice.ReadByte(15));
+}
+
+// Appending a whole Bytes buffer must grow the output by exactly its length
+// and the slice must read back as the same text.
+TEST_F(MemorySliceTest, TestOutputWriteBytes) {
+    MemorySliceOutput sink(16, pool_.get());
+
+    const std::string text = "hello world";
+    const auto payload = std::make_shared<Bytes>(text, pool_.get());
+    sink.WriteBytes(payload);
+
+    const auto expected_size = static_cast<int32_t>(text.size());
+    ASSERT_EQ(expected_size, sink.Size());
+
+    MemorySlice written = sink.ToSlice();
+    ASSERT_EQ("hello world", written.ReadStringView());
+}
+
+// Only the requested window (index 6, length 5) of the source is appended.
+TEST_F(MemorySliceTest, TestOutputWriteBytesSubRange) {
+    MemorySliceOutput sink(16, pool_.get());
+
+    const std::string text = "hello world";
+    const auto payload = std::make_shared<Bytes>(text, pool_.get());
+    sink.WriteBytes(payload, /*source_index=*/6, /*length=*/5);
+    ASSERT_EQ(5, sink.Size());
+
+    MemorySlice written = sink.ToSlice();
+    ASSERT_EQ("world", written.ReadStringView());
+}
+
+// Variable-length ints written in sequence are decoded in the same sequence.
+TEST_F(MemorySliceTest, TestOutputVarLenInt) {
+    // Includes values on both sides of the 127/128 boundary.
+    const int32_t samples[] = {0, 127, 128, 16384};
+
+    MemorySliceOutput output(32, pool_.get());
+    for (const int32_t sample : samples) {
+        ASSERT_OK(output.WriteVarLenInt(sample));
+    }
+
+    MemorySlice slice = output.ToSlice();
+    MemorySliceInput input(slice);
+    for (const int32_t sample : samples) {
+        ASSERT_OK_AND_ASSIGN(int32_t decoded, input.ReadVarLenInt());
+        ASSERT_EQ(sample, decoded);
+    }
+}
+
+// Variable-length longs written in sequence are decoded in the same sequence.
+TEST_F(MemorySliceTest, TestOutputVarLenLong) {
+    // Includes values on both sides of the 127/128 boundary and one beyond 32 bits.
+    const int64_t samples[] = {0, 127, 128, 1234567890123LL};
+
+    MemorySliceOutput output(32, pool_.get());
+    for (const int64_t sample : samples) {
+        ASSERT_OK(output.WriteVarLenLong(sample));
+    }
+
+    MemorySlice slice = output.ToSlice();
+    MemorySliceInput input(slice);
+    for (const int64_t sample : samples) {
+        ASSERT_OK_AND_ASSIGN(int64_t decoded, input.ReadVarLenLong());
+        ASSERT_EQ(sample, decoded);
+    }
+}
+
+// Negative inputs are rejected by both var-len writers and leave the output empty.
+TEST_F(MemorySliceTest, TestOutputVarLenNegativeValue) {
+    MemorySliceOutput sink(16, pool_.get());
+
+    ASSERT_NOK_WITH_MSG(sink.WriteVarLenInt(-1), "negative value: v=-1");
+    ASSERT_NOK_WITH_MSG(sink.WriteVarLenLong(-2), "negative value: v=-2");
+
+    // Neither rejected call may have appended anything.
+    ASSERT_EQ(0, sink.Size());
+}
+
+// Reset() empties the output; later writes start again at offset 0.
+TEST_F(MemorySliceTest, TestOutputReset) {
+    MemorySliceOutput sink(16, pool_.get());
+
+    // First write: four bytes.
+    sink.WriteValue<int32_t>(42);
+    ASSERT_EQ(4, sink.Size());
+
+    sink.Reset();
+    ASSERT_EQ(0, sink.Size());
+
+    // Second write after the reset: eight bytes, readable from offset 0.
+    sink.WriteValue<int64_t>(99);
+    ASSERT_EQ(8, sink.Size());
+    MemorySlice after_reset = sink.ToSlice();
+    ASSERT_EQ(99, after_reset.ReadLong(0));
+}
+
+// Writing far more than the initial estimate must keep every value intact.
+TEST_F(MemorySliceTest, TestOutputAutoGrow) {
+    constexpr int32_t kValueCount = 100;
+    constexpr int32_t kValueWidth = 4;  // bytes occupied by one int32_t
+
+    // The estimate only covers a single value, so later writes exceed it.
+    MemorySliceOutput output(4, pool_.get());
+    int32_t next = 0;
+    while (next < kValueCount) {
+        output.WriteValue<int32_t>(next);
+        ++next;
+    }
+    ASSERT_EQ(kValueCount * kValueWidth, output.Size());
+
+    MemorySlice slice = output.ToSlice();
+    for (int32_t index = 0; index < kValueCount; ++index) {
+        ASSERT_EQ(index, slice.ReadInt(index * kValueWidth));
+    }
+}
+
+// For each byte order: write a 32-bit and a 64-bit value, check the raw byte
+// layout of the slice, then read the value back with the same order.
+// The layout checks matter: a round trip alone would still pass if SetOrder()
+// were ignored on both the writing and the reading side.
+TEST_F(MemorySliceTest, TestOutputByteOrder) {
+    const ByteOrder orders[] = {ByteOrder::PAIMON_BIG_ENDIAN, ByteOrder::PAIMON_LITTLE_ENDIAN};
+    for (const ByteOrder order : orders) {
+        const bool big_endian = (order == ByteOrder::PAIMON_BIG_ENDIAN);
+        {
+            const int32_t value = 0x01020304;
+            MemorySliceOutput output(16, pool_.get());
+            output.SetOrder(order);
+            output.WriteValue<int32_t>(value);
+            MemorySlice slice = output.ToSlice();
+
+            // Big-endian stores the most significant byte (0x01) first,
+            // little-endian stores the least significant byte (0x04) first.
+            ASSERT_EQ(big_endian ? 0x01 : 0x04, slice.ReadByte(0));
+            ASSERT_EQ(big_endian ? 0x04 : 0x01, slice.ReadByte(3));
+
+            MemorySliceInput input(slice);
+            input.SetOrder(order);
+            ASSERT_EQ(value, input.ReadInt());
+        }
+        {
+            const int64_t value = 123456789123456ll;
+            // Derive the edge bytes from the value instead of hard-coding them.
+            const int most_significant = static_cast<int8_t>((value >> 56) & 0xFF);
+            const int least_significant = static_cast<int8_t>(value & 0xFF);
+
+            MemorySliceOutput output(16, pool_.get());
+            output.SetOrder(order);
+            output.WriteValue<int64_t>(value);
+            MemorySlice slice = output.ToSlice();
+
+            ASSERT_EQ(big_endian ? most_significant : least_significant, slice.ReadByte(0));
+            ASSERT_EQ(big_endian ? least_significant : most_significant, slice.ReadByte(7));
+
+            MemorySliceInput input(slice);
+            input.SetOrder(order);
+            ASSERT_EQ(value, input.ReadLong());
+        }
+    }
+}
+
+// ==================== MemorySlice Tests ====================
+
+// Wrapping a Bytes buffer exposes it completely: offset 0, full length.
+TEST_F(MemorySliceTest, TestSliceWrapBytes) {
+    const std::string text = "abcdefghij";
+    const auto backing = std::make_shared<Bytes>(text, pool_.get());
+
+    MemorySlice wrapped = MemorySlice::Wrap(backing);
+
+    ASSERT_EQ(10, wrapped.Length());
+    ASSERT_EQ(0, wrapped.Offset());
+    ASSERT_EQ("abcdefghij", wrapped.ReadStringView());
+}
+
+// A slice can also be built from a MemorySegment that wraps the bytes.
+TEST_F(MemorySliceTest, TestSliceWrapSegment) {
+    const auto backing = std::make_shared<Bytes>("hello", pool_.get());
+    MemorySegment wrapped_segment = MemorySegment::Wrap(backing);
+
+    MemorySlice wrapped = MemorySlice::Wrap(wrapped_segment);
+
+    ASSERT_EQ(5, wrapped.Length());
+    ASSERT_EQ("hello", wrapped.ReadStringView());
+}
+
+// Slice(index, length) yields a window whose offset and content match the request.
+TEST_F(MemorySliceTest, TestSliceSubSlice) {
+    const std::string text = "abcdefghij";
+    const auto backing = std::make_shared<Bytes>(text, pool_.get());
+    MemorySlice whole = MemorySlice::Wrap(backing);
+
+    // Four bytes starting at index 3.
+    MemorySlice middle = whole.Slice(3, 4);
+    ASSERT_EQ(4, middle.Length());
+    ASSERT_EQ(3, middle.Offset());
+    ASSERT_EQ("defg", middle.ReadStringView());
+
+    // Requesting the full range gives back an equivalent slice.
+    MemorySlice full_range = whole.Slice(0, 10);
+    ASSERT_EQ(10, full_range.Length());
+    ASSERT_EQ("abcdefghij", full_range.ReadStringView());
+}
+
+// Values packed back to back are readable at their (unaligned) byte offsets.
+TEST_F(MemorySliceTest, TestSliceReadPrimitives) {
+    MemorySliceOutput sink(32, pool_.get());
+    sink.WriteValue<int8_t>(static_cast<int8_t>(0x7F));    // offset 0
+    sink.WriteValue<int16_t>(static_cast<int16_t>(1234));  // offset 1
+    sink.WriteValue<int32_t>(56789);                       // offset 3
+    sink.WriteValue<int64_t>(9876543210LL);                // offset 7
+
+    MemorySlice packed = sink.ToSlice();
+    ASSERT_EQ(0x7F, packed.ReadByte(0));
+    ASSERT_EQ(static_cast<int16_t>(1234), packed.ReadShort(1));
+    ASSERT_EQ(56789, packed.ReadInt(3));
+    ASSERT_EQ(9876543210LL, packed.ReadLong(7));
+}
+
+// CopyBytes() returns a separate buffer holding just the slice's range.
+TEST_F(MemorySliceTest, TestSliceCopyBytes) {
+    std::string text = "copy me";
+    auto backing = std::make_shared<Bytes>(text, pool_.get());
+
+    {
+        // Slice over the whole buffer.
+        MemorySlice whole = MemorySlice::Wrap(backing);
+        auto duplicate = whole.CopyBytes(pool_.get());
+        ASSERT_EQ(text.size(), duplicate->size());
+        ASSERT_EQ(text, std::string(duplicate->data(), duplicate->size()));
+        // The copy must not alias the source storage.
+        ASSERT_NE(backing->data(), duplicate->data());
+    }
+    {
+        // Slice over the trailing two bytes only.
+        MemorySlice tail(MemorySegment::Wrap(backing), /*offset=*/5, /*length=*/2);
+        auto duplicate = tail.CopyBytes(pool_.get());
+        ASSERT_EQ(2, duplicate->size());
+        ASSERT_EQ("me", std::string(duplicate->data(), duplicate->size()));
+        ASSERT_NE(backing->data(), duplicate->data());
+    }
+}
+
+// For a slice wrapping a Bytes buffer, the very same buffer is handed back.
+TEST_F(MemorySliceTest, TestSliceGetOrCreateHeapMemory) {
+    auto backing = std::make_shared<Bytes>("test", pool_.get());
+    MemorySlice wrapped = MemorySlice::Wrap(backing);
+
+    ASSERT_EQ(backing, wrapped.GetOrCreateHeapMemory(pool_.get()));
+}
+
+// ToInput() gives a reader that returns the written values in write order.
+TEST_F(MemorySliceTest, TestSliceToInput) {
+    MemorySliceOutput sink(16, pool_.get());
+    sink.WriteValue<int32_t>(42);
+    sink.WriteValue<int64_t>(99);
+
+    MemorySlice written = sink.ToSlice();
+    MemorySliceInput reader = written.ToInput();
+
+    ASSERT_EQ(42, reader.ReadInt());
+    ASSERT_EQ(99, reader.ReadLong());
+}
+
+// ==================== MemorySliceInput Tests ====================
+
+// Tracks Position()/Available()/IsReadable() while two ints are consumed, and
+// verifies the values returned by each read instead of discarding them.
+TEST_F(MemorySliceTest, TestInputPositionAndAvailable) {
+    MemorySliceOutput output(16, pool_.get());
+    output.WriteValue<int32_t>(1);
+    output.WriteValue<int32_t>(2);
+
+    MemorySliceInput input(output.ToSlice());
+    // Nothing consumed yet.
+    ASSERT_EQ(0, input.Position());
+    ASSERT_EQ(8, input.Available());
+    ASSERT_TRUE(input.IsReadable());
+
+    // After the first int.
+    ASSERT_EQ(1, input.ReadInt());
+    ASSERT_EQ(4, input.Position());
+    ASSERT_EQ(4, input.Available());
+    ASSERT_TRUE(input.IsReadable());
+
+    // After the second int the input is exhausted.
+    ASSERT_EQ(2, input.ReadInt());
+    ASSERT_EQ(8, input.Position());
+    ASSERT_EQ(0, input.Available());
+    ASSERT_FALSE(input.IsReadable());
+}
+
+// SetPosition() rewinds the reader and rejects out-of-range positions.
+TEST_F(MemorySliceTest, TestInputSetPosition) {
+    MemorySliceOutput output(16, pool_.get());
+    output.WriteValue<int32_t>(111);
+    output.WriteValue<int32_t>(222);
+
+    MemorySliceInput input(output.ToSlice());
+    input.ReadInt();
+    ASSERT_EQ(4, input.Position());
+
+    // Rewind to the start and read the first value again.
+    ASSERT_OK(input.SetPosition(0));
+    ASSERT_EQ(0, input.Position());
+    ASSERT_EQ(111, input.ReadInt());
+
+    // Invalid positions: negative, and beyond the 8 bytes that were written.
+    ASSERT_NOK_WITH_MSG(input.SetPosition(-1), "position -1 index out of bounds");
+    ASSERT_NOK_WITH_MSG(input.SetPosition(100), "position 100 index out of bounds");
+}
+
+// Signed bytes, including both extremes, come back unchanged and in order.
+TEST_F(MemorySliceTest, TestInputReadByte) {
+    const int8_t samples[] = {-128, 127, 0};
+
+    MemorySliceOutput output(8, pool_.get());
+    for (const int8_t sample : samples) {
+        output.WriteValue<int8_t>(sample);
+    }
+
+    MemorySliceInput input(output.ToSlice());
+    for (const int8_t sample : samples) {
+        // Compare as int so a failure prints numbers rather than characters.
+        ASSERT_EQ(static_cast<int>(sample), input.ReadByte());
+    }
+}
+
+// Reads back a byte whose bits are all set through ReadUnsignedByte().
+// NOTE(review): the result is stored in an int8_t before comparing, so this
+// only checks the bit pattern, not that the value is reported as 255 - confirm
+// the intended return type of ReadUnsignedByte().
+TEST_F(MemorySliceTest, TestInputReadUnsignedByte) {
+    MemorySliceOutput sink(8, pool_.get());
+    sink.WriteValue<int8_t>(static_cast<int8_t>(0xFF));
+
+    MemorySliceInput reader(sink.ToSlice());
+    const int8_t expected = static_cast<int8_t>(0xFF);
+    int8_t actual = reader.ReadUnsignedByte();
+    ASSERT_EQ(expected, actual);
+}
+
+// Zero and both extremes of int32_t / int64_t survive a write-read cycle.
+TEST_F(MemorySliceTest, TestInputReadIntAndLong) {
+    const int32_t int_samples[] = {0, INT32_MAX, INT32_MIN};
+    const int64_t long_samples[] = {0, INT64_MAX, INT64_MIN};
+
+    // All ints are written first, then all longs.
+    MemorySliceOutput output(32, pool_.get());
+    for (const int32_t sample : int_samples) {
+        output.WriteValue<int32_t>(sample);
+    }
+    for (const int64_t sample : long_samples) {
+        output.WriteValue<int64_t>(sample);
+    }
+
+    MemorySliceInput input(output.ToSlice());
+    for (const int32_t sample : int_samples) {
+        ASSERT_EQ(sample, input.ReadInt());
+    }
+    for (const int64_t sample : long_samples) {
+        ASSERT_EQ(sample, input.ReadLong());
+    }
+}
+
+// ReadSliceView(n) returns the next n bytes and advances the position by n.
+TEST_F(MemorySliceTest, TestInputReadSliceView) {
+    const std::string text = "abcdefghij";
+    const auto backing = std::make_shared<Bytes>(text, pool_.get());
+    MemorySlice whole = MemorySlice::Wrap(backing);
+    MemorySliceInput reader(whole);
+
+    // First half.
+    MemorySlice head = reader.ReadSliceView(5);
+    ASSERT_EQ("abcde", head.ReadStringView());
+    ASSERT_EQ(5, reader.Position());
+
+    // Second half, continuing where the first view ended.
+    MemorySlice tail = reader.ReadSliceView(5);
+    ASSERT_EQ("fghij", tail.ReadStringView());
+    ASSERT_EQ(10, reader.Position());
+}
+
+// ==================== Round-trip Output → Input Tests ====================
+
+// Serializes fixed-width values, var-len values and a length-prefixed string,
+// then decodes all of them from the resulting slice.
+TEST_F(MemorySliceTest, TestRoundTripMixedTypes) {
+    MemorySliceOutput sink(64, pool_.get());
+    sink.WriteValue<int8_t>(static_cast<int8_t>(42));      // offset 0
+    sink.WriteValue<int16_t>(static_cast<int16_t>(1000));  // offset 1
+    sink.WriteValue<int32_t>(123456);                      // offset 3
+    sink.WriteValue<int64_t>(9876543210LL);                // offset 7
+    ASSERT_OK(sink.WriteVarLenInt(300));
+    ASSERT_OK(sink.WriteVarLenLong(1000000LL));
+
+    // Length-prefixed payload: a 4-byte length followed by the raw bytes.
+    auto payload = std::make_shared<Bytes>("test", pool_.get());
+    sink.WriteValue<int32_t>(static_cast<int32_t>(payload->size()));
+    sink.WriteBytes(payload);
+
+    MemorySlice encoded = sink.ToSlice();
+    MemorySliceInput reader(encoded);
+    ASSERT_EQ(42, reader.ReadByte());
+    // The int16_t at offset 1 is read through the slice, not the reader, so
+    // the reader is moved past it explicitly below.
+    ASSERT_EQ(1000, encoded.ReadShort(1));
+    ASSERT_OK(reader.SetPosition(3));
+    ASSERT_EQ(123456, reader.ReadInt());
+    // Seek back to the same offset and read the int a second time.
+    ASSERT_OK(reader.SetPosition(3));
+    ASSERT_EQ(123456, reader.ReadInt());
+    ASSERT_EQ(9876543210LL, reader.ReadLong());
+
+    ASSERT_OK_AND_ASSIGN(int32_t decoded_int, reader.ReadVarLenInt());
+    ASSERT_EQ(300, decoded_int);
+    ASSERT_OK_AND_ASSIGN(int64_t decoded_long, reader.ReadVarLenLong());
+    ASSERT_EQ(1000000LL, decoded_long);
+
+    int32_t payload_length = reader.ReadInt();
+    ASSERT_EQ(4, payload_length);
+    MemorySlice payload_view = reader.ReadSliceView(payload_length);
+    ASSERT_EQ("test", payload_view.ReadStringView());
+}
+
+}  // namespace paimon::test

Reply via email to