This is an automated email from the ASF dual-hosted git repository.
leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 8cb8c61 feat: add IO cache (LRUCache, CacheManager, CacheInputStream)
and MemorySlice utilities (#40)
8cb8c61 is described below
commit 8cb8c61a66e14607628fe7a8144fa15baa09c603
Author: lxy <[email protected]>
AuthorDate: Tue Jun 2 16:34:44 2026 +0800
feat: add IO cache (LRUCache, CacheManager, CacheInputStream) and
MemorySlice utilities (#40)
---
src/paimon/common/io/cache/cache.h | 81 +++++
src/paimon/common/io/cache/cache_key.cpp | 58 +++
src/paimon/common/io/cache/cache_key.h | 85 +++++
src/paimon/common/io/cache/cache_manager.cpp | 45 +++
src/paimon/common/io/cache/cache_manager.h | 93 +++++
src/paimon/common/io/cache/lru_cache.cpp | 70 ++++
src/paimon/common/io/cache/lru_cache.h | 68 ++++
src/paimon/common/io/cache/lru_cache_test.cpp | 389 ++++++++++++++++++++
src/paimon/common/io/cache_input_stream.h | 92 +++++
src/paimon/common/io/cache_input_stream_test.cpp | 234 +++++++++++++
src/paimon/common/memory/memory_slice.cpp | 89 +++++
src/paimon/common/memory/memory_slice.h | 78 +++++
src/paimon/common/memory/memory_slice_input.h | 128 +++++++
src/paimon/common/memory/memory_slice_output.cpp | 118 +++++++
src/paimon/common/memory/memory_slice_output.h | 69 ++++
src/paimon/common/memory/memory_slice_test.cpp | 429 +++++++++++++++++++++++
16 files changed, 2126 insertions(+)
diff --git a/src/paimon/common/io/cache/cache.h
b/src/paimon/common/io/cache/cache.h
new file mode 100644
index 0000000..d2cfeab
--- /dev/null
+++ b/src/paimon/common/io/cache/cache.h
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <string>
+
+#include "paimon/common/io/cache/cache_key.h"
+#include "paimon/common/memory/memory_segment.h"
+#include "paimon/result.h"
+
+namespace paimon {
+
+class CacheValue;
+
+/// Callback invoked when a cache entry is evicted by the LRU policy.
+using CacheCallback = std::function<void(const std::shared_ptr<CacheKey>&)>;
+
+class PAIMON_EXPORT Cache {
+ public:
+ virtual ~Cache() = default;
+ virtual Result<std::shared_ptr<CacheValue>> Get(
+ const std::shared_ptr<CacheKey>& key,
+ std::function<Result<std::shared_ptr<CacheValue>>(const
std::shared_ptr<CacheKey>&)>
+ supplier) = 0;
+
+ virtual Status Put(const std::shared_ptr<CacheKey>& key,
+ const std::shared_ptr<CacheValue>& value) = 0;
+
+ virtual void Invalidate(const std::shared_ptr<CacheKey>& key) = 0;
+
+ virtual void InvalidateAll() = 0;
+
+ virtual size_t Size() const = 0;
+};
+
+class CacheValue {
+ public:
+ CacheValue(const MemorySegment& segment, CacheCallback callback)
+ : segment_(segment), callback_(std::move(callback)) {}
+
+ const MemorySegment& GetSegment() const {
+ return segment_;
+ }
+
+ /// Invoke the eviction callback, if one was registered.
+ void OnEvict(const std::shared_ptr<CacheKey>& key) const {
+ if (callback_) {
+ callback_(key);
+ }
+ }
+
+ bool operator==(const CacheValue& other) const {
+ if (this == &other) {
+ return true;
+ }
+ return segment_ == other.segment_;
+ }
+
+ private:
+ MemorySegment segment_;
+ CacheCallback callback_;
+};
+} // namespace paimon
diff --git a/src/paimon/common/io/cache/cache_key.cpp
b/src/paimon/common/io/cache/cache_key.cpp
new file mode 100644
index 0000000..383bb20
--- /dev/null
+++ b/src/paimon/common/io/cache/cache_key.cpp
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/io/cache/cache_key.h"
+
+namespace paimon {
+
+std::shared_ptr<CacheKey> CacheKey::ForPosition(const std::string& file_path,
int64_t position,
+ int32_t length, bool is_index)
{
+ return std::make_shared<PositionCacheKey>(file_path, position, length,
is_index);
+}
+
+bool PositionCacheKey::IsIndex() const {
+ return is_index_;
+}
+
+int64_t PositionCacheKey::Position() const {
+ return position_;
+}
+
+int32_t PositionCacheKey::Length() const {
+ return length_;
+}
+
+bool PositionCacheKey::Equals(const CacheKey& other) const {
+ const auto* rhs = dynamic_cast<const PositionCacheKey*>(&other);
+ if (!rhs) {
+ return false;
+ }
+ return file_path_ == rhs->file_path_ && position_ == rhs->position_ &&
+ length_ == rhs->length_ && is_index_ == rhs->is_index_;
+}
+
+size_t PositionCacheKey::HashCode() const {
+ size_t seed = 0;
+ seed ^= std::hash<std::string>{}(file_path_) + HASH_CONSTANT + (seed << 6)
+ (seed >> 2);
+ seed ^= std::hash<int64_t>{}(position_) + HASH_CONSTANT + (seed << 6) +
(seed >> 2);
+ seed ^= std::hash<int32_t>{}(length_) + HASH_CONSTANT + (seed << 6) +
(seed >> 2);
+ seed ^= std::hash<bool>{}(is_index_) + HASH_CONSTANT + (seed << 6) + (seed
>> 2);
+ return seed;
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/io/cache/cache_key.h
b/src/paimon/common/io/cache/cache_key.h
new file mode 100644
index 0000000..75f8cb8
--- /dev/null
+++ b/src/paimon/common/io/cache/cache_key.h
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <unordered_map>
+
+#include "paimon/visibility.h"
+
+namespace paimon {
+
+class CacheValue;
+
+class PAIMON_EXPORT CacheKey {
+ public:
+ static std::shared_ptr<CacheKey> ForPosition(const std::string& file_path,
int64_t position,
+ int32_t length, bool
is_index);
+
+ public:
+ virtual ~CacheKey() = default;
+
+ virtual bool IsIndex() const = 0;
+
+ virtual bool Equals(const CacheKey& other) const = 0;
+
+ virtual size_t HashCode() const = 0;
+};
+
+class PositionCacheKey : public CacheKey {
+ public:
+ PositionCacheKey(const std::string& file_path, int64_t position, int32_t
length, bool is_index)
+ : file_path_(file_path), position_(position), length_(length),
is_index_(is_index) {}
+
+ bool IsIndex() const override;
+ size_t HashCode() const override;
+ bool Equals(const CacheKey& other) const override;
+ int64_t Position() const;
+ int32_t Length() const;
+
+ private:
+ static constexpr uint64_t HASH_CONSTANT = 0x9e3779b97f4a7c15ULL;
+
+ const std::string file_path_;
+ const int64_t position_;
+ const int32_t length_;
+ const bool is_index_;
+};
+
+struct CacheKeyHash {
+ size_t operator()(const std::shared_ptr<CacheKey>& key) const {
+ return key ? key->HashCode() : 0;
+ }
+};
+
+struct CacheKeyEqual {
+ bool operator()(const std::shared_ptr<CacheKey>& lhs,
+ const std::shared_ptr<CacheKey>& rhs) const {
+ if (lhs == rhs) {
+ return true;
+ }
+ if (!lhs || !rhs) {
+ return false;
+ }
+ return lhs->Equals(*rhs);
+ }
+};
+
+} // namespace paimon
diff --git a/src/paimon/common/io/cache/cache_manager.cpp
b/src/paimon/common/io/cache/cache_manager.cpp
new file mode 100644
index 0000000..3e5c049
--- /dev/null
+++ b/src/paimon/common/io/cache/cache_manager.cpp
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/io/cache/cache_manager.h"
+
+namespace paimon {
+
+Result<MemorySegment> CacheManager::GetPage(
+ std::shared_ptr<CacheKey>& key,
+ std::function<Result<MemorySegment>(const std::shared_ptr<CacheKey>&)>
reader,
+ CacheCallback eviction_callback) {
+ auto& cache = key->IsIndex() ? index_cache_ : data_cache_;
+ auto supplier =
+ [&](const std::shared_ptr<CacheKey>& key) ->
Result<std::shared_ptr<CacheValue>> {
+ PAIMON_ASSIGN_OR_RAISE(MemorySegment segment, reader(key));
+ return std::make_shared<CacheValue>(segment,
std::move(eviction_callback));
+ };
+ PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<CacheValue> cache_value,
cache->Get(key, supplier));
+ return cache_value->GetSegment();
+}
+
+void CacheManager::InvalidPage(const std::shared_ptr<CacheKey>& key) {
+ if (key->IsIndex()) {
+ index_cache_->Invalidate(key);
+ } else {
+ data_cache_->Invalidate(key);
+ }
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/io/cache/cache_manager.h
b/src/paimon/common/io/cache/cache_manager.h
new file mode 100644
index 0000000..bad3f68
--- /dev/null
+++ b/src/paimon/common/io/cache/cache_manager.h
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <string>
+
+#include "paimon/common/io/cache/cache.h"
+#include "paimon/common/io/cache/cache_key.h"
+#include "paimon/common/io/cache/lru_cache.h"
+#include "paimon/common/memory/memory_segment.h"
+#include "paimon/result.h"
+
+namespace paimon {
+class PAIMON_EXPORT CacheManager {
+ public:
+ /// Refreshing the cache comes with some costs, so not every time we visit
the CacheManager, but
+ /// every 10 visits, refresh the LRU strategy.
+ static constexpr int32_t REFRESH_COUNT = 10;
+
+ /// Container that wraps a MemorySegment with an access counter for
refresh.
+ class SegmentContainer {
+ public:
+ explicit SegmentContainer(const MemorySegment& segment) :
segment_(segment) {}
+
+ const MemorySegment& Access() {
+ access_count_++;
+ return segment_;
+ }
+
+ int32_t GetAccessCount() const {
+ return access_count_;
+ }
+
+ private:
+ MemorySegment segment_;
+ int32_t access_count_ = 0;
+ };
+
+ /// Constructs a CacheManager with LRU caching.
+ /// @param max_memory_bytes Total cache capacity in bytes.
+ /// @param high_priority_pool_ratio Ratio of capacity reserved for index
cache [0.0, 1.0).
+ /// If 0, index and data share the same cache.
+ CacheManager(int64_t max_memory_bytes, double high_priority_pool_ratio) {
+ auto index_cache_bytes = static_cast<int64_t>(max_memory_bytes *
high_priority_pool_ratio);
+ auto data_cache_bytes =
+ static_cast<int64_t>(max_memory_bytes * (1.0 -
high_priority_pool_ratio));
+ data_cache_ = std::make_shared<LruCache>(data_cache_bytes);
+ if (high_priority_pool_ratio == 0.0) {
+ index_cache_ = data_cache_;
+ } else {
+ index_cache_ = std::make_shared<LruCache>(index_cache_bytes);
+ }
+ }
+
+ Result<MemorySegment> GetPage(
+ std::shared_ptr<CacheKey>& key,
+ std::function<Result<MemorySegment>(const std::shared_ptr<CacheKey>&)>
reader,
+ CacheCallback eviction_callback);
+
+ void InvalidPage(const std::shared_ptr<CacheKey>& key);
+
+ const std::shared_ptr<Cache>& DataCache() const {
+ return data_cache_;
+ }
+
+ const std::shared_ptr<Cache>& IndexCache() const {
+ return index_cache_;
+ }
+
+ private:
+ std::shared_ptr<Cache> data_cache_;
+ std::shared_ptr<Cache> index_cache_;
+};
+
+} // namespace paimon
diff --git a/src/paimon/common/io/cache/lru_cache.cpp
b/src/paimon/common/io/cache/lru_cache.cpp
new file mode 100644
index 0000000..fbfca83
--- /dev/null
+++ b/src/paimon/common/io/cache/lru_cache.cpp
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/io/cache/lru_cache.h"
+
+namespace paimon {
+
+LruCache::LruCache(int64_t max_weight)
+ : inner_cache_(InnerCache::Options{
+ .max_weight = max_weight,
+ .expire_after_access_ms = -1,
+ .weigh_func = [](const std::shared_ptr<CacheKey>& /*key*/,
+ const std::shared_ptr<CacheValue>& value) ->
int64_t {
+ return value ? value->GetSegment().Size() : 0;
+ },
+ .removal_callback =
+ [](const std::shared_ptr<CacheKey>& key, const
std::shared_ptr<CacheValue>& value,
+ auto cause) {
+ if (value) {
+ value->OnEvict(key);
+ }
+ }}) {}
+
+Result<std::shared_ptr<CacheValue>> LruCache::Get(
+ const std::shared_ptr<CacheKey>& key,
+ std::function<Result<std::shared_ptr<CacheValue>>(const
std::shared_ptr<CacheKey>&)> supplier) {
+ return inner_cache_.Get(key, std::move(supplier));
+}
+
+Status LruCache::Put(const std::shared_ptr<CacheKey>& key,
+ const std::shared_ptr<CacheValue>& value) {
+ return inner_cache_.Put(key, value);
+}
+
+void LruCache::Invalidate(const std::shared_ptr<CacheKey>& key) {
+ inner_cache_.Invalidate(key);
+}
+
+void LruCache::InvalidateAll() {
+ inner_cache_.InvalidateAll();
+}
+
+size_t LruCache::Size() const {
+ return inner_cache_.Size();
+}
+
+int64_t LruCache::GetCurrentWeight() const {
+ return inner_cache_.GetCurrentWeight();
+}
+
+int64_t LruCache::GetMaxWeight() const {
+ return inner_cache_.GetMaxWeight();
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/io/cache/lru_cache.h
b/src/paimon/common/io/cache/lru_cache.h
new file mode 100644
index 0000000..07d6038
--- /dev/null
+++ b/src/paimon/common/io/cache/lru_cache.h
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <functional>
+#include <memory>
+
+#include "paimon/common/io/cache/cache.h"
+#include "paimon/common/io/cache/cache_key.h"
+#include "paimon/common/utils/generic_lru_cache.h"
+#include "paimon/result.h"
+
+namespace paimon {
+
+/// LRU Cache implementation with weight-based eviction for block cache.
+///
+/// Wraps GenericLruCache with CacheKey/CacheValue types. Capacity is measured
+/// in bytes (sum of MemorySegment sizes). When an entry is evicted, its
+/// CacheCallback is invoked to notify the upper layer.
+///
+/// @note Thread-safe: all public methods are protected by the underlying
GenericLruCache lock.
+class PAIMON_EXPORT LruCache : public Cache {
+ public:
+ explicit LruCache(int64_t max_weight);
+
+ Result<std::shared_ptr<CacheValue>> Get(
+ const std::shared_ptr<CacheKey>& key,
+ std::function<Result<std::shared_ptr<CacheValue>>(const
std::shared_ptr<CacheKey>&)>
+ supplier) override;
+
+ Status Put(const std::shared_ptr<CacheKey>& key,
+ const std::shared_ptr<CacheValue>& value) override;
+
+ void Invalidate(const std::shared_ptr<CacheKey>& key) override;
+
+ void InvalidateAll() override;
+
+ size_t Size() const override;
+
+ int64_t GetCurrentWeight() const;
+
+ int64_t GetMaxWeight() const;
+
+ private:
+ using InnerCache = GenericLruCache<std::shared_ptr<CacheKey>,
std::shared_ptr<CacheValue>,
+ CacheKeyHash, CacheKeyEqual>;
+
+ InnerCache inner_cache_;
+};
+
+} // namespace paimon
diff --git a/src/paimon/common/io/cache/lru_cache_test.cpp
b/src/paimon/common/io/cache/lru_cache_test.cpp
new file mode 100644
index 0000000..9e8ae44
--- /dev/null
+++ b/src/paimon/common/io/cache/lru_cache_test.cpp
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/io/cache/lru_cache.h"
+
+#include <atomic>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/common/io/cache/cache.h"
+#include "paimon/common/io/cache/cache_key.h"
+#include "paimon/common/memory/memory_segment.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+class LruCacheTest : public ::testing::Test {
+ public:
+ void SetUp() override {
+ pool_ = GetDefaultPool();
+ }
+
+ std::shared_ptr<CacheKey> MakeKey(int64_t position, bool is_index = false)
const {
+ return CacheKey::ForPosition("test_file", position, 64, is_index);
+ }
+
+ std::shared_ptr<CacheValue> MakeValue(int32_t size, char fill_byte = 0,
+ CacheCallback callback = {}) const {
+ auto segment = MemorySegment::AllocateHeapMemory(size, pool_.get());
+ std::memset(segment.MutableData(), fill_byte, size);
+ return std::make_shared<CacheValue>(segment, std::move(callback));
+ }
+
+ auto MakeSupplier(int32_t size, char fill_byte = 0, CacheCallback callback
= {}) const {
+ return [this, size, fill_byte, callback = std::move(callback)](
+ const std::shared_ptr<CacheKey>&) ->
Result<std::shared_ptr<CacheValue>> {
+ return MakeValue(size, fill_byte, callback);
+ };
+ }
+
+ private:
+ std::shared_ptr<MemoryPool> pool_;
+};
+
+/// Verifies basic Get with supplier: cache miss loads via supplier, cache hit
returns existing.
+TEST_F(LruCacheTest, TestGetCacheHitAndMiss) {
+ LruCache cache(1024);
+
+ auto key = MakeKey(0);
+ int32_t supplier_call_count = 0;
+ auto supplier = [&](const std::shared_ptr<CacheKey>&) ->
Result<std::shared_ptr<CacheValue>> {
+ supplier_call_count++;
+ return MakeValue(64, 'A');
+ };
+
+ // First Get: cache miss, supplier should be called
+ ASSERT_OK_AND_ASSIGN(auto value1, cache.Get(key, supplier));
+ ASSERT_EQ(supplier_call_count, 1);
+ ASSERT_EQ(value1->GetSegment().Size(), 64);
+ ASSERT_EQ(value1->GetSegment().Get(0), 'A');
+ ASSERT_EQ(cache.Size(), 1);
+
+ // Second Get with same key: cache hit, supplier should NOT be called
+ ASSERT_OK_AND_ASSIGN(auto value2, cache.Get(key, supplier));
+ ASSERT_EQ(supplier_call_count, 1);
+ ASSERT_EQ(value2->GetSegment().Get(0), 'A');
+ ASSERT_EQ(cache.Size(), 1);
+ ASSERT_EQ(cache.GetMaxWeight(), 1024);
+}
+
+/// Verifies Put inserts new entries and updates existing ones.
+TEST_F(LruCacheTest, TestPutInsertAndUpdate) {
+ LruCache cache(1024);
+
+ auto key = MakeKey(0);
+ auto value_a = MakeValue(64, 'A');
+ auto value_b = MakeValue(128, 'B');
+
+ // Insert
+ ASSERT_OK(cache.Put(key, value_a));
+ ASSERT_EQ(cache.Size(), 1);
+ ASSERT_EQ(cache.GetCurrentWeight(), 64);
+
+ // Update with larger value
+ ASSERT_OK(cache.Put(key, value_b));
+ ASSERT_EQ(cache.Size(), 1);
+ ASSERT_EQ(cache.GetCurrentWeight(), 128);
+
+ // Verify the updated value is returned
+ ASSERT_OK_AND_ASSIGN(auto result, cache.Get(key, MakeSupplier(0)));
+ ASSERT_EQ(result->GetSegment().Size(), 128);
+ ASSERT_EQ(result->GetSegment().Get(0), 'B');
+}
+
+/// Verifies weight-based eviction: when total weight exceeds max, LRU entries
are evicted.
+TEST_F(LruCacheTest, TestWeightBasedEviction) {
+ // Cache can hold at most 200 bytes
+ LruCache cache(200);
+
+ auto key0 = MakeKey(0);
+ auto key1 = MakeKey(1);
+ auto key2 = MakeKey(2);
+
+ // Insert 2 entries of 100 bytes each (total 200, at capacity)
+ ASSERT_OK(cache.Put(key0, MakeValue(100, 'A')));
+ ASSERT_OK(cache.Put(key1, MakeValue(100, 'B')));
+ ASSERT_EQ(cache.Size(), 2);
+ ASSERT_EQ(cache.GetCurrentWeight(), 200);
+
+ // Insert a 3rd entry: should evict key0 (LRU, inserted first)
+ ASSERT_OK(cache.Put(key2, MakeValue(100, 'C')));
+ ASSERT_EQ(cache.Size(), 2);
+ ASSERT_EQ(cache.GetCurrentWeight(), 200);
+
+ // key0 should be evicted, Get should call supplier
+ int32_t supplier_called = 0;
+ auto supplier = [&](const std::shared_ptr<CacheKey>&) ->
Result<std::shared_ptr<CacheValue>> {
+ supplier_called++;
+ return MakeValue(100, 'X');
+ };
+ ASSERT_OK_AND_ASSIGN(auto result, cache.Get(key0, supplier));
+ ASSERT_EQ(supplier_called, 1);
+ ASSERT_EQ(result->GetSegment().Get(0), 'X');
+}
+
+/// Verifies that eviction invokes the CacheCallback on evicted entries.
+TEST_F(LruCacheTest, TestEvictionCallback) {
+ LruCache cache(200);
+
+ std::vector<int64_t> evicted_positions;
+ auto make_callback = [&evicted_positions](int64_t position) ->
CacheCallback {
+ return [&evicted_positions, position](const
std::shared_ptr<CacheKey>&) {
+ evicted_positions.push_back(position);
+ };
+ };
+
+ auto key0 = MakeKey(0);
+ auto key1 = MakeKey(1);
+ auto key2 = MakeKey(2);
+
+ ASSERT_OK(cache.Put(key0, MakeValue(100, 'A', make_callback(0))));
+ ASSERT_OK(cache.Put(key1, MakeValue(100, 'B', make_callback(1))));
+ ASSERT_TRUE(evicted_positions.empty());
+
+ // Insert key2: should evict key0 and trigger its callback
+ ASSERT_OK(cache.Put(key2, MakeValue(100, 'C', make_callback(2))));
+ ASSERT_EQ(evicted_positions.size(), 1);
+ ASSERT_EQ(evicted_positions[0], 0);
+}
+
+/// Verifies LRU ordering: accessing an entry moves it to the front,
preventing eviction.
+TEST_F(LruCacheTest, TestLruOrdering) {
+ LruCache cache(200);
+
+ auto key0 = MakeKey(0);
+ auto key1 = MakeKey(1);
+ auto key2 = MakeKey(2);
+
+ ASSERT_OK(cache.Put(key0, MakeValue(100, 'A')));
+ ASSERT_OK(cache.Put(key1, MakeValue(100, 'B')));
+
+ // Access key0 via Get to move it to front (most recently used)
+ ASSERT_OK_AND_ASSIGN(auto val, cache.Get(key0, MakeSupplier(0)));
+ ASSERT_EQ(val->GetSegment().Get(0), 'A');
+
+ // Insert key2: should evict key1 (now LRU), NOT key0
+ std::vector<int64_t> evicted;
+ auto callback0 = [&evicted](const std::shared_ptr<CacheKey>&) {
evicted.push_back(0); };
+ auto callback1 = [&evicted](const std::shared_ptr<CacheKey>&) {
evicted.push_back(1); };
+
+ // Re-insert with callbacks to track eviction
+ cache.InvalidateAll();
+ ASSERT_OK(cache.Put(key0, MakeValue(100, 'A', callback0)));
+ ASSERT_OK(cache.Put(key1, MakeValue(100, 'B', callback1)));
+
+ // Access key0 to move it to front
+ ASSERT_OK_AND_ASSIGN(val, cache.Get(key0, MakeSupplier(0)));
+
+ // Insert key2: key1 should be evicted (it's at the back)
+ ASSERT_OK(cache.Put(key2, MakeValue(100, 'C')));
+ ASSERT_EQ(evicted.size(), 1);
+ ASSERT_EQ(evicted[0], 1);
+ ASSERT_EQ(cache.Size(), 2);
+}
+
+/// Verifies Invalidate removes a specific entry and adjusts weight.
+TEST_F(LruCacheTest, TestInvalidate) {
+ LruCache cache(1024);
+
+ auto key0 = MakeKey(0);
+ auto key1 = MakeKey(1);
+
+ std::vector<int64_t> evicted;
+ auto callback0 = [&evicted](const std::shared_ptr<CacheKey>&) {
evicted.push_back(0); };
+ auto callback1 = [&evicted](const std::shared_ptr<CacheKey>&) {
evicted.push_back(1); };
+ ASSERT_OK(cache.Put(key0, MakeValue(100, 'A', callback0)));
+ ASSERT_OK(cache.Put(key1, MakeValue(200, 'B', callback1)));
+ ASSERT_EQ(cache.Size(), 2);
+ ASSERT_EQ(cache.GetCurrentWeight(), 300);
+ ASSERT_TRUE(evicted.empty());
+
+ // Invalidate key0
+ cache.Invalidate(key0);
+ ASSERT_EQ(cache.Size(), 1);
+ ASSERT_EQ(cache.GetCurrentWeight(), 200);
+ ASSERT_EQ(evicted, std::vector<int64_t>({0}));
+
+ // Invalidating a non-existent key is a no-op
+ cache.Invalidate(MakeKey(999));
+ ASSERT_EQ(cache.Size(), 1);
+ ASSERT_EQ(cache.GetCurrentWeight(), 200);
+ ASSERT_EQ(evicted, std::vector<int64_t>({0}));
+}
+
+/// Verifies InvalidateAll clears all entries and resets weight.
+TEST_F(LruCacheTest, TestInvalidateAll) {
+ LruCache cache(1024);
+
+ std::vector<int64_t> evicted;
+ for (int32_t i = 0; i < 5; i++) {
+ auto callback = [&evicted, id = i](const std::shared_ptr<CacheKey>&) {
+ evicted.push_back(id);
+ };
+ ASSERT_OK(cache.Put(MakeKey(i), MakeValue(50, 'A', callback)));
+ }
+ ASSERT_EQ(cache.Size(), 5);
+ ASSERT_EQ(cache.GetCurrentWeight(), 250);
+ ASSERT_TRUE(evicted.empty());
+
+ cache.InvalidateAll();
+ ASSERT_EQ(cache.Size(), 0);
+ ASSERT_EQ(cache.GetCurrentWeight(), 0);
+ ASSERT_EQ(evicted, std::vector<int64_t>({0, 1, 2, 3, 4}));
+}
+
+/// Verifies weight tracking is accurate across Put, Update, Invalidate, and
Eviction.
+TEST_F(LruCacheTest, TestWeightTracking) {
+ LruCache cache(500);
+
+ auto key0 = MakeKey(0);
+ auto key1 = MakeKey(1);
+
+ // Put 100 bytes
+ ASSERT_OK(cache.Put(key0, MakeValue(100)));
+ ASSERT_EQ(cache.GetCurrentWeight(), 100);
+
+ // Put 200 bytes
+ ASSERT_OK(cache.Put(key1, MakeValue(200)));
+ ASSERT_EQ(cache.GetCurrentWeight(), 300);
+
+ // Update key0 from 100 to 150 bytes
+ ASSERT_OK(cache.Put(key0, MakeValue(150)));
+ ASSERT_EQ(cache.GetCurrentWeight(), 350);
+
+ // Invalidate key1 (200 bytes)
+ cache.Invalidate(key1);
+ ASSERT_EQ(cache.GetCurrentWeight(), 150);
+
+ // Put 200 bytes
+ ASSERT_OK(cache.Put(MakeKey(2), MakeValue(200)));
+ ASSERT_EQ(cache.GetCurrentWeight(), 350);
+
+ // Add: total would be 550 > 500, should evict key0 (150 bytes)
+ ASSERT_OK(cache.Put(MakeKey(3), MakeValue(200)));
+ ASSERT_EQ(cache.GetCurrentWeight(), 400);
+ ASSERT_EQ(cache.Size(), 2);
+}
+
+/// Verifies that Get via supplier correctly handles the case where the
supplier returns an error.
+TEST_F(LruCacheTest, TestSupplierError) {
+ LruCache cache(1024);
+
+ auto key = MakeKey(0);
+ auto error_supplier =
+ [](const std::shared_ptr<CacheKey>&) ->
Result<std::shared_ptr<CacheValue>> {
+ return Status::IOError("simulated read failure");
+ };
+
+ // Get should propagate the supplier error
+ ASSERT_NOK_WITH_MSG(cache.Get(key, error_supplier), "simulated read
failure");
+
+ // Cache should remain empty after failed supplier
+ ASSERT_EQ(cache.Size(), 0);
+ ASSERT_EQ(cache.GetCurrentWeight(), 0);
+}
+
+/// Verifies thread safety: concurrent Get/Put operations should not corrupt
internal state.
+TEST_F(LruCacheTest, TestConcurrentAccess) {
+ LruCache cache(10000);
+ constexpr int32_t num_threads = 8;
+ constexpr int32_t ops_per_thread = 100;
+
+ std::atomic<int32_t> supplier_calls{0};
+ auto supplier = [&](const std::shared_ptr<CacheKey>&) ->
Result<std::shared_ptr<CacheValue>> {
+ supplier_calls++;
+ return MakeValue(10, 'X');
+ };
+
+ std::vector<std::thread> threads;
+ for (int32_t t = 0; t < num_threads; t++) {
+ threads.emplace_back([&, t]() {
+ for (int32_t i = 0; i < ops_per_thread; i++) {
+ auto key = MakeKey(t * ops_per_thread + i);
+ auto result = cache.Get(key, supplier);
+ ASSERT_TRUE(result.ok());
+ }
+ });
+ }
+
+ for (auto& thread : threads) {
+ thread.join();
+ }
+
+ // All entries should be in cache (capacity is large enough)
+ ASSERT_EQ(cache.Size(), num_threads * ops_per_thread);
+ ASSERT_EQ(supplier_calls.load(), num_threads * ops_per_thread);
+}
+
+/// Verifies that Put moves an existing entry to the front of the LRU list.
+TEST_F(LruCacheTest, TestPutMovesToFront) {
+ LruCache cache(200);
+
+ auto key0 = MakeKey(0);
+ auto key1 = MakeKey(1);
+ auto key2 = MakeKey(2);
+
+ std::vector<int64_t> evicted;
+ auto make_callback = [&evicted](int64_t pos) -> CacheCallback {
+ return [&evicted, pos](const std::shared_ptr<CacheKey>&) {
evicted.push_back(pos); };
+ };
+
+ ASSERT_OK(cache.Put(key0, MakeValue(100, 'A', make_callback(0))));
+ ASSERT_OK(cache.Put(key1, MakeValue(100, 'B', make_callback(1))));
+
+ // Get key0 (should move it to front)
+ ASSERT_OK(cache.Get(key0, /*supplier=*/nullptr));
+
+ // Insert key2: should evict key1 (now at back), not key0
+ ASSERT_OK(cache.Put(key2, MakeValue(100, 'C', make_callback(2))));
+ ASSERT_EQ(evicted.size(), 1);
+ ASSERT_EQ(evicted[0], 1);
+}
+
+/// Verifies that multiple evictions happen when a single large entry is
inserted.
+TEST_F(LruCacheTest, TestMultipleEvictions) {
+ LruCache cache(300);
+
+ std::vector<int64_t> evicted;
+ auto make_callback = [&evicted](int64_t pos) -> CacheCallback {
+ return [&evicted, pos](const std::shared_ptr<CacheKey>&) {
evicted.push_back(pos); };
+ };
+
+ // Insert 3 entries of 100 bytes each
+ ASSERT_OK(cache.Put(MakeKey(0), MakeValue(100, 'A', make_callback(0))));
+ ASSERT_OK(cache.Put(MakeKey(1), MakeValue(100, 'B', make_callback(1))));
+ ASSERT_OK(cache.Put(MakeKey(2), MakeValue(100, 'C', make_callback(2))));
+ ASSERT_EQ(cache.Size(), 3);
+ ASSERT_EQ(cache.GetCurrentWeight(), 300);
+
+ // Insert a 250-byte entry: should evict key0, key1 and key2.
+ ASSERT_OK(cache.Put(MakeKey(3), MakeValue(250, 'D')));
+ ASSERT_EQ(cache.Size(), 1);
+ ASSERT_EQ(cache.GetCurrentWeight(), 250);
+
+ ASSERT_EQ(evicted, std::vector<int64_t>({0, 1, 2}));
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/common/io/cache_input_stream.h
b/src/paimon/common/io/cache_input_stream.h
new file mode 100644
index 0000000..3c48df3
--- /dev/null
+++ b/src/paimon/common/io/cache_input_stream.h
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstring>
+#include <memory>
+#include <string>
+
+#include "paimon/fs/file_system.h"
+#include "paimon/utils/read_ahead_cache.h"
+
+namespace paimon {
+
+class CacheInputStream : public InputStream {
+ public:
+ CacheInputStream(std::unique_ptr<InputStream> input_stream,
+ const std::shared_ptr<ReadAheadCache>& cache)
+ : cache_(cache), input_stream_(std::move(input_stream)) {}
+
+ Status Seek(int64_t offset, SeekOrigin origin) override {
+ return input_stream_->Seek(offset, origin);
+ }
+ Result<int64_t> GetPos() const override {
+ return input_stream_->GetPos();
+ }
+ Result<int32_t> Read(char* buffer, uint32_t size) override {
+ return input_stream_->Read(buffer, size);
+ }
+ Result<int32_t> Read(char* buffer, uint32_t size, uint64_t offset)
override {
+ if (cache_) {
+ ByteRange range{offset, static_cast<uint64_t>(size)};
+ PAIMON_ASSIGN_OR_RAISE(ByteSlice slice, cache_->Read(range));
+ if (slice.buffer) {
+ std::memcpy(buffer, slice.buffer->data() + slice.offset,
slice.length);
+ return slice.length;
+ }
+ }
+ return input_stream_->Read(buffer, size, offset);
+ }
+ void ReadAsync(char* buffer, uint32_t size, uint64_t offset,
+ std::function<void(Status)>&& callback) override {
+ if (cache_) {
+ ByteRange range{offset, static_cast<uint64_t>(size)};
+ Result<ByteSlice> slice = cache_->Read(range);
+ if (!slice.ok()) {
+ callback(slice.status());
+ return;
+ }
+ if (slice.value().buffer) {
+ std::memcpy(buffer, slice.value().buffer->data() +
slice.value().offset,
+ slice.value().length);
+ callback(Status::OK());
+ return;
+ }
+ }
+ return input_stream_->ReadAsync(buffer, size, offset,
std::move(callback));
+ }
+
+ Status Close() override {
+ return input_stream_->Close();
+ }
+
+ Result<std::string> GetUri() const override {
+ return input_stream_->GetUri();
+ }
+
+ Result<uint64_t> Length() const override {
+ return input_stream_->Length();
+ }
+
+ private:
+ std::shared_ptr<ReadAheadCache> cache_;
+ std::unique_ptr<InputStream> input_stream_;
+};
+
+} // namespace paimon
diff --git a/src/paimon/common/io/cache_input_stream_test.cpp
b/src/paimon/common/io/cache_input_stream_test.cpp
new file mode 100644
index 0000000..2c2e36c
--- /dev/null
+++ b/src/paimon/common/io/cache_input_stream_test.cpp
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/io/cache_input_stream.h"
+
+#include <fstream>
+#include <functional>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/common/factories/io_hook.h"
+#include "paimon/common/utils/scope_guard.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/fs/file_system_factory.h"
+#include "paimon/io/byte_array_input_stream.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/io_exception_helper.h"
+#include "paimon/testing/utils/testharness.h"
+#include "paimon/utils/read_ahead_cache.h"
+
+namespace paimon::test {
+
+class CacheInputStreamTest : public ::testing::Test {
+ public:
+ void SetUp() override {
+ pool_ = GetDefaultPool();
+ test_dir_ = UniqueTestDirectory::Create();
+ ASSERT_TRUE(test_dir_);
+ content_ = "abcdefghijklmnopqrstuvwxyz0123456789";
+ file_path_ = test_dir_->Str() + "/test_data";
+ std::ofstream file(file_path_, std::ios::binary);
+ ASSERT_TRUE(file.is_open());
+ file.write(content_.data(), content_.size());
+ file.close();
+ }
+
+ std::unique_ptr<InputStream> OpenFile() const {
+ EXPECT_OK_AND_ASSIGN(std::shared_ptr<FileSystem> fs,
+ FileSystemFactory::Get("local", file_path_, {}));
+ EXPECT_OK_AND_ASSIGN(std::unique_ptr<InputStream> in,
fs->Open(file_path_));
+ return in;
+ }
+
+ std::shared_ptr<ReadAheadCache> CreateCache(std::vector<ByteRange> ranges)
{
+ auto stream = OpenFile();
+ CacheConfig config(/*buffer_size_limit=*/1024 * 1024,
/*range_size_limit=*/1024,
+ /*hole_size_limit=*/0, /*pre_buffer_limit=*/1024 *
1024);
+ auto cache = std::make_shared<ReadAheadCache>(std::move(stream),
config, pool_);
+ EXPECT_OK(cache->Init(std::move(ranges)));
+ return cache;
+ }
+
+ protected:
+ std::shared_ptr<MemoryPool> pool_;
+ std::unique_ptr<UniqueTestDirectory> test_dir_;
+ std::string content_;
+ std::string file_path_;
+};
+
+// Test proxy methods: Seek, GetPos, Read (sequential), Close, GetUri, Length
+TEST_F(CacheInputStreamTest, TestProxyMethods) {
+ auto underlying = OpenFile();
+ CacheInputStream stream(std::move(underlying), /*cache=*/nullptr);
+
+ // Length
+ ASSERT_OK_AND_ASSIGN(uint64_t length, stream.Length());
+ ASSERT_EQ(length, content_.size());
+
+ // GetUri
+ ASSERT_OK_AND_ASSIGN(std::string uri, stream.GetUri());
+ ASSERT_FALSE(uri.empty());
+
+ // Seek + GetPos
+ ASSERT_OK(stream.Seek(5, SeekOrigin::FS_SEEK_SET));
+ ASSERT_OK_AND_ASSIGN(int64_t pos, stream.GetPos());
+ ASSERT_EQ(pos, 5);
+
+ // Read (sequential, no offset)
+ std::string buffer(3, '\0');
+ ASSERT_OK_AND_ASSIGN(int32_t bytes_read, stream.Read(buffer.data(), 3));
+ ASSERT_EQ(bytes_read, 3);
+ ASSERT_EQ(buffer, "fgh");
+
+ // Close
+ ASSERT_OK(stream.Close());
+}
+
+// Test Read(offset) with cache == nullptr → direct fallback to input_stream
+TEST_F(CacheInputStreamTest, TestReadWithOffsetNullCache) {
+ auto underlying = OpenFile();
+ CacheInputStream stream(std::move(underlying), /*cache=*/nullptr);
+
+ std::string buffer(5, '\0');
+ ASSERT_OK_AND_ASSIGN(int32_t bytes_read, stream.Read(buffer.data(), 5,
/*offset=*/2));
+ ASSERT_EQ(bytes_read, 5);
+ ASSERT_EQ(buffer, "cdefg");
+}
+
+// Test Read(offset) with cache hit → memcpy from cache
+TEST_F(CacheInputStreamTest, TestReadWithOffsetCacheHit) {
+ // Cache range [2, 5) i.e. offset=2, length=5
+ auto cache = CreateCache({{2, 5}});
+ auto underlying = OpenFile();
+ CacheInputStream stream(std::move(underlying), cache);
+
+ std::string buffer(5, '\0');
+ ASSERT_OK_AND_ASSIGN(int32_t bytes_read, stream.Read(buffer.data(), 5,
/*offset=*/2));
+ ASSERT_EQ(bytes_read, 5);
+ ASSERT_EQ(buffer, "cdefg");
+}
+
+// Test Read(offset) with cache miss → fallback to input_stream
+TEST_F(CacheInputStreamTest, TestReadWithOffsetCacheMiss) {
+ // Cache range [2, 5) but read from offset 10 which is not cached
+ auto cache = CreateCache({{2, 5}});
+ auto underlying = OpenFile();
+ CacheInputStream stream(std::move(underlying), cache);
+
+ std::string buffer(3, '\0');
+ ASSERT_OK_AND_ASSIGN(int32_t bytes_read, stream.Read(buffer.data(), 3,
/*offset=*/10));
+ ASSERT_EQ(bytes_read, 3);
+ ASSERT_EQ(buffer, "klm");
+}
+
+// Test ReadAsync with cache == nullptr → direct fallback to
input_stream->ReadAsync
+TEST_F(CacheInputStreamTest, TestReadAsyncNullCache) {
+ auto underlying = OpenFile();
+ CacheInputStream stream(std::move(underlying), /*cache=*/nullptr);
+
+ std::string buffer(4, '\0');
+ bool callback_called = false;
+ Status callback_status = Status::Invalid("not called");
+ stream.ReadAsync(buffer.data(), 4, /*offset=*/0, [&](Status status) {
+ callback_called = true;
+ callback_status = status;
+ });
+ ASSERT_TRUE(callback_called);
+ ASSERT_OK(callback_status);
+ ASSERT_EQ(buffer, "abcd");
+}
+
+// Test ReadAsync with cache hit → memcpy + callback(OK)
+TEST_F(CacheInputStreamTest, TestReadAsyncCacheHit) {
+ auto cache = CreateCache({{0, 10}});
+ auto underlying = OpenFile();
+ CacheInputStream stream(std::move(underlying), cache);
+
+ std::string buffer(4, '\0');
+ bool callback_called = false;
+ Status callback_status = Status::Invalid("not called");
+ stream.ReadAsync(buffer.data(), 4, /*offset=*/3, [&](Status status) {
+ callback_called = true;
+ callback_status = status;
+ });
+ ASSERT_TRUE(callback_called);
+ ASSERT_OK(callback_status);
+ ASSERT_EQ(buffer, "defg");
+}
+
+// Test ReadAsync with cache miss → fallback to input_stream->ReadAsync
+TEST_F(CacheInputStreamTest, TestReadAsyncCacheMiss) {
+ // Cache range [0, 5) but read from offset 20 which is not cached
+ auto cache = CreateCache({{0, 5}});
+ auto underlying = OpenFile();
+ CacheInputStream stream(std::move(underlying), cache);
+
+ std::string buffer(4, '\0');
+ bool callback_called = false;
+ Status callback_status = Status::Invalid("not called");
+ stream.ReadAsync(buffer.data(), 4, /*offset=*/20, [&](Status status) {
+ callback_called = true;
+ callback_status = status;
+ });
+ ASSERT_TRUE(callback_called);
+ ASSERT_OK(callback_status);
+ ASSERT_EQ(buffer, "uvwx");
+}
+
+// Test ReadAsync when cache_->Read() returns error status.
+// Uses IOHook to inject IO error during ReadAheadCache's prefetch, causing
cache_->Read() to fail.
+TEST_F(CacheInputStreamTest, TestReadAsyncCacheReadError) {
+ auto io_hook = paimon::IOHook::GetInstance();
+ bool error_triggered = false;
+
+ for (size_t i = 0; i < 20; i++) {
+ // Open the cache stream and underlying stream BEFORE activating IOHook
+ ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFactory::Get("local",
file_path_, {}));
+ ASSERT_OK_AND_ASSIGN(auto cache_stream, fs->Open(file_path_));
+ ASSERT_OK_AND_ASSIGN(auto underlying, fs->Open(file_path_));
+ CacheConfig config(/*buffer_size_limit=*/1024 * 1024,
/*range_size_limit=*/1024,
+ /*hole_size_limit=*/0, /*pre_buffer_limit=*/1024 *
1024);
+ auto cache = std::make_shared<ReadAheadCache>(std::move(cache_stream),
config, pool_);
+ ASSERT_OK(cache->Init(std::vector<ByteRange>{{0, 10}}));
+
+ // Now activate IOHook so that the prefetch IO (triggered by
cache_->Read -> PreBuffer)
+ // will fail at the i-th IO operation
+ paimon::ScopeGuard guard([&io_hook]() { io_hook->Clear(); });
+ io_hook->Reset(i, paimon::IOHook::Mode::RETURN_ERROR);
+ CacheInputStream stream(std::move(underlying), cache);
+
+ std::string buffer(5, '\0');
+ bool callback_called = false;
+ Status callback_status = Status::OK();
+ stream.ReadAsync(buffer.data(), 5, /*offset=*/0, [&](Status status) {
+ callback_called = true;
+ callback_status = status;
+ });
+ ASSERT_TRUE(callback_called);
+ CHECK_HOOK_STATUS(callback_status, i);
+ ASSERT_EQ(buffer, "abcde");
+ error_triggered = true;
+ break;
+ }
+ ASSERT_TRUE(error_triggered);
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/common/memory/memory_slice.cpp
b/src/paimon/common/memory/memory_slice.cpp
new file mode 100644
index 0000000..903652b
--- /dev/null
+++ b/src/paimon/common/memory/memory_slice.cpp
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/memory/memory_slice.h"
+
+#include "paimon/common/memory/memory_slice_input.h"
+
+namespace paimon {
+MemorySlice MemorySlice::Wrap(const std::shared_ptr<Bytes>& bytes) {
+ auto segment = MemorySegment::Wrap(bytes);
+ return MemorySlice(segment, 0, segment.Size());
+}
+
+MemorySlice MemorySlice::Wrap(const MemorySegment& segment) {
+ return MemorySlice(segment, 0, segment.Size());
+}
+
+MemorySlice::MemorySlice(const MemorySegment& segment, int32_t offset, int32_t
length)
+ : segment_(segment), offset_(offset), length_(length) {}
+
+MemorySlice MemorySlice::Slice(int32_t index, int32_t length) const {
+ if (index == 0 && length == length_) {
+ return *this;
+ }
+ return MemorySlice(segment_, offset_ + index, length);
+}
+
+int32_t MemorySlice::Length() const {
+ return length_;
+}
+
+int32_t MemorySlice::Offset() const {
+ return offset_;
+}
+
+std::shared_ptr<Bytes> MemorySlice::GetOrCreateHeapMemory(MemoryPool* pool)
const {
+ return segment_.GetOrCreateHeapMemory(pool);
+}
+
+const MemorySegment& MemorySlice::GetSegment() const {
+ return segment_;
+}
+
+int8_t MemorySlice::ReadByte(int32_t position) const {
+ return segment_.GetValue<int8_t>(offset_ + position);
+}
+
+int32_t MemorySlice::ReadInt(int32_t position) const {
+ return segment_.GetValue<int32_t>(offset_ + position);
+}
+
+int16_t MemorySlice::ReadShort(int32_t position) const {
+ return segment_.GetValue<int16_t>(offset_ + position);
+}
+
+int64_t MemorySlice::ReadLong(int32_t position) const {
+ return segment_.GetValue<int64_t>(offset_ + position);
+}
+
+std::string_view MemorySlice::ReadStringView() const {
+ return {Data(), static_cast<size_t>(length_)};
+}
+
+std::shared_ptr<Bytes> MemorySlice::CopyBytes(MemoryPool* pool) const {
+ auto bytes = std::make_shared<Bytes>(length_, pool);
+ std::memcpy(const_cast<char*>(bytes->data()), Data(), length_);
+ return bytes;
+}
+
+MemorySliceInput MemorySlice::ToInput() const {
+ return MemorySliceInput(*this);
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/memory/memory_slice.h
b/src/paimon/common/memory/memory_slice.h
new file mode 100644
index 0000000..2f45683
--- /dev/null
+++ b/src/paimon/common/memory/memory_slice.h
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cassert>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <type_traits>
+
+#include "paimon/common/memory/memory_segment.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/result.h"
+#include "paimon/visibility.h"
+namespace paimon {
+class MemoryPool;
+class MemorySliceInput;
+
/// Slice of a MemorySegment.
///
/// Represents a sub-range [offset_, offset_ + length_) of a MemorySegment.
/// The MemorySegment may be owning (with shared_ptr<Bytes>) or non-owning (view).
/// Copying a MemorySlice copies the MemorySegment object; whether that shares or duplicates
/// the underlying bytes depends on MemorySegment (not visible here).
///
/// NOTE(review): this header uses std::function and std::string_view but does not include
/// <functional> or <string_view> directly; it relies on transitive includes.
class PAIMON_EXPORT MemorySlice {
 public:
    /// Creates a slice covering the whole of `bytes`, i.e. [0, size).
    static MemorySlice Wrap(const std::shared_ptr<Bytes>& bytes);
    /// Creates a slice covering the whole of `segment`, i.e. [0, segment.Size()).
    static MemorySlice Wrap(const MemorySegment& segment);

    /// Comparator over two slices. Presumably returns a negative / zero / positive ordering
    /// value, or an error status; TODO confirm against the implementations.
    using SliceComparator = std::function<Result<int32_t>(const MemorySlice&, const MemorySlice&)>;

 public:
    /// Construct a slice over a segment with given offset and length.
    MemorySlice(const MemorySegment& segment, int32_t offset, int32_t length);

    /// Returns the sub-slice [index, index + length), with `index` relative to the start of
    /// this slice. Returns a copy of *this when the request covers the whole slice.
    /// Callers are expected to keep the sub-range within [0, Length()).
    MemorySlice Slice(int32_t index, int32_t length) const;

    /// Number of bytes in this slice.
    int32_t Length() const;
    /// Offset of this slice within its MemorySegment.
    int32_t Offset() const;
    /// Delegates to MemorySegment::GetOrCreateHeapMemory on the whole underlying segment
    /// (not just this slice's sub-range).
    std::shared_ptr<Bytes> GetOrCreateHeapMemory(MemoryPool* pool) const;
    /// The underlying segment; note it may extend beyond this slice's range.
    const MemorySegment& GetSegment() const;

    /// Returns a raw pointer to the start of this slice's data.
    inline const char* Data() const {
        return segment_.Data() + offset_;
    }

    // Fixed-width reads at `position`, relative to the start of this slice. They delegate to
    // MemorySegment::GetValue; no byte-order conversion is done in this class.
    int8_t ReadByte(int32_t position) const;
    int32_t ReadInt(int32_t position) const;
    int16_t ReadShort(int32_t position) const;
    int64_t ReadLong(int32_t position) const;
    /// Non-owning view over this slice's bytes; valid only while the underlying memory lives.
    std::string_view ReadStringView() const;

    /// Copies this slice's bytes into a newly allocated Bytes buffer from `pool`.
    std::shared_ptr<Bytes> CopyBytes(MemoryPool* pool) const;

    /// Creates a sequential reader over this slice.
    MemorySliceInput ToInput() const;

 private:
    MemorySegment segment_;
    int32_t offset_;
    int32_t length_;
};
+
+} // namespace paimon
diff --git a/src/paimon/common/memory/memory_slice_input.h
b/src/paimon/common/memory/memory_slice_input.h
new file mode 100644
index 0000000..db6fce0
--- /dev/null
+++ b/src/paimon/common/memory/memory_slice_input.h
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cassert>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <type_traits>
+
+#include "fmt/format.h"
+#include "paimon/common/memory/memory_slice.h"
+#include "paimon/common/utils/math.h"
+#include "paimon/common/utils/var_length_int_utils.h"
+#include "paimon/io/byte_order.h"
+#include "paimon/status.h"
+#include "paimon/visibility.h"
+namespace paimon {
+class MemoryPool;
+
+/// Input stream over a MemorySlice with inline hot-path methods.
+class PAIMON_EXPORT MemorySliceInput {
+ public:
+ explicit MemorySliceInput(const MemorySlice& slice)
+ : slice_(slice), data_(slice.Data()), length_(slice.Length()) {}
+
+ inline int32_t Position() const {
+ return position_;
+ }
+
+ inline Status SetPosition(int32_t position) {
+ if (position < 0 || position > length_) {
+ return Status::IndexError(fmt::format("position {} index out of
bounds", position));
+ }
+ position_ = position;
+ return Status::OK();
+ }
+
+ inline bool IsReadable() const {
+ return position_ < length_;
+ }
+
+ inline int32_t Available() const {
+ return length_ - position_;
+ }
+
+ inline int8_t ReadByte() {
+ int8_t value;
+ std::memcpy(&value, data_ + position_, 1);
+ position_++;
+ return value;
+ }
+
+ inline int8_t ReadUnsignedByte() {
+ return static_cast<int8_t>(static_cast<uint8_t>(data_[position_++]));
+ }
+
+ inline int32_t ReadInt() {
+ int32_t v;
+ std::memcpy(&v, data_ + position_, sizeof(v));
+ position_ += 4;
+ if (NeedSwap()) {
+ return EndianSwapValue(v);
+ }
+ return v;
+ }
+
+ inline int64_t ReadLong() {
+ int64_t v;
+ std::memcpy(&v, data_ + position_, sizeof(v));
+ position_ += 8;
+ if (NeedSwap()) {
+ return EndianSwapValue(v);
+ }
+ return v;
+ }
+
+ /// Reads a varint32 from the current position.
+ inline Result<int32_t> ReadVarLenInt() {
+ return VarLengthIntUtils::DecodeInt(data_, &position_);
+ }
+
+ /// Reads a varint64 from the current position.
+ inline Result<int64_t> ReadVarLenLong() {
+ return VarLengthIntUtils::DecodeLong(data_, &position_);
+ }
+
+ inline MemorySlice ReadSliceView(int32_t length) {
+ auto view_segment = MemorySegment::WrapView(data_ + position_, length);
+ position_ += length;
+ return MemorySlice(view_segment, 0, length);
+ }
+
+ void SetOrder(ByteOrder order) {
+ byte_order_ = order;
+ }
+
+ private:
+ inline bool NeedSwap() const {
+ return SystemByteOrder() != byte_order_;
+ }
+
+ private:
+ MemorySlice slice_;
+ const char* data_; // Cached raw pointer for fast access.
+ int32_t length_; // Cached length.
+ int32_t position_ = 0;
+
+ ByteOrder byte_order_ = SystemByteOrder();
+};
+
+} // namespace paimon
diff --git a/src/paimon/common/memory/memory_slice_output.cpp
b/src/paimon/common/memory/memory_slice_output.cpp
new file mode 100644
index 0000000..87807ad
--- /dev/null
+++ b/src/paimon/common/memory/memory_slice_output.cpp
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/memory/memory_slice_output.h"
+
+#include "fmt/format.h"
+#include "paimon/common/utils/math.h"
+#include "paimon/common/utils/var_length_int_utils.h"
+namespace paimon {
+
+MemorySliceOutput::MemorySliceOutput(int32_t estimated_size, MemoryPool* pool)
{
+ size_ = 0;
+ pool_ = pool;
+ segment_ = MemorySegment::Wrap(Bytes::AllocateBytes(estimated_size, pool));
+}
+
+int32_t MemorySliceOutput::Size() const {
+ return size_;
+}
+
+void MemorySliceOutput::Reset() {
+ size_ = 0;
+}
+
+MemorySlice MemorySliceOutput::ToSlice() {
+ return MemorySlice(segment_, 0, size_);
+}
+
+template <typename T>
+void MemorySliceOutput::WriteValue(T value) {
+ int32_t write_length = sizeof(T);
+ EnsureSize(size_ + write_length);
+ T write_value = value;
+ if (NeedSwap()) {
+ write_value = EndianSwapValue(value);
+ }
+ segment_.PutValue(size_, write_value);
+ size_ += write_length;
+}
+
+Status MemorySliceOutput::WriteVarLenInt(int32_t value) {
+ EnsureSize(size_ + VarLengthIntUtils::kMaxVarIntSize);
+ PAIMON_ASSIGN_OR_RAISE(int32_t bytes_written,
+ VarLengthIntUtils::EncodeInt(value,
segment_.MutableData() + size_));
+ size_ += bytes_written;
+ return Status::OK();
+}
+
+Status MemorySliceOutput::WriteVarLenLong(int64_t value) {
+ EnsureSize(size_ + VarLengthIntUtils::kMaxVarLongSize);
+ PAIMON_ASSIGN_OR_RAISE(int32_t bytes_written,
+ VarLengthIntUtils::EncodeLong(value,
segment_.MutableData() + size_));
+ size_ += bytes_written;
+ return Status::OK();
+}
+
+void MemorySliceOutput::WriteBytes(const std::shared_ptr<Bytes>& source) {
+ WriteBytes(source, 0, source->size());
+}
+
+void MemorySliceOutput::WriteBytes(const std::shared_ptr<Bytes>& source,
int32_t source_index,
+ int32_t length) {
+ EnsureSize(size_ + length);
+ std::string_view sv{source->data(), source->size()};
+ segment_.Put(size_, sv, source_index, length);
+ size_ += length;
+}
+
+void MemorySliceOutput::SetOrder(ByteOrder order) {
+ byte_order_ = order;
+}
+
+bool MemorySliceOutput::NeedSwap() const {
+ return SystemByteOrder() != byte_order_;
+}
+
+void MemorySliceOutput::EnsureSize(int32_t size) {
+ if (size <= segment_.Size()) {
+ return;
+ }
+ int32_t capacity = segment_.Size();
+ int32_t min_capacity = segment_.Size() + size;
+ while (capacity < min_capacity) {
+ // capacity is always a power-of-two and <= INT32_MAX/2 in practice,
+ // so this shift does not overflow.
+ capacity <<= 1;
+ }
+
+ auto bytes = std::make_shared<Bytes>(capacity, pool_);
+ MemorySegment new_segment = MemorySegment::Wrap(bytes);
+
+ segment_.CopyTo(0, &new_segment, 0, segment_.Size());
+ segment_ = new_segment;
+}
+
+template void MemorySliceOutput::WriteValue(bool);
+template void MemorySliceOutput::WriteValue(char);
+template void MemorySliceOutput::WriteValue(int8_t);
+template void MemorySliceOutput::WriteValue(int16_t);
+template void MemorySliceOutput::WriteValue(int32_t);
+template void MemorySliceOutput::WriteValue(int64_t);
+
+} // namespace paimon
diff --git a/src/paimon/common/memory/memory_slice_output.h
b/src/paimon/common/memory/memory_slice_output.h
new file mode 100644
index 0000000..cbfca98
--- /dev/null
+++ b/src/paimon/common/memory/memory_slice_output.h
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cassert>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <type_traits>
+
+#include "paimon/common/memory/memory_slice.h"
+#include "paimon/io/byte_order.h"
+#include "paimon/status.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+class MemoryPool;
+
+/// Slice of a MemorySegment.
+class PAIMON_EXPORT MemorySliceOutput {
+ public:
+ MemorySliceOutput() = default;
+
+ MemorySliceOutput(int32_t estimated_size, MemoryPool* pool);
+
+ int32_t Size() const;
+ void Reset();
+ MemorySlice ToSlice();
+
+ template <typename T>
+ void WriteValue(T value);
+
+ Status WriteVarLenInt(int32_t value);
+ Status WriteVarLenLong(int64_t value);
+
+ void WriteBytes(const std::shared_ptr<Bytes>& source);
+ void WriteBytes(const std::shared_ptr<Bytes>& source, int32_t
source_index, int32_t length);
+
+ void SetOrder(ByteOrder order);
+
+ private:
+ void EnsureSize(int32_t bytes);
+ bool NeedSwap() const;
+
+ private:
+ MemoryPool* pool_;
+ MemorySegment segment_;
+ int32_t size_;
+
+ ByteOrder byte_order_ = SystemByteOrder();
+};
+
+} // namespace paimon
diff --git a/src/paimon/common/memory/memory_slice_test.cpp
b/src/paimon/common/memory/memory_slice_test.cpp
new file mode 100644
index 0000000..2d550f4
--- /dev/null
+++ b/src/paimon/common/memory/memory_slice_test.cpp
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/memory/memory_slice.h"
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include "gtest/gtest.h"
+#include "paimon/common/memory/memory_slice_input.h"
+#include "paimon/common/memory/memory_slice_output.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+/// Fixture shared by the MemorySlice, MemorySliceInput and MemorySliceOutput
+/// tests below; every buffer in this file is allocated from `pool_`.
+class MemorySliceTest : public ::testing::Test {
+ protected:
+  std::shared_ptr<MemoryPool> pool_ = GetDefaultPool();
+};
+
+// ==================== MemorySliceOutput Tests ====================
+
+TEST_F(MemorySliceTest, TestOutputWriteValueAndToSlice) {
+  MemorySliceOutput output(64, pool_.get());
+  ASSERT_EQ(0, output.Size());
+
+  output.WriteValue<int32_t>(42);
+  output.WriteValue<int64_t>(123456789LL);
+  output.WriteValue<int16_t>(static_cast<int16_t>(7));
+  output.WriteValue<int8_t>(static_cast<int8_t>(-1));
+  output.WriteValue<bool>(true);
+
+  // 4 + 8 + 2 + 1 + 1 = 16
+  ASSERT_EQ(16, output.Size());
+
+  MemorySlice slice = output.ToSlice();
+  ASSERT_EQ(16, slice.Length());
+  ASSERT_EQ(0, slice.Offset());
+
+  ASSERT_EQ(42, slice.ReadInt(0));
+  ASSERT_EQ(123456789LL, slice.ReadLong(4));
+  ASSERT_EQ(7, slice.ReadShort(12));
+  ASSERT_EQ(-1, slice.ReadByte(14));
+  // The trailing bool occupies the last byte; `true` is stored as 1.
+  ASSERT_EQ(1, slice.ReadByte(15));
+}
+
+TEST_F(MemorySliceTest, TestOutputWriteBytes) {
+  MemorySliceOutput output(16, pool_.get());
+
+  std::string data = "hello world";
+  auto bytes = std::make_shared<Bytes>(data, pool_.get());
+  output.WriteBytes(bytes);
+  ASSERT_EQ(static_cast<int32_t>(data.size()), output.Size());
+
+  MemorySlice slice = output.ToSlice();
+  ASSERT_EQ("hello world", slice.ReadStringView());
+}
+
+TEST_F(MemorySliceTest, TestOutputWriteBytesSubRange) {
+  MemorySliceOutput output(16, pool_.get());
+
+  std::string data = "hello world";
+  auto bytes = std::make_shared<Bytes>(data, pool_.get());
+  // Copy only "world": 5 bytes starting at index 6.
+  output.WriteBytes(bytes, 6, 5);
+  ASSERT_EQ(5, output.Size());
+
+  MemorySlice slice = output.ToSlice();
+  ASSERT_EQ("world", slice.ReadStringView());
+}
+
+TEST_F(MemorySliceTest, TestOutputVarLenInt) {
+  MemorySliceOutput output(32, pool_.get());
+
+  ASSERT_OK(output.WriteVarLenInt(0));
+  ASSERT_OK(output.WriteVarLenInt(127));
+  ASSERT_OK(output.WriteVarLenInt(128));
+  ASSERT_OK(output.WriteVarLenInt(16384));
+
+  MemorySlice slice = output.ToSlice();
+  MemorySliceInput input(slice);
+
+  ASSERT_OK_AND_ASSIGN(int32_t v0, input.ReadVarLenInt());
+  ASSERT_EQ(0, v0);
+  ASSERT_OK_AND_ASSIGN(int32_t v127, input.ReadVarLenInt());
+  ASSERT_EQ(127, v127);
+  ASSERT_OK_AND_ASSIGN(int32_t v128, input.ReadVarLenInt());
+  ASSERT_EQ(128, v128);
+  ASSERT_OK_AND_ASSIGN(int32_t v16384, input.ReadVarLenInt());
+  ASSERT_EQ(16384, v16384);
+  // Reader and writer must agree on every encoded length: nothing is left over.
+  ASSERT_FALSE(input.IsReadable());
+}
+
+TEST_F(MemorySliceTest, TestOutputVarLenLong) {
+  MemorySliceOutput output(32, pool_.get());
+
+  ASSERT_OK(output.WriteVarLenLong(0));
+  ASSERT_OK(output.WriteVarLenLong(127));
+  ASSERT_OK(output.WriteVarLenLong(128));
+  ASSERT_OK(output.WriteVarLenLong(1234567890123LL));
+
+  MemorySlice slice = output.ToSlice();
+  MemorySliceInput input(slice);
+
+  ASSERT_OK_AND_ASSIGN(int64_t v0, input.ReadVarLenLong());
+  ASSERT_EQ(0, v0);
+  ASSERT_OK_AND_ASSIGN(int64_t v127, input.ReadVarLenLong());
+  ASSERT_EQ(127, v127);
+  ASSERT_OK_AND_ASSIGN(int64_t v128, input.ReadVarLenLong());
+  ASSERT_EQ(128, v128);
+  ASSERT_OK_AND_ASSIGN(int64_t v_large, input.ReadVarLenLong());
+  ASSERT_EQ(1234567890123LL, v_large);
+  // Reader and writer must agree on every encoded length: nothing is left over.
+  ASSERT_FALSE(input.IsReadable());
+}
+
+TEST_F(MemorySliceTest, TestOutputVarLenNegativeValue) {
+  MemorySliceOutput output(16, pool_.get());
+
+  ASSERT_NOK_WITH_MSG(output.WriteVarLenInt(-1), "negative value: v=-1");
+  ASSERT_NOK_WITH_MSG(output.WriteVarLenLong(-2), "negative value: v=-2");
+  // Rejected values must not leave partial bytes behind.
+  ASSERT_EQ(0, output.Size());
+}
+
+TEST_F(MemorySliceTest, TestOutputReset) {
+  MemorySliceOutput output(16, pool_.get());
+  output.WriteValue<int32_t>(42);
+  ASSERT_EQ(4, output.Size());
+
+  output.Reset();
+  ASSERT_EQ(0, output.Size());
+
+  output.WriteValue<int64_t>(99);
+  ASSERT_EQ(8, output.Size());
+  MemorySlice slice = output.ToSlice();
+  ASSERT_EQ(99, slice.ReadLong(0));
+}
+
+TEST_F(MemorySliceTest, TestOutputAutoGrow) {
+  // Start with tiny buffer, force multiple grows
+  MemorySliceOutput output(4, pool_.get());
+  for (int32_t i = 0; i < 100; i++) {
+    output.WriteValue<int32_t>(i);
+  }
+  ASSERT_EQ(400, output.Size());
+
+  MemorySlice slice = output.ToSlice();
+  for (int32_t i = 0; i < 100; i++) {
+    ASSERT_EQ(i, slice.ReadInt(i * 4));
+  }
+}
+
+TEST_F(MemorySliceTest, TestOutputByteOrder) {
+  // Besides round-tripping, each case inspects the raw bytes so that a
+  // SetOrder() that silently ignored its argument would be caught.
+  // 123456789123456 == 0x00007048860F9180.
+  {
+    MemorySliceOutput output(16, pool_.get());
+    output.SetOrder(ByteOrder::PAIMON_BIG_ENDIAN);
+    output.WriteValue<int32_t>(0x01020304);
+    MemorySlice slice = output.ToSlice();
+    // Big endian: most significant byte first.
+    ASSERT_EQ(0x01, slice.ReadByte(0));
+    ASSERT_EQ(0x04, slice.ReadByte(3));
+
+    MemorySliceInput input(slice);
+    input.SetOrder(ByteOrder::PAIMON_BIG_ENDIAN);
+    ASSERT_EQ(0x01020304, input.ReadInt());
+  }
+  {
+    MemorySliceOutput output(16, pool_.get());
+    output.SetOrder(ByteOrder::PAIMON_LITTLE_ENDIAN);
+    output.WriteValue<int32_t>(0x01020304);
+    MemorySlice slice = output.ToSlice();
+    // Little endian: least significant byte first.
+    ASSERT_EQ(0x04, slice.ReadByte(0));
+    ASSERT_EQ(0x01, slice.ReadByte(3));
+
+    MemorySliceInput input(slice);
+    input.SetOrder(ByteOrder::PAIMON_LITTLE_ENDIAN);
+    ASSERT_EQ(0x01020304, input.ReadInt());
+  }
+  {
+    MemorySliceOutput output(16, pool_.get());
+    output.SetOrder(ByteOrder::PAIMON_BIG_ENDIAN);
+    output.WriteValue<int64_t>(123456789123456ll);
+    MemorySlice slice = output.ToSlice();
+    ASSERT_EQ(0, slice.ReadByte(0));
+    ASSERT_EQ(static_cast<int8_t>(0x80), slice.ReadByte(7));
+
+    MemorySliceInput input(slice);
+    input.SetOrder(ByteOrder::PAIMON_BIG_ENDIAN);
+    ASSERT_EQ(123456789123456ll, input.ReadLong());
+  }
+  {
+    MemorySliceOutput output(16, pool_.get());
+    output.SetOrder(ByteOrder::PAIMON_LITTLE_ENDIAN);
+    output.WriteValue<int64_t>(123456789123456ll);
+    MemorySlice slice = output.ToSlice();
+    ASSERT_EQ(static_cast<int8_t>(0x80), slice.ReadByte(0));
+    ASSERT_EQ(0, slice.ReadByte(7));
+
+    MemorySliceInput input(slice);
+    input.SetOrder(ByteOrder::PAIMON_LITTLE_ENDIAN);
+    ASSERT_EQ(123456789123456ll, input.ReadLong());
+  }
+}
+
+// ==================== MemorySlice Tests ====================
+
+TEST_F(MemorySliceTest, TestSliceWrapBytes) {
+  std::string data = "abcdefghij";
+  auto bytes = std::make_shared<Bytes>(data, pool_.get());
+  MemorySlice slice = MemorySlice::Wrap(bytes);
+
+  ASSERT_EQ(10, slice.Length());
+  ASSERT_EQ(0, slice.Offset());
+  ASSERT_EQ("abcdefghij", slice.ReadStringView());
+}
+
+TEST_F(MemorySliceTest, TestSliceWrapSegment) {
+  auto bytes = std::make_shared<Bytes>("hello", pool_.get());
+  MemorySegment segment = MemorySegment::Wrap(bytes);
+  MemorySlice slice = MemorySlice::Wrap(segment);
+
+  ASSERT_EQ(5, slice.Length());
+  ASSERT_EQ("hello", slice.ReadStringView());
+}
+
+TEST_F(MemorySliceTest, TestSliceSubSlice) {
+  std::string data = "abcdefghij";
+  auto bytes = std::make_shared<Bytes>(data, pool_.get());
+  MemorySlice slice = MemorySlice::Wrap(bytes);
+
+  MemorySlice sub = slice.Slice(3, 4);
+  ASSERT_EQ(4, sub.Length());
+  ASSERT_EQ(3, sub.Offset());
+  ASSERT_EQ("defg", sub.ReadStringView());
+
+  // Slice with same range returns equivalent slice
+  MemorySlice same = slice.Slice(0, 10);
+  ASSERT_EQ(10, same.Length());
+  ASSERT_EQ("abcdefghij", same.ReadStringView());
+}
+
+TEST_F(MemorySliceTest, TestSliceReadPrimitives) {
+  MemorySliceOutput output(32, pool_.get());
+  output.WriteValue<int8_t>(static_cast<int8_t>(0x7F));
+  output.WriteValue<int16_t>(static_cast<int16_t>(1234));
+  output.WriteValue<int32_t>(56789);
+  output.WriteValue<int64_t>(9876543210LL);
+
+  // Offsets 0, 1, 3, 7: reads at unaligned positions.
+  MemorySlice slice = output.ToSlice();
+  ASSERT_EQ(0x7F, slice.ReadByte(0));
+  ASSERT_EQ(static_cast<int16_t>(1234), slice.ReadShort(1));
+  ASSERT_EQ(56789, slice.ReadInt(3));
+  ASSERT_EQ(9876543210LL, slice.ReadLong(7));
+}
+
+TEST_F(MemorySliceTest, TestSliceCopyBytes) {
+  std::string data = "copy me";
+  auto bytes = std::make_shared<Bytes>(data, pool_.get());
+
+  MemorySlice slice = MemorySlice::Wrap(bytes);
+  auto copied = slice.CopyBytes(pool_.get());
+  ASSERT_EQ(data.size(), copied->size());
+  ASSERT_EQ(data, std::string(copied->data(), copied->size()));
+  // Verify it's a true copy (different pointer)
+  ASSERT_NE(bytes->data(), copied->data());
+
+  MemorySlice slice2(MemorySegment::Wrap(bytes), /*offset=*/5, /*length=*/2);
+  copied = slice2.CopyBytes(pool_.get());
+  ASSERT_EQ(2, copied->size());
+  ASSERT_EQ("me", std::string(copied->data(), copied->size()));
+  ASSERT_NE(bytes->data(), copied->data());
+}
+
+TEST_F(MemorySliceTest, TestSliceGetOrCreateHeapMemory) {
+  auto bytes = std::make_shared<Bytes>("test", pool_.get());
+  MemorySlice slice = MemorySlice::Wrap(bytes);
+  // A slice wrapping heap Bytes hands back the very same Bytes object.
+  ASSERT_EQ(bytes, slice.GetOrCreateHeapMemory(pool_.get()));
+}
+
+TEST_F(MemorySliceTest, TestSliceToInput) {
+  MemorySliceOutput output(16, pool_.get());
+  output.WriteValue<int32_t>(42);
+  output.WriteValue<int64_t>(99);
+
+  MemorySlice slice = output.ToSlice();
+  MemorySliceInput input = slice.ToInput();
+
+  ASSERT_EQ(42, input.ReadInt());
+  ASSERT_EQ(99, input.ReadLong());
+}
+
+// ==================== MemorySliceInput Tests ====================
+
+TEST_F(MemorySliceTest, TestInputPositionAndAvailable) {
+  MemorySliceOutput output(16, pool_.get());
+  output.WriteValue<int32_t>(1);
+  output.WriteValue<int32_t>(2);
+
+  MemorySliceInput input(output.ToSlice());
+  ASSERT_EQ(0, input.Position());
+  ASSERT_EQ(8, input.Available());
+  ASSERT_TRUE(input.IsReadable());
+
+  input.ReadInt();
+  ASSERT_EQ(4, input.Position());
+  ASSERT_EQ(4, input.Available());
+  ASSERT_TRUE(input.IsReadable());
+
+  input.ReadInt();
+  ASSERT_EQ(8, input.Position());
+  ASSERT_EQ(0, input.Available());
+  ASSERT_FALSE(input.IsReadable());
+}
+
+TEST_F(MemorySliceTest, TestInputSetPosition) {
+  MemorySliceOutput output(16, pool_.get());
+  output.WriteValue<int32_t>(111);
+  output.WriteValue<int32_t>(222);
+
+  MemorySliceInput input(output.ToSlice());
+  input.ReadInt();
+  ASSERT_EQ(4, input.Position());
+
+  ASSERT_OK(input.SetPosition(0));
+  ASSERT_EQ(0, input.Position());
+  ASSERT_EQ(111, input.ReadInt());
+
+  // Invalid positions
+  ASSERT_NOK_WITH_MSG(input.SetPosition(-1),
+                      "position -1 index out of bounds");
+  ASSERT_NOK_WITH_MSG(input.SetPosition(100),
+                      "position 100 index out of bounds");
+}
+
+TEST_F(MemorySliceTest, TestInputReadByte) {
+  MemorySliceOutput output(8, pool_.get());
+  output.WriteValue<int8_t>(static_cast<int8_t>(-128));
+  output.WriteValue<int8_t>(static_cast<int8_t>(127));
+  output.WriteValue<int8_t>(static_cast<int8_t>(0));
+
+  MemorySliceInput input(output.ToSlice());
+  ASSERT_EQ(-128, input.ReadByte());
+  ASSERT_EQ(127, input.ReadByte());
+  ASSERT_EQ(0, input.ReadByte());
+}
+
+TEST_F(MemorySliceTest, TestInputReadUnsignedByte) {
+  MemorySliceOutput output(8, pool_.get());
+  output.WriteValue<int8_t>(static_cast<int8_t>(0xFF));
+
+  MemorySliceInput input(output.ToSlice());
+  // ReadUnsignedByte masks with 0xFF
+  // NOTE(review): narrowing the result to int8_t and comparing against
+  // static_cast<int8_t>(0xFF) cannot distinguish a masked result (255) from
+  // an unmasked one (-1). Once the return type of ReadUnsignedByte is
+  // confirmed to be wider than int8_t, assert against 255 instead.
+  int8_t value = input.ReadUnsignedByte();
+  ASSERT_EQ(static_cast<int8_t>(0xFF & 0xFF), value);
+}
+
+TEST_F(MemorySliceTest, TestInputReadIntAndLong) {
+  MemorySliceOutput output(32, pool_.get());
+  output.WriteValue<int32_t>(0);
+  output.WriteValue<int32_t>(INT32_MAX);
+  output.WriteValue<int32_t>(INT32_MIN);
+  output.WriteValue<int64_t>(0);
+  output.WriteValue<int64_t>(INT64_MAX);
+  output.WriteValue<int64_t>(INT64_MIN);
+
+  MemorySliceInput input(output.ToSlice());
+  ASSERT_EQ(0, input.ReadInt());
+  ASSERT_EQ(INT32_MAX, input.ReadInt());
+  ASSERT_EQ(INT32_MIN, input.ReadInt());
+  ASSERT_EQ(0, input.ReadLong());
+  ASSERT_EQ(INT64_MAX, input.ReadLong());
+  ASSERT_EQ(INT64_MIN, input.ReadLong());
+}
+
+TEST_F(MemorySliceTest, TestInputReadSliceView) {
+  std::string data = "abcdefghij";
+  auto bytes = std::make_shared<Bytes>(data, pool_.get());
+  MemorySlice slice = MemorySlice::Wrap(bytes);
+  MemorySliceInput input(slice);
+
+  MemorySlice sub = input.ReadSliceView(5);
+  ASSERT_EQ("abcde", sub.ReadStringView());
+  ASSERT_EQ(5, input.Position());
+
+  MemorySlice sub2 = input.ReadSliceView(5);
+  ASSERT_EQ("fghij", sub2.ReadStringView());
+  ASSERT_EQ(10, input.Position());
+}
+
+// ==================== Round-trip Output → Input Tests ====================
+
+TEST_F(MemorySliceTest, TestRoundTripMixedTypes) {
+  MemorySliceOutput output(64, pool_.get());
+  output.WriteValue<int8_t>(static_cast<int8_t>(42));
+  output.WriteValue<int16_t>(static_cast<int16_t>(1000));
+  output.WriteValue<int32_t>(123456);
+  output.WriteValue<int64_t>(9876543210LL);
+  ASSERT_OK(output.WriteVarLenInt(300));
+  ASSERT_OK(output.WriteVarLenLong(1000000LL));
+
+  auto bytes = std::make_shared<Bytes>("test", pool_.get());
+  output.WriteValue<int32_t>(static_cast<int32_t>(bytes->size()));
+  output.WriteBytes(bytes);
+
+  MemorySlice slice = output.ToSlice();
+  MemorySliceInput input(slice);
+  ASSERT_EQ(42, input.ReadByte());
+  // int16_t written at position 1, read via slice; then skip the input past it.
+  ASSERT_EQ(1000, slice.ReadShort(1));
+  ASSERT_OK(input.SetPosition(3));
+  ASSERT_EQ(123456, input.ReadInt());
+  ASSERT_EQ(9876543210LL, input.ReadLong());
+
+  ASSERT_OK_AND_ASSIGN(int32_t var_int, input.ReadVarLenInt());
+  ASSERT_EQ(300, var_int);
+  ASSERT_OK_AND_ASSIGN(int64_t var_long, input.ReadVarLenLong());
+  ASSERT_EQ(1000000LL, var_long);
+
+  int32_t str_len = input.ReadInt();
+  ASSERT_EQ(4, str_len);
+  MemorySlice str_slice = input.ReadSliceView(str_len);
+  ASSERT_EQ("test", str_slice.ReadStringView());
+  // Everything written has been consumed.
+  ASSERT_FALSE(input.IsReadable());
+}
+
+}  // namespace paimon::test