This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 534b67d  feat: introduce common/memory module (#11)
534b67d is described below

commit 534b67ddea2de969098d58aee5d6ad89e2998ac6
Author: lxy <[email protected]>
AuthorDate: Tue May 26 13:38:20 2026 +0800

    feat: introduce common/memory module (#11)
---
 include/paimon/io/byte_order.h                     |  57 ++
 src/paimon/common/memory/memory_segment.cpp        |  84 +++
 src/paimon/common/memory/memory_segment.h          | 211 ++++++
 src/paimon/common/memory/memory_segment_test.cpp   | 760 +++++++++++++++++++++
 src/paimon/common/memory/memory_segment_utils.cpp  | 334 +++++++++
 src/paimon/common/memory/memory_segment_utils.h    | 480 +++++++++++++
 .../common/memory/memory_segment_utils_test.cpp    | 295 ++++++++
 7 files changed, 2221 insertions(+)

diff --git a/include/paimon/io/byte_order.h b/include/paimon/io/byte_order.h
new file mode 100644
index 0000000..299244c
--- /dev/null
+++ b/include/paimon/io/byte_order.h
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+namespace paimon {
+
+// Compile-time endianness detection.
+//
+// After this section PAIMON_LITTLEENDIAN is defined to 1 when the target is
+// treated as little-endian and to 0 when it is treated as big-endian.
+// A definition supplied before this point is kept, because the detection
+// below only runs when the macro is still undefined.
+
+// s390x is handled first and is unconditionally marked big-endian.
+#if defined(__s390x__)
+#define PAIMON_LITTLEENDIAN 0
+#endif  // __s390x__
+#if !defined(PAIMON_LITTLEENDIAN)
+// GCC / Clang / IAR: rely on the compiler-provided byte-order macros.
+#if defined(__GNUC__) || defined(__clang__) || defined(__ICCARM__)
+#if (defined(__BIG_ENDIAN__) || (defined(__BYTE_ORDER__) && __BYTE_ORDER__ == 
__ORDER_BIG_ENDIAN__))
+#define PAIMON_LITTLEENDIAN 0
+#else
+#define PAIMON_LITTLEENDIAN 1
+#endif  // __BIG_ENDIAN__
+// MSVC: only the PowerPC target (_M_PPC) is treated as big-endian.
+#elif defined(_MSC_VER)
+#if defined(_M_PPC)
+#define PAIMON_LITTLEENDIAN 0
+#else
+#define PAIMON_LITTLEENDIAN 1
+#endif
+#else
+// Unknown compiler: fail the build instead of guessing.
+#error Unable to determine endianness, define PAIMON_LITTLEENDIAN.
+#endif
+#endif  // !defined(PAIMON_LITTLEENDIAN)
+
+/// Byte-order tag stored in an int8_t.
+/// Enumerator values: PAIMON_BIG_ENDIAN is 1 and PAIMON_LITTLE_ENDIAN is 2.
+enum class ByteOrder : int8_t { PAIMON_BIG_ENDIAN = 1, PAIMON_LITTLE_ENDIAN = 
2 };
+
+/// Report the byte order selected at compile time through PAIMON_LITTLEENDIAN.
+///
+/// @return ByteOrder::PAIMON_LITTLE_ENDIAN when PAIMON_LITTLEENDIAN is non-zero,
+///         ByteOrder::PAIMON_BIG_ENDIAN otherwise.
+constexpr ByteOrder SystemByteOrder() {
+    return PAIMON_LITTLEENDIAN ? ByteOrder::PAIMON_LITTLE_ENDIAN : ByteOrder::PAIMON_BIG_ENDIAN;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/memory/memory_segment.cpp 
b/src/paimon/common/memory/memory_segment.cpp
new file mode 100644
index 0000000..be4a8d5
--- /dev/null
+++ b/src/paimon/common/memory/memory_segment.cpp
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/memory/memory_segment.h"
+
+#include <algorithm>
+
+namespace paimon {
+
+/// Compare `len` bytes of this segment (starting at `offset1`) with `len` bytes of
+/// `seg2` (starting at `offset2`), treating every byte as unsigned.
+///
+/// @return a negative value, zero, or a positive value when this range sorts before,
+///         equal to, or after the range in `seg2`.
+int32_t MemorySegment::Compare(const MemorySegment& seg2, int32_t offset1, int32_t offset2,
+                               int32_t len) const {
+    // Fast path: consume eight bytes per step. The words are read as big-endian so that
+    // comparing them as integers gives the same answer as comparing the bytes in order.
+    for (; len >= 8; offset1 += 8, offset2 += 8, len -= 8) {
+        const uint64_t lhs_word = GetLongBigEndian(offset1);
+        const uint64_t rhs_word = seg2.GetLongBigEndian(offset2);
+        if (lhs_word < rhs_word) {
+            return -1;
+        }
+        if (lhs_word > rhs_word) {
+            return 1;
+        }
+    }
+
+    // Tail: fewer than eight bytes remain, compare them one at a time as unsigned values.
+    for (; len > 0; ++offset1, ++offset2, --len) {
+        const int32_t lhs_byte = Get(offset1) & 0xff;
+        const int32_t rhs_byte = seg2.Get(offset2) & 0xff;
+        if (lhs_byte != rhs_byte) {
+            return lhs_byte - rhs_byte;
+        }
+    }
+    return 0;
+}
+
+/// Compare two byte ranges that may have different lengths.
+///
+/// The common prefix (the shorter of `len1` and `len2`) is compared first; when it is
+/// identical the result is `len1 - len2`, so the shorter range sorts first.
+int32_t MemorySegment::Compare(const MemorySegment& seg2, int32_t offset1, int32_t offset2,
+                               int32_t len1, int32_t len2) const {
+    const int32_t prefix_result = Compare(seg2, offset1, offset2, std::min(len1, len2));
+    if (prefix_result != 0) {
+        return prefix_result;
+    }
+    return len1 - len2;
+}
+
+/// Check whether `length` bytes of this segment (from `offset1`) are identical to
+/// `length` bytes of `seg2` (from `offset2`).
+///
+/// @return true when every compared byte matches, false at the first mismatch.
+bool MemorySegment::EqualTo(const MemorySegment& seg2, int32_t offset1, int32_t offset2,
+                            int32_t length) const {
+    // Last position at which a full eight-byte word still fits inside the range.
+    const int32_t word_limit = length - 8;
+    int32_t pos = 0;
+
+    // Eight bytes per step; GetValue copies the bytes out, so alignment does not matter.
+    for (; pos <= word_limit; pos += 8) {
+        const auto lhs_word = GetValue<int64_t>(offset1 + pos);
+        const auto rhs_word = seg2.GetValue<int64_t>(offset2 + pos);
+        if (lhs_word != rhs_word) {
+            return false;
+        }
+    }
+
+    // Remaining (length % 8) bytes, one at a time.
+    for (; pos < length; ++pos) {
+        if (Get(offset1 + pos) != seg2.Get(offset2 + pos)) {
+            return false;
+        }
+    }
+    return true;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/memory/memory_segment.h 
b/src/paimon/common/memory/memory_segment.h
new file mode 100644
index 0000000..a98e5d8
--- /dev/null
+++ b/src/paimon/common/memory/memory_segment.h
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cassert>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <type_traits>
+
+#include "paimon/common/utils/math.h"
+#include "paimon/io/byte_order.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+class MemoryPool;
+
+/// This class represents a piece of memory.
+///
+/// Supports two modes:
+/// - Owning mode: holds a shared_ptr<Bytes> for lifetime management.
+/// - Non-owning (view) mode: holds a raw pointer to external data.
+///   The caller must ensure the underlying memory outlives this segment.
+/// A window over a contiguous range of bytes with typed read/write helpers.
+///
+/// Supports two modes:
+/// - Owning mode: holds a shared_ptr<Bytes> for lifetime management.
+/// - Non-owning (view) mode: holds a raw pointer to external data.
+///   The caller must ensure the underlying memory outlives this segment.
+///
+/// Indexes and lengths are only validated by `assert` (and only in some methods), so
+/// callers are responsible for staying inside [0, Size()).
+class PAIMON_EXPORT MemorySegment {
+ public:
+    /// Create an empty segment: null data pointer and size 0.
+    MemorySegment() : data_(nullptr), size_(0) {}
+
+    /// Wrap a shared_ptr<Bytes> to create an owning segment.
+    /// `buffer` must not be null (checked with assert).
+    static MemorySegment Wrap(const std::shared_ptr<Bytes>& buffer) {
+        return MemorySegment(buffer);
+    }
+
+    /// Create a non-owning segment that references external memory.
+    /// The caller must guarantee that `data` remains valid for the lifetime of this segment.
+    static MemorySegment WrapView(const char* data, int32_t size) {
+        return MemorySegment(data, size);
+    }
+
+    /// Allocate `size` bytes from `pool` and wrap them in an owning segment.
+    /// `pool` must not be null (checked with assert).
+    static MemorySegment AllocateHeapMemory(int32_t size, MemoryPool* pool) {
+        assert(pool);
+        return Wrap(Bytes::AllocateBytes(size, pool));
+    }
+
+    MemorySegment(const MemorySegment& other) = default;
+    MemorySegment& operator=(const MemorySegment& other) = default;
+
+    /// Content equality.
+    ///
+    /// Two segments are equal when they have the same data pointer and size, or when both
+    /// have non-null data of the same size holding identical bytes.
+    bool operator==(const MemorySegment& other) const {
+        if (this == &other) {
+            return true;
+        }
+        if (data_ == other.data_ && size_ == other.size_) {
+            return true;
+        }
+        if (!data_ || !other.data_) {
+            return false;
+        }
+        if (size_ != other.size_) {
+            return false;
+        }
+        return std::memcmp(data_, other.data_, size_) == 0;
+    }
+
+    /// Number of bytes covered by this segment.
+    inline int32_t Size() const {
+        return size_;
+    }
+
+    /// Returns the raw data pointer (valid for both owning and non-owning segments).
+    inline const char* Data() const {
+        return data_;
+    }
+
+    /// Read the byte at `index`. No bounds check is performed.
+    inline char Get(int32_t index) const {
+        return *(data_ + index);
+    }
+
+    /// Returns a mutable pointer to the data. Use with caution on non-owning segments.
+    inline char* MutableData() {
+        return const_cast<char*>(data_);
+    }
+
+    /// Write byte `b` at `index`. No bounds check is performed.
+    inline void Put(int32_t index, char b) {
+        MutableData()[index] = b;
+    }
+
+    /// Fill all of `dst` with `dst->size()` bytes read from this segment starting at `index`.
+    inline void Get(int32_t index, Bytes* dst) const {
+        return Get(index, dst, /*offset=*/0, dst->size());
+    }
+
+    /// Copy all of `src` into this segment starting at `index`.
+    inline void Put(int32_t index, const Bytes& src) {
+        return Put(index, src, /*offset=*/0, src.size());
+    }
+
+    /// Copy `length` bytes from this segment (starting at `index`) into `dst` at `offset`.
+    ///
+    /// `T` must provide `size()` and `data()`. Both ranges are checked with assert only.
+    template <typename T>
+    inline void Get(int32_t index, T* dst, int32_t offset, int32_t length) const {
+        assert(static_cast<int32_t>(dst->size()) >= (offset + length));
+        assert(size_ >= (index + length));
+        std::memcpy(const_cast<char*>(dst->data()) + offset, data_ + index, length);
+    }
+
+    /// Copy `length` bytes from `src` (starting at `offset`) into this segment at `index`.
+    ///
+    /// `T` must provide `size()` and `data()`. Both ranges are checked with assert only.
+    template <typename T>
+    inline void Put(int32_t index, const T& src, int32_t offset, int32_t length) {
+        assert(static_cast<int32_t>(src.size()) >= (offset + length));
+        assert(size_ >= (index + length));
+        std::memcpy(MutableData() + index, src.data() + offset, length);
+    }
+
+    /// Read a trivially copyable `T` from `index` in native byte order.
+    /// memcpy is used, so `index` does not need to be aligned for `T`.
+    template <typename T>
+    T GetValue(int32_t index) const {
+        static_assert(std::is_trivially_copyable_v<T>, "T must be trivially copyable");
+        T value;
+        std::memcpy(&value, data_ + index, sizeof(T));
+        return value;
+    }
+
+    /// Write a trivially copyable `T` at `index` in native byte order.
+    /// memcpy is used, so `index` does not need to be aligned for `T`.
+    template <typename T>
+    void PutValue(int32_t index, const T& value) {
+        static_assert(std::is_trivially_copyable_v<T>, "T must be trivially copyable");
+
+        std::memcpy(MutableData() + index, &value, sizeof(T));
+    }
+
+    /// Read eight bytes at `index` as a big-endian unsigned 64-bit integer.
+    /// On little-endian builds the natively read value is passed through EndianSwapValue.
+    inline uint64_t GetLongBigEndian(int32_t index) const {
+        auto value = GetValue<uint64_t>(index);
+        if constexpr (SystemByteOrder() == ByteOrder::PAIMON_LITTLE_ENDIAN) {
+            return EndianSwapValue(value);
+        }
+        return value;
+    }
+
+    /// Copy `num_bytes` bytes from this segment at `offset` into `target` at `target_offset`.
+    /// Only non-negativity of the arguments is asserted; the ranges are not bounds-checked.
+    void CopyTo(int32_t offset, MemorySegment* target, int32_t target_offset,
+                int32_t num_bytes) const {
+        assert(offset >= 0);
+        assert(target_offset >= 0);
+        assert(num_bytes >= 0);
+
+        std::memcpy(target->MutableData() + target_offset, data_ + offset, num_bytes);
+    }
+
+    /// Copy `num_bytes` bytes from this segment at `offset` into raw memory `target` at
+    /// `target_offset`. No checks of any kind are performed.
+    void CopyToUnsafe(int32_t offset, void* target, int32_t target_offset,
+                      int32_t num_bytes) const {
+        std::memcpy(static_cast<char*>(target) + target_offset, data_ + offset, num_bytes);
+    }
+
+    int32_t Compare(const MemorySegment& seg2, int32_t offset1, int32_t offset2, int32_t len) const;
+
+    int32_t Compare(const MemorySegment& seg2, int32_t offset1, int32_t offset2, int32_t len1,
+                    int32_t len2) const;
+
+    bool EqualTo(const MemorySegment& seg2, int32_t offset1, int32_t offset2, int32_t length) const;
+
+    /// Return the bytes of this segment as a shared Bytes object.
+    ///
+    /// - Owning segment: returns the held buffer (no copy).
+    /// - View with non-null data: returns a fresh copy of `Size()` bytes allocated from `pool`.
+    /// - View with null data: returns nullptr.
+    std::shared_ptr<Bytes> GetOrCreateHeapMemory(MemoryPool* pool) const {
+        if (heap_memory_) {
+            return heap_memory_;
+        }
+        if (!data_) {
+            return nullptr;
+        }
+        auto copy = std::make_shared<Bytes>(size_, pool);
+        std::memcpy(const_cast<char*>(copy->data()), data_, size_);
+        return copy;
+    }
+
+ private:
+    /// Owning constructor.
+    ///
+    /// A null `heap_memory` is a caller bug and trips the assert. The member initializers
+    /// guard the dereference so that the assert is actually reached in debug builds and a
+    /// release build ends up with an empty segment instead of dereferencing null.
+    explicit MemorySegment(const std::shared_ptr<Bytes>& heap_memory)
+        : heap_memory_(heap_memory),
+          data_(heap_memory ? heap_memory->data() : nullptr),
+          size_(heap_memory ? static_cast<int32_t>(heap_memory->size()) : 0) {
+        assert(heap_memory_);
+    }
+
+    /// Non-owning constructor. A null `data` is only accepted together with `size == 0`.
+    MemorySegment(const char* data, int32_t size)
+        : heap_memory_(nullptr), data_(data), size_(size) {
+        assert(data != nullptr || size == 0);
+    }
+
+    std::shared_ptr<Bytes> heap_memory_;  // set only in owning mode
+    const char* data_;                    // first byte of the segment (may be null when empty)
+    int32_t size_;                        // number of bytes reachable through data_
+};
+
+/// bool specialization: any non-zero byte at `index` reads back as true.
+template <>
+inline bool MemorySegment::GetValue(int32_t index) const {
+    const char raw = Get(index);
+    return raw != 0;
+}
+
+/// bool specialization: true is stored as the byte 1 and false as the byte 0.
+template <>
+inline void MemorySegment::PutValue(int32_t index, const bool& value) {
+    const char encoded = static_cast<char>(value ? 1 : 0);
+    Put(index, encoded);
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/memory/memory_segment_test.cpp 
b/src/paimon/common/memory/memory_segment_test.cpp
new file mode 100644
index 0000000..0501295
--- /dev/null
+++ b/src/paimon/common/memory/memory_segment_test.cpp
@@ -0,0 +1,760 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/memory/memory_segment.h"
+
+#include <climits>
+#include <cstdlib>
+#include <cstring>
+#include <limits>
+#include <string>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+// Writes pseudo-random bytes through Put(index, char) and reads them back through
+// Get(index), first over every index in order and then at random positions.
+// The same seed replays the same rand() sequence for the write and the read pass.
+TEST(MemorySegmentTest, TestByteAccess) {
+    auto pool = paimon::GetDefaultPool();
+    const int32_t page_size = 64 * 1024;
+    MemorySegment segment = MemorySegment::AllocateHeapMemory(page_size, pool.get());
+
+    const int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs();
+
+    // Sequential access over the whole page.
+    std::srand(seed);
+    for (int32_t i = 0; i < page_size; i++) {
+        segment.Put(i, static_cast<char>(std::rand()));
+    }
+    std::srand(seed);
+    for (int32_t i = 0; i < page_size; i++) {
+        ASSERT_EQ(segment.Get(i), static_cast<char>(std::rand()))
+            << "seed: " << seed << ", idx: " << i;
+    }
+
+    // test expected correct behavior, random access
+
+    // Positions already written in the current pass. A vector is used so the storage is
+    // released even when a failing ASSERT_* returns from the test body early.
+    std::vector<char> occupied(page_size, 0);
+
+    std::srand(seed);
+    for (int32_t i = 0; i < 1000; i++) {
+        const int32_t pos = std::rand() % page_size;
+        if (occupied[pos]) {
+            continue;
+        }
+        occupied[pos] = 1;
+        segment.Put(pos, static_cast<char>(std::rand()));
+    }
+
+    // Replay the same positions and values and verify them.
+    std::srand(seed);
+    occupied.assign(page_size, 0);
+    for (int32_t i = 0; i < 1000; i++) {
+        const int32_t pos = std::rand() % page_size;
+        if (occupied[pos]) {
+            continue;
+        }
+        occupied[pos] = 1;
+
+        ASSERT_EQ(segment.Get(pos), static_cast<char>(std::rand()))
+            << "seed: " << seed << ", idx: " << pos;
+    }
+}
+
+// Writes pseudo-random booleans at random positions through PutValue<bool> and reads
+// them back through GetValue<bool>, replaying the same rand() sequence for both passes.
+TEST(MemorySegmentTest, TestBooleanAccess) {
+    auto pool = paimon::GetDefaultPool();
+    const int32_t page_size = 64 * 1024;
+    MemorySegment segment = MemorySegment::AllocateHeapMemory(page_size, pool.get());
+
+    const int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs();
+
+    // Positions already written in the current pass. A vector is used so the storage is
+    // released even when a failing ASSERT_* returns from the test body early.
+    std::vector<char> occupied(page_size, 0);
+
+    std::srand(seed);
+    for (int32_t i = 0; i < 1000; i++) {
+        const int32_t pos = std::rand() % page_size;
+        if (occupied[pos]) {
+            continue;
+        }
+        occupied[pos] = 1;
+        segment.PutValue<bool>(pos, static_cast<bool>(std::rand() % 2));
+    }
+
+    // Replay the same positions and values and verify them.
+    std::srand(seed);
+    occupied.assign(page_size, 0);
+    for (int32_t i = 0; i < 1000; i++) {
+        const int32_t pos = std::rand() % page_size;
+        if (occupied[pos]) {
+            continue;
+        }
+        occupied[pos] = 1;
+
+        ASSERT_EQ(segment.GetValue<bool>(pos), static_cast<bool>(std::rand() % 2))
+            << "seed: " << seed << ", idx: " << pos;
+    }
+}
+
+// Exercises MemorySegment::EqualTo on a 9-byte range, i.e. one 8-byte word plus one
+// trailing byte, so both the word loop and the byte loop are reached.
+TEST(MemorySegmentTest, TestEqualTo) {
+    auto pool = paimon::GetDefaultPool();
+    int32_t page_size = 64 * 1024;
+    MemorySegment seg1 = MemorySegment::AllocateHeapMemory(page_size, 
pool.get());
+    MemorySegment seg2 = MemorySegment::AllocateHeapMemory(page_size, 
pool.get());
+
+    // Give both segments the same content.
+    // NOTE(review): the assertions below presumably rely on Bytes(page_size, pool) being
+    // zero-filled - confirm against the Bytes constructor.
+    Bytes reference_array(page_size, pool.get());
+    seg1.Put(0, reference_array);
+    seg2.Put(0, reference_array);
+
+    // Difference in the first byte of the range (inside the 8-byte word).
+    int32_t i = paimon::test::RandomNumber(0, (page_size - 8) - 1);
+    seg1.Put(i, static_cast<char>(10));
+    ASSERT_FALSE(seg1.EqualTo(seg2, i, i, 9)) << "rand value:" << i;
+
+    // Write 0 back at position i; the ranges are expected to match again.
+    seg1.Put(i, static_cast<char>(0));
+    ASSERT_TRUE(seg1.EqualTo(seg2, i, i, 9)) << "rand value:" << i;
+
+    // Difference in the ninth byte of the range (the trailing byte after the word).
+    seg1.Put(i + 8, static_cast<char>(10));
+    ASSERT_FALSE(seg1.EqualTo(seg2, i, i, 9)) << "rand value:" << i;
+}
+
+// Exercises both MemorySegment::Compare overloads: first on 9-byte and 7-byte ranges at
+// a random offset, then on single 8-byte words whose byte order matters.
+TEST(MemorySegmentTest, TestCompare) {
+    auto pool = paimon::GetDefaultPool();
+    int32_t page_size = 64 * 1024;
+    MemorySegment seg1 = MemorySegment::AllocateHeapMemory(page_size, 
pool.get());
+    MemorySegment seg2 = MemorySegment::AllocateHeapMemory(page_size, 
pool.get());
+
+    // Give both segments the same content.
+    // NOTE(review): the assertions below presumably rely on Bytes(page_size, pool) being
+    // zero-filled - confirm against the Bytes constructor.
+    Bytes reference_array(page_size, pool.get());
+    seg1.Put(0, reference_array);
+    seg2.Put(0, reference_array);
+
+    // seg1 has a larger first byte, so it is expected to compare greater.
+    int32_t i = paimon::test::RandomNumber(0, (page_size - 8) - 1);
+    seg1.Put(i, static_cast<char>(10));
+    ASSERT_GT(seg1.Compare(seg2, i, i, 9, 9), 0);
+
+    // Write 0 back at position i; the ranges are expected to compare equal again.
+    seg1.Put(i, static_cast<char>(0));
+    ASSERT_EQ(seg1.Compare(seg2, i, i, 9, 9), 0);
+
+    // Only the ninth byte differs: a 7-byte compare does not see it, a 9-byte compare does.
+    seg2.Put(i + 8, static_cast<char>(10));
+    ASSERT_EQ(seg1.Compare(seg2, i, i, 7, 7), 0);
+    ASSERT_LT(seg1.Compare(seg2, i, i, 9, 9), 0);
+
+    // Verify big-endian byte-order comparison semantics within a single 
8-byte block.
+    // On little-endian machines, naive native-endian uint64 comparison would 
give wrong results.
+    MemorySegment seg3 = MemorySegment::AllocateHeapMemory(16, pool.get());
+    MemorySegment seg4 = MemorySegment::AllocateHeapMemory(16, pool.get());
+    Bytes zeros(16, pool.get());
+    seg3.Put(0, zeros);
+    seg4.Put(0, zeros);
+
+    // seg3: [0x00, 0x01, 0, 0, 0, 0, 0, 0] at offset 0
+    // seg4: [0x01, 0x00, 0, 0, 0, 0, 0, 0] at offset 0
+    // Lexicographic (byte-order) comparison: first byte 0x00 < 0x01, so seg3 
< seg4.
+    seg3.Put(1, static_cast<char>(0x01));
+    seg4.Put(0, static_cast<char>(0x01));
+    ASSERT_LT(seg3.Compare(seg4, 0, 0, 8), 0);
+    ASSERT_GT(seg4.Compare(seg3, 0, 0, 8), 0);
+
+    // seg3: [0x01, 0x02, 0, 0, 0, 0, 0, 0]
+    // seg4: [0x01, 0x01, 0, 0, 0, 0, 0, 0]
+    // First bytes equal (0x01 == 0x01), second byte 0x02 > 0x01, so seg3 > 
seg4.
+    seg3.Put(0, static_cast<char>(0x01));
+    seg3.Put(1, static_cast<char>(0x02));
+    seg4.Put(0, static_cast<char>(0x01));
+    seg4.Put(1, static_cast<char>(0x01));
+    ASSERT_GT(seg3.Compare(seg4, 0, 0, 8), 0);
+    ASSERT_LT(seg4.Compare(seg3, 0, 0, 8), 0);
+}
+
+// Round-trips char16_t values through PutValue/GetValue, first at every even index and
+// then at random, non-overlapping (possibly unaligned) positions.
+// The same seed replays the same rand() sequence for the write and the read pass.
+TEST(MemorySegmentTest, TestCharAccess) {
+    auto pool = paimon::GetDefaultPool();
+    const int32_t page_size = 64 * 1024;
+    const int32_t width = 2;  // bytes written per char16_t value
+    MemorySegment segment = MemorySegment::AllocateHeapMemory(page_size, pool.get());
+
+    const int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs();
+
+    // Sequential access.
+    std::srand(seed);
+    for (int32_t i = 0; i <= page_size - width; i += width) {
+        segment.PutValue<char16_t>(i, static_cast<char>(std::rand() % (CHAR_MAX)));
+    }
+    std::srand(seed);
+    for (int32_t i = 0; i <= page_size - width; i += width) {
+        ASSERT_EQ(segment.GetValue<char16_t>(i), static_cast<char>(std::rand() % (CHAR_MAX)))
+            << "seed: " << seed << ", idx: " << i;
+    }
+
+    // test expected correct behavior, random access
+
+    // Bytes already written in the current pass. A vector is used so the storage is
+    // released even when a failing ASSERT_* returns from the test body early.
+    std::vector<char> occupied(page_size, 0);
+    // Marks [pos, pos + width) as used and returns true, unless one of those bytes is
+    // already used, in which case nothing is marked and false is returned.
+    auto try_claim = [&occupied, width](int32_t pos) {
+        for (int32_t k = 0; k < width; k++) {
+            if (occupied[pos + k]) {
+                return false;
+            }
+        }
+        for (int32_t k = 0; k < width; k++) {
+            occupied[pos + k] = 1;
+        }
+        return true;
+    };
+
+    std::srand(seed);
+    for (int32_t i = 0; i < 1000; i++) {
+        const int32_t pos = std::rand() % (page_size - (width - 1));
+        if (!try_claim(pos)) {
+            continue;
+        }
+        segment.PutValue<char16_t>(pos, static_cast<char>(std::rand() % (CHAR_MAX)));
+    }
+
+    // Replay the same positions and values and verify them.
+    std::srand(seed);
+    occupied.assign(page_size, 0);
+    for (int32_t i = 0; i < 1000; i++) {
+        const int32_t pos = std::rand() % (page_size - (width - 1));
+        if (!try_claim(pos)) {
+            continue;
+        }
+
+        ASSERT_EQ(segment.GetValue<char16_t>(pos), static_cast<char>(std::rand() % (CHAR_MAX)))
+            << "seed: " << seed << ", idx:" << pos;
+    }
+}
+
+// Round-trips int16_t values through PutValue/GetValue, first at every even index and
+// then at random, non-overlapping (possibly unaligned) positions.
+// The same seed replays the same rand() sequence for the write and the read pass.
+TEST(MemorySegmentTest, TestShortAccess) {
+    auto pool = paimon::GetDefaultPool();
+    const int32_t page_size = 64 * 1024;
+    const int32_t width = 2;  // bytes written per int16_t value
+    MemorySegment segment = MemorySegment::AllocateHeapMemory(page_size, pool.get());
+
+    const int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs();
+
+    // Sequential access.
+    std::srand(seed);
+    for (int32_t i = 0; i <= page_size - width; i += width) {
+        segment.PutValue<int16_t>(i, static_cast<int16_t>(std::rand()));
+    }
+    std::srand(seed);
+    for (int32_t i = 0; i <= page_size - width; i += width) {
+        ASSERT_EQ(segment.GetValue<int16_t>(i), static_cast<int16_t>(std::rand()))
+            << "seed: " << seed << ", idx:" << i;
+    }
+
+    // test expected correct behavior, random access
+
+    // Bytes already written in the current pass. A vector is used so the storage is
+    // released even when a failing ASSERT_* returns from the test body early.
+    std::vector<char> occupied(page_size, 0);
+    // Marks [pos, pos + width) as used and returns true, unless one of those bytes is
+    // already used, in which case nothing is marked and false is returned.
+    auto try_claim = [&occupied, width](int32_t pos) {
+        for (int32_t k = 0; k < width; k++) {
+            if (occupied[pos + k]) {
+                return false;
+            }
+        }
+        for (int32_t k = 0; k < width; k++) {
+            occupied[pos + k] = 1;
+        }
+        return true;
+    };
+
+    std::srand(seed);
+    for (int32_t i = 0; i < 1000; i++) {
+        const int32_t pos = std::rand() % (page_size - (width - 1));
+        if (!try_claim(pos)) {
+            continue;
+        }
+        segment.PutValue<int16_t>(pos, static_cast<int16_t>(std::rand()));
+    }
+
+    // Replay the same positions and values and verify them.
+    std::srand(seed);
+    occupied.assign(page_size, 0);
+    for (int32_t i = 0; i < 1000; i++) {
+        const int32_t pos = std::rand() % (page_size - (width - 1));
+        if (!try_claim(pos)) {
+            continue;
+        }
+
+        ASSERT_EQ(segment.GetValue<int16_t>(pos), static_cast<int16_t>(std::rand()))
+            << "seed: " << seed << ", idx:" << pos;
+    }
+}
+
+// Round-trips int32_t values through PutValue/GetValue, first at every fourth index and
+// then at random, non-overlapping (possibly unaligned) positions.
+// The same seed replays the same rand() sequence for the write and the read pass.
+TEST(MemorySegmentTest, TestIntAccess) {
+    auto pool = paimon::GetDefaultPool();
+    const int32_t page_size = 64 * 1024;
+    const int32_t width = 4;  // bytes written per int32_t value
+    MemorySegment segment = MemorySegment::AllocateHeapMemory(page_size, pool.get());
+
+    const int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs();
+
+    // Sequential access.
+    std::srand(seed);
+    for (int32_t i = 0; i <= page_size - width; i += width) {
+        segment.PutValue<int32_t>(i, static_cast<int32_t>(std::rand()));
+    }
+    std::srand(seed);
+    for (int32_t i = 0; i <= page_size - width; i += width) {
+        ASSERT_EQ(segment.GetValue<int32_t>(i), static_cast<int32_t>(std::rand()))
+            << "seed: " << seed << ", idx:" << i;
+    }
+
+    // test expected correct behavior, random access
+
+    // Bytes already written in the current pass. A vector is used so the storage is
+    // released even when a failing ASSERT_* returns from the test body early.
+    std::vector<char> occupied(page_size, 0);
+    // Marks [pos, pos + width) as used and returns true, unless one of those bytes is
+    // already used, in which case nothing is marked and false is returned.
+    auto try_claim = [&occupied, width](int32_t pos) {
+        for (int32_t k = 0; k < width; k++) {
+            if (occupied[pos + k]) {
+                return false;
+            }
+        }
+        for (int32_t k = 0; k < width; k++) {
+            occupied[pos + k] = 1;
+        }
+        return true;
+    };
+
+    std::srand(seed);
+    for (int32_t i = 0; i < 1000; i++) {
+        const int32_t pos = std::rand() % (page_size - (width - 1));
+        if (!try_claim(pos)) {
+            continue;
+        }
+        segment.PutValue<int32_t>(pos, static_cast<int32_t>(std::rand()));
+    }
+
+    // Replay the same positions and values and verify them.
+    std::srand(seed);
+    occupied.assign(page_size, 0);
+    for (int32_t i = 0; i < 1000; i++) {
+        const int32_t pos = std::rand() % (page_size - (width - 1));
+        if (!try_claim(pos)) {
+            continue;
+        }
+
+        ASSERT_EQ(segment.GetValue<int32_t>(pos), static_cast<int32_t>(std::rand()))
+            << "seed: " << seed << ", idx:" << pos;
+    }
+}
+
+// Round-trips int64_t values through PutValue/GetValue, first at every eighth index and
+// then at random, non-overlapping (possibly unaligned) positions.
+// The same seed replays the same rand() sequence for the write and the read pass.
+TEST(MemorySegmentTest, TestLongAccess) {
+    auto pool = paimon::GetDefaultPool();
+    // Builds a 64-bit value from two rand() draws. The draws are taken in separate
+    // statements because the evaluation order of the two operands of `|` is unspecified,
+    // which would allow the write pass and the read pass to combine them differently.
+    auto lrand = []() -> int64_t {
+        const int64_t high = std::rand();
+        const int64_t low = std::rand();
+        return (high << (sizeof(int32_t) * 8)) | low;
+    };
+    const int32_t page_size = 64 * 1024;
+    const int32_t width = 8;  // bytes written per int64_t value
+    MemorySegment segment = MemorySegment::AllocateHeapMemory(page_size, pool.get());
+
+    const int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs();
+
+    // Sequential access.
+    std::srand(seed);
+    for (int32_t i = 0; i <= page_size - width; i += width) {
+        segment.PutValue<int64_t>(i, lrand());
+    }
+    std::srand(seed);
+    for (int32_t i = 0; i <= page_size - width; i += width) {
+        ASSERT_EQ(segment.GetValue<int64_t>(i), lrand()) << "seed: " << seed << ", idx:" << i;
+    }
+
+    // test expected correct behavior, random access
+
+    // Bytes already written in the current pass. A vector is used so the storage is
+    // released even when a failing ASSERT_* returns from the test body early.
+    std::vector<char> occupied(page_size, 0);
+    // Marks [pos, pos + width) as used and returns true, unless one of those bytes is
+    // already used, in which case nothing is marked and false is returned.
+    auto try_claim = [&occupied, width](int32_t pos) {
+        for (int32_t k = 0; k < width; k++) {
+            if (occupied[pos + k]) {
+                return false;
+            }
+        }
+        for (int32_t k = 0; k < width; k++) {
+            occupied[pos + k] = 1;
+        }
+        return true;
+    };
+
+    std::srand(seed);
+    for (int32_t i = 0; i < 1000; i++) {
+        const int32_t pos = std::rand() % (page_size - (width - 1));
+        if (!try_claim(pos)) {
+            continue;
+        }
+        segment.PutValue<int64_t>(pos, lrand());
+    }
+
+    // Replay the same positions and values and verify them.
+    std::srand(seed);
+    occupied.assign(page_size, 0);
+    for (int32_t i = 0; i < 1000; i++) {
+        const int32_t pos = std::rand() % (page_size - (width - 1));
+        if (!try_claim(pos)) {
+            continue;
+        }
+
+        ASSERT_EQ(segment.GetValue<int64_t>(pos), lrand()) << "seed: " << seed << ", idx:" << pos;
+    }
+}
+
+// Round-trips float values through PutValue/GetValue, first at every fourth index and
+// then at random, non-overlapping (possibly unaligned) positions.
+// The same seed replays the same rand() sequence for the write and the read pass.
+TEST(MemorySegmentTest, TestFloatAccess) {
+    auto pool = paimon::GetDefaultPool();
+    // Pseudo-random float in [0, 1]. The return type must be float: returning an integer
+    // type would truncate the quotient to 0 or 1 and no fractional value would be tested.
+    auto frand = []() -> float {
+        return static_cast<float>(std::rand()) / static_cast<float>(RAND_MAX);
+    };
+
+    const int32_t page_size = 64 * 1024;
+    const int32_t width = 4;  // bytes written per float value
+    MemorySegment segment = MemorySegment::AllocateHeapMemory(page_size, pool.get());
+
+    const int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs();
+
+    // Sequential access.
+    std::srand(seed);
+    for (int32_t i = 0; i <= page_size - width; i += width) {
+        segment.PutValue<float>(i, frand());
+    }
+    std::srand(seed);
+    for (int32_t i = 0; i <= page_size - width; i += width) {
+        ASSERT_EQ(segment.GetValue<float>(i), frand()) << "seed: " << seed << ", idx:" << i;
+    }
+
+    // test expected correct behavior, random access
+
+    // Bytes already written in the current pass. A vector is used so the storage is
+    // released even when a failing ASSERT_* returns from the test body early.
+    std::vector<char> occupied(page_size, 0);
+    // Marks [pos, pos + width) as used and returns true, unless one of those bytes is
+    // already used, in which case nothing is marked and false is returned.
+    auto try_claim = [&occupied, width](int32_t pos) {
+        for (int32_t k = 0; k < width; k++) {
+            if (occupied[pos + k]) {
+                return false;
+            }
+        }
+        for (int32_t k = 0; k < width; k++) {
+            occupied[pos + k] = 1;
+        }
+        return true;
+    };
+
+    std::srand(seed);
+    for (int32_t i = 0; i < 1000; i++) {
+        const int32_t pos = std::rand() % (page_size - (width - 1));
+        if (!try_claim(pos)) {
+            continue;
+        }
+        segment.PutValue<float>(pos, frand());
+    }
+
+    // Replay the same positions and values and verify them.
+    std::srand(seed);
+    occupied.assign(page_size, 0);
+    for (int32_t i = 0; i < 1000; i++) {
+        const int32_t pos = std::rand() % (page_size - (width - 1));
+        if (!try_claim(pos)) {
+            continue;
+        }
+
+        ASSERT_EQ(segment.GetValue<float>(pos), frand()) << "seed: " << seed << ", idx:" << pos;
+    }
+}
+
+// Round-trips double values through PutValue/GetValue, first at every eighth index and
+// then at random, non-overlapping (possibly unaligned) positions.
+// The same seed replays the same rand() sequence for the write and the read pass.
+TEST(MemorySegmentTest, TestDoubleAccess) {
+    auto pool = paimon::GetDefaultPool();
+    // Builds a 64-bit value from two rand() draws. The draws are taken in separate
+    // statements because the evaluation order of the two operands of `|` is unspecified,
+    // which would allow the write pass and the read pass to combine them differently.
+    auto lrand = []() -> int64_t {
+        const int64_t high = std::rand();
+        const int64_t low = std::rand();
+        return (high << (sizeof(int32_t) * 8)) | low;
+    };
+    // Pseudo-random double in [0, 1]. The return type must be double: returning an
+    // integer type would truncate the quotient to 0 or 1 and no fractional value would
+    // be tested.
+    auto drand = [&lrand]() -> double {
+        return static_cast<double>(lrand()) /
+               static_cast<double>(std::numeric_limits<int64_t>::max());
+    };
+
+    const int32_t page_size = 64 * 1024;
+    const int32_t width = 8;  // bytes written per double value
+    MemorySegment segment = MemorySegment::AllocateHeapMemory(page_size, pool.get());
+
+    const int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs();
+
+    // Sequential access.
+    std::srand(seed);
+    for (int32_t i = 0; i <= page_size - width; i += width) {
+        segment.PutValue<double>(i, drand());
+    }
+    std::srand(seed);
+    for (int32_t i = 0; i <= page_size - width; i += width) {
+        ASSERT_EQ(segment.GetValue<double>(i), drand()) << "seed: " << seed << ", idx:" << i;
+    }
+
+    // test expected correct behavior, random access
+
+    // Bytes already written in the current pass. A vector is used so the storage is
+    // released even when a failing ASSERT_* returns from the test body early.
+    std::vector<char> occupied(page_size, 0);
+    // Marks [pos, pos + width) as used and returns true, unless one of those bytes is
+    // already used, in which case nothing is marked and false is returned.
+    auto try_claim = [&occupied, width](int32_t pos) {
+        for (int32_t k = 0; k < width; k++) {
+            if (occupied[pos + k]) {
+                return false;
+            }
+        }
+        for (int32_t k = 0; k < width; k++) {
+            occupied[pos + k] = 1;
+        }
+        return true;
+    };
+
+    std::srand(seed);
+    for (int32_t i = 0; i < 1000; i++) {
+        const int32_t pos = std::rand() % (page_size - (width - 1));
+        if (!try_claim(pos)) {
+            continue;
+        }
+        segment.PutValue<double>(pos, drand());
+    }
+
+    // Replay the same positions and values and verify them.
+    std::srand(seed);
+    occupied.assign(page_size, 0);
+    for (int32_t i = 0; i < 1000; i++) {
+        const int32_t pos = std::rand() % (page_size - (width - 1));
+        if (!try_claim(pos)) {
+            continue;
+        }
+
+        ASSERT_EQ(segment.GetValue<double>(pos), drand()) << "seed: " << seed << ", idx:" << pos;
+    }
+}
+
+// ------------------------------------------------------------------------
+//  Bulk Byte Movements
+// ------------------------------------------------------------------------
+
+// Verifies the bulk Put/Get overloads of MemorySegment against plain byte buffers in four
+// scenarios: whole-buffer transfers, transfers with explicit offset/length, random writes
+// mirrored into a reference buffer, and random reads into larger destination buffers.
+TEST(MemorySegmentTest, TestBulkByteAccess) {
+    auto pool = paimon::GetDefaultPool();
+    // test expected correct behavior with default offset / length
+    // rand_bytes returns a buffer of `size` bytes filled from std::rand(), so its content is
+    // determined by the generator state at the time of the call.
+    auto rand_bytes = [&](int32_t size) -> std::shared_ptr<Bytes> {
+        auto bytes = Bytes::AllocateBytes(size, pool.get());
+        for (int32_t i = 0; i < static_cast<int32_t>(bytes->size()); i++) {
+            (*bytes)[i] = static_cast<char>(std::rand());
+        }
+        return bytes;
+    };
+    {
+        int32_t page_size = 64 * 1024;
+        MemorySegment segment = MemorySegment::AllocateHeapMemory(page_size, 
pool.get());
+
+        int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs();
+        std::srand(seed);
+        // Fill the segment with eight consecutive random chunks.
+        for (int32_t i = 0; i < 8; i++) {
+            auto src = rand_bytes(page_size / 8);
+            segment.Put(i * (page_size / 8), *src);
+        }
+
+        // Re-seed so the next eight rand_bytes calls reproduce the chunks written above.
+        std::srand(seed);
+
+        for (int32_t i = 0; i < 8; i++) {
+            std::shared_ptr<Bytes> expected = rand_bytes(page_size / 8);
+            std::shared_ptr<Bytes> actual = Bytes::AllocateBytes(page_size / 
8, pool.get());
+            segment.Get(i * (page_size / 8), actual.get());
+            ASSERT_EQ(expected->size(), actual->size()) << "seed: " << seed << 
", i:" << i;
+            ASSERT_EQ(std::memcmp(expected->data(), actual->data(), 
expected->size()), 0)
+                << "seed: " << seed << ", i:" << i;
+        }
+    }
+
+    // test expected correct behavior with specific offset / length
+    {
+        int32_t page_size = 64 * 1024;
+        MemorySegment segment = MemorySegment::AllocateHeapMemory(page_size, 
pool.get());
+
+        int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs();
+        std::srand(seed);
+        std::shared_ptr<Bytes> expected = rand_bytes(page_size);
+
+        // Copy the buffer in 16 slices, each to the segment position equal to its own offset.
+        for (int32_t i = 0; i < 16; i++) {
+            segment.Put(i * (page_size / 16), *expected, i * (page_size / 16), 
page_size / 16);
+        }
+        auto actual = Bytes::AllocateBytes(page_size, pool.get());
+        // Read the same 16 slices back into the matching offsets of `actual`.
+        for (int32_t i = 0; i < 16; i++) {
+            segment.Get(i * (page_size / 16), actual.get(), i * (page_size / 
16), page_size / 16);
+        }
+        ASSERT_EQ(expected->size(), actual->size()) << "seed: " << seed;
+        ASSERT_EQ(std::memcmp(expected->data(), actual->data(), 
expected->size()), 0)
+            << "seed: " << seed;
+    }
+    // put segments of various lengths to various positions
+    {
+        int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs();
+        std::srand(seed);
+        int32_t page_size = 64 * 1024;
+        MemorySegment segment = MemorySegment::AllocateHeapMemory(page_size, 
pool.get());
+        auto expected = Bytes::AllocateBytes(page_size, pool.get());
+        // Start the segment from the same content as the reference buffer.
+        segment.Put(0, *expected, 0, page_size);
+        for (int32_t i = 0; i < 200; i++) {
+            // Random length and a position at which that length still fits into the page.
+            int32_t num_bytes = std::rand() % (page_size - 10) + 1;
+            int32_t pos = std::rand() % (page_size - num_bytes + 1);
+            // The source buffer is 1x to 3x num_bytes long; read from a random start in it.
+            std::shared_ptr<Bytes> data = rand_bytes((std::rand() % 3 + 1) * 
num_bytes);
+            int32_t data_start_pos = std::rand() % (data->size() - num_bytes + 
1);
+
+            // copy to the expected
+            std::memcpy(expected->data() + pos, data->data() + data_start_pos, 
num_bytes);
+
+            // put to the memory segment
+            segment.Put(pos, *data, data_start_pos, num_bytes);
+        }
+
+        auto validation = Bytes::AllocateBytes(page_size, pool.get());
+        segment.Get(0, validation.get());
+        ASSERT_EQ(std::memcmp(validation->data(), expected->data(), 
expected->size()), 0)
+            << "seed: " << seed;
+    }
+    // get segments with various contents
+    {
+        int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs();
+        std::srand(seed);
+        int32_t page_size = 64 * 1024;
+        MemorySegment segment = MemorySegment::AllocateHeapMemory(page_size, 
pool.get());
+        std::shared_ptr<Bytes> contents = rand_bytes(page_size);
+        segment.Put(0, *contents);
+
+        for (int32_t i = 0; i < 200; i++) {
+            int32_t num_bytes = std::rand() % (page_size / 8) + 1;
+            int32_t pos = std::rand() % (page_size - num_bytes + 1);
+            // The destination is a random-filled buffer 1x to 3x num_bytes long.
+            std::shared_ptr<Bytes> data = rand_bytes((std::rand() % 3 + 1) * 
num_bytes);
+            int32_t data_start_pos = std::rand() % (data->size() - num_bytes + 
1);
+
+            segment.Get(pos, data.get(), data_start_pos, num_bytes);
+            // Compare the window read into `data` with the same window of `contents`.
+            Bytes expected(num_bytes, pool.get());
+            std::memcpy(expected.data(), contents->data() + pos, num_bytes);
+            Bytes validation(num_bytes, pool.get());
+            std::memcpy(validation.data(), data->data() + data_start_pos, 
num_bytes);
+            ASSERT_EQ(std::memcmp(validation.data(), expected.data(), 
expected.size()), 0)
+                << "seed: " << seed << ", i:" << i;
+        }
+    }
+}
+
+// Segments wrapping equal bytes compare equal (including a segment with itself); a single
+// differing byte breaks equality in both operand orders.
+TEST(MemorySegmentTest, TestEqual) {
+    auto pool = paimon::GetDefaultPool();
+    // Wraps a freshly allocated Bytes holding `text` into a segment.
+    auto wrap = [&pool](const std::string& text) {
+        return MemorySegment::Wrap(std::make_shared<Bytes>(text, pool.get()));
+    };
+    auto original = wrap("abcd");
+    auto different = wrap("abce");
+    auto duplicate = wrap("abcd");
+
+    ASSERT_EQ(original, original);
+    ASSERT_EQ(original, duplicate);
+    ASSERT_FALSE(original == different);
+    ASSERT_FALSE(different == original);
+}
+
+// Walks through the MemorySegment API on a segment created with WrapView over memory that
+// belongs to a separately held Bytes object.
+TEST(MemorySegmentTest, TestNonOwningWrapView) {
+    // The Bytes object below is the real owner of the memory under test.
+    auto pool = paimon::GetDefaultPool();
+    std::string text = "Hello, WrapView MemorySegment!";
+    auto backing = std::make_shared<Bytes>(text, pool.get());
+    const char* base = backing->data();
+    auto length = static_cast<int32_t>(backing->size());
+
+    // View over the owner's memory.
+    auto view = MemorySegment::WrapView(base, length);
+
+    // Data() / Size() report exactly what was handed to WrapView.
+    ASSERT_EQ(view.Data(), base);
+    ASSERT_EQ(view.Size(), length);
+
+    // Byte-wise Get(index) matches the source string.
+    for (int32_t idx = 0; idx != length; ++idx) {
+        ASSERT_EQ(view.Get(idx), text[idx]);
+    }
+
+    // GetValue<T>: the leading 4 and 8 bytes, compared against a memcpy of the same bytes.
+    int32_t leading_int;
+    std::memcpy(&leading_int, base, sizeof(int32_t));
+    ASSERT_EQ(view.GetValue<int32_t>(0), leading_int);
+
+    int64_t leading_long;
+    std::memcpy(&leading_long, base, sizeof(int64_t));
+    ASSERT_EQ(view.GetValue<int64_t>(0), leading_long);
+
+    // Put(index, char): overwrite the first byte, then put the old value back.
+    char saved = view.Get(0);
+    view.Put(0, 'X');
+    ASSERT_EQ(view.Get(0), 'X');
+    view.Put(0, saved);
+    ASSERT_EQ(view.Get(0), saved);
+
+    // Put(index, src, offset, length): write two bytes taken from a string.
+    std::string two_chars = "AB";
+    view.Put(0, two_chars, 0, 2);
+    ASSERT_EQ(view.Get(0), 'A');
+    ASSERT_EQ(view.Get(1), 'B');
+
+    // PutValue<T> followed by GetValue<T> at the same index.
+    int16_t short_value = 0x1234;
+    view.PutValue<int16_t>(0, short_value);
+    ASSERT_EQ(view.GetValue<int16_t>(0), short_value);
+
+    // Compare: a second view over the same memory observes the writes made through `view`.
+    auto alias = MemorySegment::WrapView(base, length);
+    ASSERT_EQ(view.Compare(alias, 0, 0, length), 0);
+
+    // EqualTo on a sub-range starting at byte 2.
+    ASSERT_TRUE(view.EqualTo(alias, 2, 2, length - 2));
+
+    // CopyTo into a heap-allocated segment.
+    auto heap_segment = MemorySegment::AllocateHeapMemory(length, pool.get());
+    view.CopyTo(0, &heap_segment, 0, length);
+    ASSERT_EQ(view.Compare(heap_segment, 0, 0, length), 0);
+
+    // CopyToUnsafe into a raw buffer.
+    std::vector<char> raw_copy(length);
+    view.CopyToUnsafe(0, raw_copy.data(), 0, length);
+    ASSERT_EQ(std::memcmp(raw_copy.data(), view.Data(), length), 0);
+
+    // GetOrCreateHeapMemory on a view is expected to hand back a copy ...
+    auto snapshot = view.GetOrCreateHeapMemory(pool.get());
+    ASSERT_NE(snapshot, nullptr);
+    ASSERT_EQ(static_cast<int32_t>(snapshot->size()), length);
+    // ... so a later write through the view must not show up in it.
+    view.Put(0, 'Z');
+    ASSERT_NE((*snapshot)[0], 'Z');
+
+    // operator== against another view built from this view's Data()/Size().
+    auto twin = MemorySegment::WrapView(view.Data(), view.Size());
+    ASSERT_EQ(view, twin);
+}
+}  // namespace paimon::test
diff --git a/src/paimon/common/memory/memory_segment_utils.cpp 
b/src/paimon/common/memory/memory_segment_utils.cpp
new file mode 100644
index 0000000..2801854
--- /dev/null
+++ b/src/paimon/common/memory/memory_segment_utils.cpp
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/memory/memory_segment_utils.h"
+
+#include <cassert>
+
+#include "paimon/common/utils/murmurhash_utils.h"
+
+namespace paimon {
+// Forwards to Bytes::AllocateBytes and hands the result back as a shared_ptr.
+std::shared_ptr<Bytes> MemorySegmentUtils::AllocateBytes(int32_t length, MemoryPool* pool) {
+    std::shared_ptr<Bytes> buffer = Bytes::AllocateBytes(length, pool);
+    return buffer;
+}
+
+// Writes num_bytes bytes of `bytes`, starting at bytes_offset, into `segments` at `offset`.
+void MemorySegmentUtils::CopyFromBytes(std::vector<MemorySegment>* segments, int32_t offset,
+                                       const Bytes& bytes, int32_t bytes_offset,
+                                       int32_t num_bytes) {
+    // Anything other than exactly one segment goes through the multi-segment walk.
+    if (segments->size() != 1) {
+        CopyMultiSegmentsFromBytes(segments, offset, bytes, bytes_offset, num_bytes);
+        return;
+    }
+    // Exactly one segment: write straight into it.
+    MemorySegment& only = segments->front();
+    only.Put(offset, bytes, bytes_offset, num_bytes);
+}
+
+// Spreads num_bytes bytes of `bytes` (from bytes_offset) over consecutive segments, starting
+// at logical position `offset` counted from the beginning of the first segment.
+void MemorySegmentUtils::CopyMultiSegmentsFromBytes(std::vector<MemorySegment>* segments,
+                                                    int32_t offset, const Bytes& bytes,
+                                                    int32_t bytes_offset, int32_t num_bytes) {
+    int32_t pending = num_bytes;
+    for (auto& segment : *segments) {
+        const int32_t available = segment.Size() - offset;
+        if (available <= 0) {
+            // The start lies beyond this segment: carry the leftover offset to the next one.
+            offset = -available;
+            continue;
+        }
+        const int32_t chunk = std::min(available, pending);
+        // (num_bytes - pending) is the amount already written, i.e. the progress in `bytes`.
+        segment.Put(offset, bytes, num_bytes - pending + bytes_offset, chunk);
+        pending -= chunk;
+        // Every following segment is written from its beginning.
+        offset = 0;
+        if (pending == 0) {
+            return;
+        }
+    }
+}
+
+// Allocates a num_bytes buffer from `pool` and fills it with the bytes found in `segments`
+// starting at `offset`. `pool` must not be null (checked with assert).
+PAIMON_UNIQUE_PTR<Bytes> MemorySegmentUtils::CopyToBytes(const std::vector<MemorySegment>& segments,
+                                                         int32_t offset, int32_t num_bytes,
+                                                         MemoryPool* pool) {
+    assert(pool);
+    auto result = Bytes::AllocateBytes(num_bytes, pool);
+    // The copy itself is done by the overload that writes into a caller-supplied buffer.
+    CopyToBytes(segments, offset, result.get(), 0, num_bytes);
+    return result;
+}
+
+// Copies num_bytes bytes starting at `offset` of `segments` to the raw memory at `target`.
+void MemorySegmentUtils::CopyToUnsafe(const std::vector<MemorySegment>& segments, int32_t offset,
+                                      void* target, int32_t num_bytes) {
+    // The range spills past the first segment: walk the segments one by one.
+    if (!InFirstSegment(segments, offset, num_bytes)) {
+        CopyMultiSegmentsToUnsafe(segments, offset, target, num_bytes);
+        return;
+    }
+    // The whole range sits in the first segment: a single copy does it.
+    const MemorySegment& first = segments[0];
+    first.CopyToUnsafe(offset, target, 0, num_bytes);
+}
+
+// Gathers num_bytes bytes from consecutive segments, starting at logical position `offset`
+// counted from the beginning of the first segment, into the raw memory at `target`.
+void MemorySegmentUtils::CopyMultiSegmentsToUnsafe(const std::vector<MemorySegment>& segments,
+                                                   int32_t offset, void* target,
+                                                   int32_t num_bytes) {
+    int32_t pending = num_bytes;
+    for (const auto& segment : segments) {
+        const int32_t available = segment.Size() - offset;
+        if (available <= 0) {
+            // The start lies beyond this segment: carry the leftover offset to the next one.
+            offset = -available;
+            continue;
+        }
+        const int32_t chunk = std::min(available, pending);
+        // (num_bytes - pending) is the amount already copied, i.e. the write position in target.
+        segment.CopyToUnsafe(offset, target, num_bytes - pending, chunk);
+        pending -= chunk;
+        // Every following segment is read from its beginning.
+        offset = 0;
+        if (pending == 0) {
+            return;
+        }
+    }
+}
+
+// Returns size_in_bytes bytes of `segments`, starting at base_offset, as a Bytes object.
+//
+// When there is a single segment, base_offset is 0 and the segment's heap memory has exactly
+// the requested size, that heap memory is returned directly. In every other case a new buffer
+// is allocated from `pool` and the requested range is copied into it.
+std::shared_ptr<Bytes> MemorySegmentUtils::GetBytes(const std::vector<MemorySegment>& segments,
+                                                    int32_t base_offset, int32_t size_in_bytes,
+                                                    MemoryPool* pool) {
+    const bool single_segment = segments.size() == 1;
+
+    // Only ask for the heap memory when it could actually be returned: with a non-zero
+    // base_offset the result is never used, and obtaining it may involve copying the whole
+    // segment.
+    if (single_segment && base_offset == 0) {
+        std::shared_ptr<Bytes> heap_memory = segments[0].GetOrCreateHeapMemory(pool);
+        if (heap_memory != nullptr &&
+            static_cast<int32_t>(heap_memory->size()) == size_in_bytes) {
+            return heap_memory;
+        }
+    }
+
+    std::shared_ptr<Bytes> bytes = Bytes::AllocateBytes(size_in_bytes, pool);
+    if (single_segment) {
+        segments[0].Get(base_offset, bytes.get(), 0, size_in_bytes);
+    } else {
+        CopyMultiSegmentsToBytes(segments, base_offset, bytes.get(), 0, size_in_bytes);
+    }
+    return bytes;
+}
+
+// Returns true when the range [offset, offset + num_bytes) ends at or before the end of
+// segments[0]. `segments` must contain at least one segment.
+bool MemorySegmentUtils::InFirstSegment(const std::vector<MemorySegment>& segments, int32_t offset,
+                                        int32_t num_bytes) {
+    // Add in 64 bits: offset + num_bytes can exceed the int32_t range, and signed 32-bit
+    // overflow is undefined behavior that could wrongly report the range as fitting.
+    const int64_t range_end = static_cast<int64_t>(offset) + static_cast<int64_t>(num_bytes);
+    return range_end <= static_cast<int64_t>(segments[0].Size());
+}
+
+// Maps a bit index to the index of the byte holding it, using a logical (unsigned) right
+// shift by ADDRESS_BITS_PER_WORD.
+int32_t MemorySegmentUtils::ByteIndex(int32_t bit_index) {
+    const auto as_unsigned = static_cast<uint32_t>(bit_index);
+    return as_unsigned >> ADDRESS_BITS_PER_WORD;
+}
+
+// Clears bit `index` (counted from base_offset) in `segment`, leaving the other bits of the
+// affected byte untouched.
+void MemorySegmentUtils::BitUnSet(MemorySegment* segment, int32_t base_offset, int32_t index) {
+    const int32_t byte_pos = base_offset + ByteIndex(index);
+    const int bit_mask = 1 << (index & BIT_BYTE_INDEX_MASK);
+    char cell = segment->Get(byte_pos);
+    cell &= ~bit_mask;
+    segment->Put(byte_pos, cell);
+}
+
+// Sets bit `index` (counted from base_offset) in `segment`, leaving the other bits of the
+// affected byte untouched.
+void MemorySegmentUtils::BitSet(MemorySegment* segment, int32_t base_offset, int32_t index) {
+    const int32_t byte_pos = base_offset + ByteIndex(index);
+    const int bit_mask = 1 << (index & BIT_BYTE_INDEX_MASK);
+    char cell = segment->Get(byte_pos);
+    cell |= bit_mask;
+    segment->Put(byte_pos, cell);
+}
+
+// Returns whether bit `index` (counted from base_offset) is set in `segment`.
+bool MemorySegmentUtils::BitGet(const MemorySegment& segment, int32_t base_offset, int32_t index) {
+    const int32_t byte_pos = base_offset + ByteIndex(index);
+    const int bit_mask = 1 << (index & BIT_BYTE_INDEX_MASK);
+    char cell = segment.Get(byte_pos);
+    return (cell & bit_mask) != 0;
+}
+
+// Sets bit `index` (counted from base_offset) in a sequence of segments.
+void MemorySegmentUtils::BitSet(std::vector<MemorySegment>* segments, int32_t base_offset,
+                                int32_t index) {
+    if (segments->size() != 1) {
+        BitSetMultiSegments(segments, base_offset, index);
+        return;
+    }
+    // A lone segment is handled by the single-segment overload, which performs the same
+    // read-modify-write of the affected byte.
+    MemorySegment* only = &(*segments)[0];
+    BitSet(only, base_offset, index);
+}
+
+// Sets bit `index` (counted from base_offset) when the bits are spread over several segments.
+// The target segment is located using the size of the first segment for all of them.
+void MemorySegmentUtils::BitSetMultiSegments(std::vector<MemorySegment>* segments,
+                                             int32_t base_offset, int32_t index) {
+    const int32_t byte_pos = base_offset + ByteIndex(index);
+    const int32_t segment_size = segments->front().Size();
+    const int32_t which = byte_pos / segment_size;
+    const int32_t within = byte_pos % segment_size;
+    MemorySegment& target = (*segments)[which];
+
+    char cell = target.Get(within);
+    cell |= (1 << (index & BIT_BYTE_INDEX_MASK));
+    target.Put(within, cell);
+}
+
+// Returns whether bit `index` (counted from base_offset) is set in a sequence of segments.
+bool MemorySegmentUtils::BitGet(const std::vector<MemorySegment>& segments, int32_t base_offset,
+                                int32_t index) {
+    const int32_t byte_pos = base_offset + ByteIndex(index);
+    const int bit_mask = 1 << (index & BIT_BYTE_INDEX_MASK);
+    // The byte is fetched through the segment-sequence GetValue overload.
+    char cell = GetValue<char>(segments, byte_pos);
+    return (cell & bit_mask) != 0;
+}
+
+// Clears bit `index` (counted from base_offset) in a sequence of segments.
+void MemorySegmentUtils::BitUnSet(std::vector<MemorySegment>* segments, int32_t base_offset,
+                                  int32_t index) {
+    if (segments->size() != 1) {
+        BitUnSetMultiSegments(segments, base_offset, index);
+        return;
+    }
+    // A lone segment is handled by the single-segment overload, which performs the same
+    // read-modify-write of the affected byte.
+    MemorySegment* only = &(*segments)[0];
+    BitUnSet(only, base_offset, index);
+}
+
+// Clears bit `index` (counted from base_offset) when the bits are spread over several
+// segments. The target segment is located using the size of the first segment for all of them.
+void MemorySegmentUtils::BitUnSetMultiSegments(std::vector<MemorySegment>* segments,
+                                               int32_t base_offset, int32_t index) {
+    const int32_t byte_pos = base_offset + ByteIndex(index);
+    const int32_t segment_size = segments->front().Size();
+    const int32_t which = byte_pos / segment_size;
+    const int32_t within = byte_pos % segment_size;
+    MemorySegment& target = (*segments)[which];
+
+    char cell = target.Get(within);
+    cell &= ~(1 << (index & BIT_BYTE_INDEX_MASK));
+    target.Put(within, cell);
+}
+
+// Compares `len` bytes of segments1 at offset1 with `len` bytes of segments2 at offset2.
+bool MemorySegmentUtils::Equals(const std::vector<MemorySegment>& segments1, int32_t offset1,
+                                const std::vector<MemorySegment>& segments2, int32_t offset2,
+                                int32_t len) {
+    const bool both_in_first =
+        InFirstSegment(segments1, offset1, len) && InFirstSegment(segments2, offset2, len);
+    if (!both_in_first) {
+        return EqualsMultiSegments(segments1, offset1, segments2, offset2, len);
+    }
+    // Both ranges sit in their first segment: one direct comparison is enough.
+    return segments1[0].EqualTo(segments2[0], offset1, offset2, len);
+}
+
+// Compares `len` bytes of two segment sequences piece by piece, where each piece stays inside
+// one segment on both sides. Positions are derived from the size of each sequence's first
+// segment.
+bool MemorySegmentUtils::EqualsMultiSegments(const std::vector<MemorySegment>& segments1,
+                                             int32_t offset1,
+                                             const std::vector<MemorySegment>& segments2,
+                                             int32_t offset2, int32_t len) {
+    // Nothing to compare; returning here also avoids dividing by a zero segment size below.
+    if (len == 0) {
+        return true;
+    }
+
+    const int32_t size_a = segments1[0].Size();
+    const int32_t size_b = segments2[0].Size();
+
+    // Starting segment and in-segment position on each side.
+    int32_t idx_a = offset1 / size_a;
+    int32_t idx_b = offset2 / size_b;
+    int32_t pos_a = offset1 % size_a;
+    int32_t pos_b = offset2 % size_b;
+
+    for (int32_t left = len; left > 0;) {
+        // The largest piece that crosses no segment boundary on either side.
+        const int32_t piece = std::min({left, size_a - pos_a, size_b - pos_b});
+        if (!segments1[idx_a].EqualTo(segments2[idx_b], pos_a, pos_b, piece)) {
+            return false;
+        }
+        left -= piece;
+
+        pos_a += piece;
+        if (pos_a == size_a) {
+            pos_a = 0;
+            ++idx_a;
+        }
+
+        pos_b += piece;
+        if (pos_b == size_b) {
+            pos_b = 0;
+            ++idx_b;
+        }
+    }
+    return true;
+}
+
+// Searches the num_bytes1 bytes of segments1 starting at offset1 for the num_bytes2 bytes of
+// segments2 starting at offset2. Returns the offset in segments1 of the first match, offset1
+// when the pattern is empty, and -1 when there is no match.
+int32_t MemorySegmentUtils::Find(const std::vector<MemorySegment>& segments1, int32_t offset1,
+                                 int32_t num_bytes1, const std::vector<MemorySegment>& segments2,
+                                 int32_t offset2, int32_t num_bytes2) {
+    // An empty pattern matches right at the start.
+    if (num_bytes2 == 0) {
+        return offset1;
+    }
+
+    const bool both_in_first = InFirstSegment(segments1, offset1, num_bytes1) &&
+                               InFirstSegment(segments2, offset2, num_bytes2);
+    if (!both_in_first) {
+        return FindInMultiSegments(segments1, offset1, num_bytes1, segments2, offset2, num_bytes2);
+    }
+
+    const MemorySegment& haystack = segments1[0];
+    const MemorySegment& needle = segments2[0];
+    char lead = needle.Get(offset2);
+    // Last position at which the pattern still fits into the searched range.
+    const int32_t last_start = num_bytes1 - num_bytes2 + offset1;
+    int32_t candidate = offset1;
+    while (candidate <= last_start) {
+        // Compare the first byte before paying for the full range comparison.
+        if (haystack.Get(candidate) == lead &&
+            haystack.EqualTo(needle, candidate, offset2, num_bytes2)) {
+            return candidate;
+        }
+        ++candidate;
+    }
+    return -1;
+}
+// Multi-segment variant of Find: tries every start position in turn and compares with
+// EqualsMultiSegments. Returns the first matching offset in segments1, or -1.
+int32_t MemorySegmentUtils::FindInMultiSegments(const std::vector<MemorySegment>& segments1,
+                                                int32_t offset1, int32_t num_bytes1,
+                                                const std::vector<MemorySegment>& segments2,
+                                                int32_t offset2, int32_t num_bytes2) {
+    // Last position at which the pattern still fits into the searched range.
+    const int32_t last_start = num_bytes1 - num_bytes2 + offset1;
+    int32_t candidate = offset1;
+    while (candidate <= last_start) {
+        if (EqualsMultiSegments(segments1, candidate, segments2, offset2, num_bytes2)) {
+            return candidate;
+        }
+        ++candidate;
+    }
+    return -1;
+}
+
+// Hashes num_bytes bytes of `segments` starting at `offset`. `pool` is only passed on to the
+// multi-segment path.
+int32_t MemorySegmentUtils::Hash(const std::vector<MemorySegment>& segments, int32_t offset,
+                                 int32_t num_bytes, MemoryPool* pool) {
+    if (!InFirstSegment(segments, offset, num_bytes)) {
+        return HashMultiSeg(segments, offset, num_bytes, pool);
+    }
+    // The range sits in the first segment: hash it in place.
+    return MurmurHashUtils::HashBytes(segments[0], offset, num_bytes);
+}
+
+// Word-wise hash of num_bytes bytes of `segments` starting at `offset`. `pool` is only passed
+// on to the multi-segment path.
+int32_t MemorySegmentUtils::HashByWords(const std::vector<MemorySegment>& segments, int32_t offset,
+                                        int32_t num_bytes, MemoryPool* pool) {
+    if (!InFirstSegment(segments, offset, num_bytes)) {
+        return HashMultiSegByWords(segments, offset, num_bytes, pool);
+    }
+    // The range sits in the first segment: hash it in place.
+    return MurmurHashUtils::HashBytesByWords(segments[0], offset, num_bytes);
+}
+
+// Word-wise hash of a range spanning several segments: the range is first copied into one
+// contiguous buffer allocated from `pool`, and that buffer is then hashed.
+int32_t MemorySegmentUtils::HashMultiSegByWords(const std::vector<MemorySegment>& segments,
+                                                int32_t offset, int32_t num_bytes,
+                                                MemoryPool* pool) {
+    std::shared_ptr<Bytes> flat = AllocateBytes(num_bytes, pool);
+    CopyMultiSegmentsToBytes(segments, offset, flat.get(), 0, num_bytes);
+    void* raw = flat->data();
+    return MurmurHashUtils::HashUnsafeBytesByWords(raw, 0, num_bytes);
+}
+
+// Hash of a range spanning several segments: the range is first copied into one contiguous
+// buffer allocated from `pool`, and that buffer is then hashed.
+int32_t MemorySegmentUtils::HashMultiSeg(const std::vector<MemorySegment>& segments, int32_t offset,
+                                         int32_t num_bytes, MemoryPool* pool) {
+    std::shared_ptr<Bytes> flat = AllocateBytes(num_bytes, pool);
+    CopyMultiSegmentsToBytes(segments, offset, flat.get(), 0, num_bytes);
+    void* raw = flat->data();
+    return MurmurHashUtils::HashUnsafeBytes(raw, 0, num_bytes);
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/memory/memory_segment_utils.h 
b/src/paimon/common/memory/memory_segment_utils.h
new file mode 100644
index 0000000..6208ff5
--- /dev/null
+++ b/src/paimon/common/memory/memory_segment_utils.h
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <algorithm>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <vector>
+
+#include "fmt/format.h"
+#include "paimon/common/io/memory_segment_output_stream.h"
+#include "paimon/common/memory/memory_segment.h"
+#include "paimon/io/byte_order.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/status.h"
+#include "paimon/type_fwd.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+
+class PAIMON_EXPORT MemorySegmentUtils {
+ public:
+    MemorySegmentUtils() = delete;
+    ~MemorySegmentUtils() = delete;
+
+    /// Allocate bytes in pool
+    static std::shared_ptr<Bytes> AllocateBytes(int32_t length, MemoryPool* 
pool);
+
+    /// Copy target segments from source byte[].
+    ///
+    /// @param segments target segments.
+    /// @param offset target segments offset.
+    /// @param bytes source byte[].
+    /// @param bytes_offset source byte[] offset.
+    /// @param num_bytes the number of bytes to copy.
+    static void CopyFromBytes(std::vector<MemorySegment>* segments, int32_t 
offset,
+                              const Bytes& bytes, int32_t bytes_offset, 
int32_t num_bytes);
+
+    /// Copy segments to a new byte[].
+    ///
+    /// @param segments Source segments.
+    /// @param offset Source segments offset.
+    /// @param num_bytes the number of bytes to copy.
+    static PAIMON_UNIQUE_PTR<Bytes> CopyToBytes(const 
std::vector<MemorySegment>& segments,
+                                                int32_t offset, int32_t 
num_bytes,
+                                                MemoryPool* pool);
+
+    /// Copy segments to target byte[].
+    ///
+    /// @param segments Source segments.
+    /// @param offset Source segments offset.
+    /// @param bytes target byte[].
+    /// @param bytes_offset target byte[] offset.
+    /// @param num_bytes the number of bytes to copy.
+    template <typename T>
+    static void CopyToBytes(const std::vector<MemorySegment>& segments, 
int32_t offset, T* bytes,
+                            int32_t bytes_offset, int32_t num_bytes);
+
+    /// Copy bytes of segments to output stream.
+    ///
+    /// @note It just copies the data in, not include the length.
+    ///
+    /// @param segments source segments
+    /// @param offset offset for segments
+    /// @param size_in_bytes size in bytes
+    /// @param target target output stream
+    static Status CopyToStream(const std::vector<MemorySegment>& segments, 
int32_t offset,
+                               int32_t size_in_bytes, 
MemorySegmentOutputStream* target);
+
+    /// Copy segments to target unsafe pointer.
+    ///
+    /// @param segments Source segments.
+    /// @param offset The position in these memory segments at which reading starts.
+    /// @param target The unsafe memory to copy the bytes to.
+    /// @param num_bytes the number of bytes to copy.
+    static void CopyToUnsafe(const std::vector<MemorySegment>& segments, 
int32_t offset,
+                             void* target, int32_t num_bytes);
+
+    template <typename T>
+    static void CopyMultiSegmentsToBytes(const std::vector<MemorySegment>& 
segments, int32_t offset,
+                                         T* bytes, int32_t bytes_offset, 
int32_t num_bytes);
+
+    static std::shared_ptr<Bytes> GetBytes(const std::vector<MemorySegment>& 
segments,
+                                           int32_t base_offset, int32_t 
size_in_bytes,
+                                           MemoryPool* pool);
+
+    /// Returns whether the range [offset, offset + num_bytes) lies entirely within the first
+    /// MemorySegment, in which case callers can take a single-segment fast path.
+    static bool InFirstSegment(const std::vector<MemorySegment>& segments, 
int32_t offset,
+                               int32_t num_bytes);
+
+    /// unset bit.
+    ///
+    /// @param segment target segment.
+    /// @param base_offset bits base offset.
+    /// @param index bit index from base offset.
+    static void BitUnSet(MemorySegment* segment, int32_t base_offset, int32_t 
index);
+
+    /// set bit.
+    ///
+    /// @param segment target segment.
+    /// @param base_offset bits base offset.
+    /// @param index bit index from base offset.
+    static void BitSet(MemorySegment* segment, int32_t base_offset, int32_t 
index);
+
+    /// read bit.
+    ///
+    /// @param segment target segment.
+    /// @param base_offset bits base offset.
+    /// @param index bit index from base offset.
+    static bool BitGet(const MemorySegment& segment, int32_t base_offset, 
int32_t index);
+
+    /// set bit from segments.
+    ///
+    /// @param segments target segments.
+    /// @param base_offset bits base offset.
+    /// @param index bit index from base offset.
+    static void BitSet(std::vector<MemorySegment>* segments, int32_t 
base_offset, int32_t index);
+
+    /// read bit from segments.
+    ///
+    /// @param segments target segments.
+    /// @param base_offset bits base offset.
+    /// @param index bit index from base offset.
+    static bool BitGet(const std::vector<MemorySegment>& segments, int32_t 
base_offset,
+                       int32_t index);
+
+    /// unset bit from segments.
+    ///
+    /// @param segments target segments.
+    /// @param base_offset bits base offset.
+    /// @param index bit index from base offset.
+    static void BitUnSet(std::vector<MemorySegment>* segments, int32_t 
base_offset, int32_t index);
+
+    /// get value from segments. Only support: bool, char, int16_t, int32_t, 
int64_t, double, float
+    ///
+    /// @param segments target segments.
+    /// @param offset value offset.
+    template <typename T>
+    static T GetValue(const std::vector<MemorySegment>& segments, int32_t 
offset);
+
+    /// set value to segments. Only support: bool, char, int16_t, int32_t, 
int64_t, double, float
+    ///
+    /// The value may straddle a segment boundary; in that case it is split across
+    /// the neighbouring segments.
+    ///
+    /// @tparam T value type; must be trivially copyable (enforced by static_assert
+    ///         in the definition).
+    /// @param segments target segments.
+    /// @param offset value offset, counted from the beginning of the first segment.
+    /// @param value the value to store at offset.
+    template <typename T>
+    static void SetValue(std::vector<MemorySegment>* segments, int32_t offset, 
const T& value);
+
+    /// Compare two memory regions, each backed by a list of segments, for equality.
+    ///
+    /// @param segments1 segments backing the first region.
+    /// @param offset1 offset in segments1 at which the comparison starts.
+    /// @param segments2 segments backing the second region.
+    /// @param offset2 offset in segments2 at which the comparison starts.
+    /// @param len length of the compared region (presumably in bytes -- TODO confirm).
+    /// @return true if both regions are equal, false otherwise.
+    static bool Equals(const std::vector<MemorySegment>& segments1, int32_t 
offset1,
+                       const std::vector<MemorySegment>& segments2, int32_t 
offset2, int32_t len);
+
+    /// Compare two memory regions for equality; same parameters as Equals.
+    ///
+    /// NOTE(review): judging by the name this is the variant for regions that are
+    /// not contained in a single segment -- confirm against the .cpp and document
+    /// when callers should prefer it over Equals.
+    static bool EqualsMultiSegments(const std::vector<MemorySegment>& 
segments1, int32_t offset1,
+                                    const std::vector<MemorySegment>& 
segments2, int32_t offset2,
+                                    int32_t len);
+
+    /// Search the region of segments1 for the byte sequence taken from segments2.
+    ///
+    /// @param segments1 segments to search in.
+    /// @param offset1 offset in segments1 at which the searched region starts.
+    /// @param num_bytes1 length in bytes of the searched region.
+    /// @param segments2 segments holding the sequence to look for.
+    /// @param offset2 offset in segments2 at which the sequence starts.
+    /// @param num_bytes2 length in bytes of the sequence to look for.
+    /// @return the offset in segments1 where the sequence was found, or -1 if it
+    ///         was not found. The unit tests expect an empty sequence
+    ///         (num_bytes2 == 0) to be reported as found at offset1.
+    static int32_t Find(const std::vector<MemorySegment>& segments1, int32_t 
offset1,
+                        int32_t num_bytes1, const std::vector<MemorySegment>& 
segments2,
+                        int32_t offset2, int32_t num_bytes2);
+
+    /// Hash a region of the segments to an int32_t; num_bytes must be a multiple
+    /// of 4 bytes.
+    ///
+    /// @param segments Source segments.
+    /// @param offset Source segments offset.
+    /// @param num_bytes the number of bytes to hash.
+    /// @param pool memory pool handed to the implementation (presumably for a
+    ///        temporary buffer when the region spans segments -- TODO confirm).
+    /// @return the hash value.
+    static int32_t HashByWords(const std::vector<MemorySegment>& segments, 
int32_t offset,
+                               int32_t num_bytes, MemoryPool* pool);
+
+    /// Hash a region of the segments to an int32_t.
+    ///
+    /// @param segments Source segments.
+    /// @param offset Source segments offset.
+    /// @param num_bytes the number of bytes to hash.
+    /// @param pool memory pool handed to the implementation (presumably for a
+    ///        temporary buffer when the region spans segments -- TODO confirm).
+    /// @return the hash value.
+    static int32_t Hash(const std::vector<MemorySegment>& segments, int32_t 
offset,
+                        int32_t num_bytes, MemoryPool* pool);
+
+ private:
+    // Presumably log2 of the bits per addressed unit (2^3 = 8 bits per byte), i.e.
+    // bit_index >> 3 yields the byte index -- TODO confirm against ByteIndex().
+    static constexpr int32_t ADDRESS_BITS_PER_WORD = 3;
+    // Presumably masks the position of a bit inside its byte (bit_index & 7)
+    // -- TODO confirm against the bit helpers in the .cpp.
+    static constexpr int32_t BIT_BYTE_INDEX_MASK = 7;
+    // 64 KiB. Not referenced in this header; purpose to be confirmed in the .cpp.
+    static constexpr int32_t MAX_BYTES_LENGTH = 1024 * 64;
+    // 32 Ki. Not referenced in this header; purpose to be confirmed in the .cpp.
+    static constexpr int32_t MAX_CHARS_LENGTH = 1024 * 32;
+
+ private:
+    /// Store value at offset when it is not fully inside the first segment.
+    /// Locates the owning segment by dividing offset by the size of segments[0]
+    /// (so all segments are treated as having that size) and falls back to
+    /// SetValueSlowly when the value straddles a segment boundary.
+    template <typename T>
+    static void SetValueToMultiSegments(std::vector<MemorySegment>* segments, 
int32_t offset,
+                                        const T& value);
+
+    /// Store value byte by byte starting at seg_offset of segment seg_num, moving
+    /// on to the next segment whenever seg_offset reaches seg_size. Byte order
+    /// follows SystemByteOrder().
+    template <typename T>
+    static void SetValueSlowly(std::vector<MemorySegment>* segments, int32_t 
seg_size,
+                               int32_t seg_num, int32_t seg_offset, const T& 
value);
+    /// Read a value at offset when it is not fully inside the first segment;
+    /// read-side counterpart of SetValueToMultiSegments.
+    template <typename T>
+    static T GetValueFromMultiSegments(const std::vector<MemorySegment>& 
segments, int32_t offset);
+
+    /// Assemble a value byte by byte starting at seg_offset of segment seg_num,
+    /// moving on to the next segment whenever seg_offset reaches seg_size. Byte
+    /// order follows SystemByteOrder().
+    template <typename T>
+    static T GetValueSlowly(const std::vector<MemorySegment>& segments, 
int32_t seg_size,
+                            int32_t seg_num, int32_t seg_offset);
+
+    /// Copy num_bytes from bytes (starting at bytes_offset) into segments at offset.
+    /// Presumably the path taken when the destination is not contained in the
+    /// first segment -- TODO confirm against the .cpp.
+    /// NOTE(review): memory_segment_utils_test.cpp calls this function directly
+    /// although it is declared private here -- confirm how the test gains access.
+    static void CopyMultiSegmentsFromBytes(std::vector<MemorySegment>* 
segments, int32_t offset,
+                                           const Bytes& bytes, int32_t 
bytes_offset,
+                                           int32_t num_bytes);
+
+    /// Copy num_bytes from segments at offset into the raw buffer target.
+    /// Presumably target must provide room for at least num_bytes -- TODO confirm.
+    static void CopyMultiSegmentsToUnsafe(const std::vector<MemorySegment>& 
segments,
+                                          int32_t offset, void* target, 
int32_t num_bytes);
+
+    /// Multi-segment helper for Find; takes the same parameters.
+    static int32_t FindInMultiSegments(const std::vector<MemorySegment>& 
segments1, int32_t offset1,
+                                       int32_t num_bytes1,
+                                       const std::vector<MemorySegment>& 
segments2, int32_t offset2,
+                                       int32_t num_bytes2);
+
+    /// Multi-segment helper for Hash; takes the same parameters.
+    static int32_t HashMultiSeg(const std::vector<MemorySegment>& segments, 
int32_t offset,
+                                int32_t num_bytes, MemoryPool* pool);
+
+    /// Multi-segment helper for HashByWords; takes the same parameters.
+    static int32_t HashMultiSegByWords(const std::vector<MemorySegment>& 
segments, int32_t offset,
+                                       int32_t num_bytes, MemoryPool* pool);
+
+    /// Given a bit index, return the byte index containing it.
+    ///
+    /// @param bit_index the bit index.
+    /// @return the byte index.
+    static int32_t ByteIndex(int32_t bit_index);
+
+    /// Multi-segment helper for BitSet(std::vector<MemorySegment>*, ...); takes
+    /// the same parameters. Presumably used when the addressed byte is outside
+    /// the first segment -- TODO confirm against the .cpp.
+    static void BitSetMultiSegments(std::vector<MemorySegment>* segments, 
int32_t base_offset,
+                                    int32_t index);
+
+    /// Multi-segment helper for BitUnSet(std::vector<MemorySegment>*, ...); takes
+    /// the same parameters. Presumably used when the addressed byte is outside
+    /// the first segment -- TODO confirm against the .cpp.
+    static void BitUnSetMultiSegments(std::vector<MemorySegment>* segments, 
int32_t base_offset,
+                                      int32_t index);
+};
+
+/// Copy num_bytes starting at offset of the segments into bytes, beginning at
+/// bytes_offset of the destination.
+template <typename T>
+inline void MemorySegmentUtils::CopyToBytes(const std::vector<MemorySegment>& segments,
+                                            int32_t offset, T* bytes, int32_t bytes_offset,
+                                            int32_t num_bytes) {
+    if (!InFirstSegment(segments, offset, num_bytes)) {
+        // The requested range is not contained in segment 0: walk the segment list.
+        CopyMultiSegmentsToBytes(segments, offset, bytes, bytes_offset, num_bytes);
+        return;
+    }
+    // Fast path: a single read from the first segment.
+    segments[0].Get(offset, bytes, bytes_offset, num_bytes);
+}
+
+/// Copy num_bytes starting at offset into bytes (from bytes_offset on), walking
+/// the segment list in order. Returns silently if the segments are exhausted
+/// before num_bytes have been copied.
+template <typename T>
+inline void MemorySegmentUtils::CopyMultiSegmentsToBytes(const std::vector<MemorySegment>& segments,
+                                                         int32_t offset, T* bytes,
+                                                         int32_t bytes_offset, int32_t num_bytes) {
+    int32_t copied = 0;
+    for (const auto& current : segments) {
+        const int32_t available = current.Size() - offset;
+        if (available <= 0) {
+            // The start position lies at or beyond the end of this segment: carry
+            // the leftover distance (offset - segment size) into the next one.
+            offset = -available;
+            continue;
+        }
+        const int32_t chunk = std::min(available, num_bytes - copied);
+        current.Get(offset, bytes, bytes_offset + copied, chunk);
+        copied += chunk;
+        // Every following segment is read from its beginning.
+        offset = 0;
+        if (copied == num_bytes) {
+            return;
+        }
+    }
+}
+
+/// Read a value of type T stored at offset of the segments.
+template <typename T>
+inline T MemorySegmentUtils::GetValue(const std::vector<MemorySegment>& segments, int32_t offset) {
+    static_assert(std::is_trivially_copyable_v<T>, "T must be trivially copyable");
+    if (!InFirstSegment(segments, offset, sizeof(T))) {
+        // Not fully inside segment 0: locate the owning segment(s).
+        return GetValueFromMultiSegments<T>(segments, offset);
+    }
+    return segments[0].GetValue<T>(offset);
+}
+
+/// Store value at offset of the segments.
+template <typename T>
+inline void MemorySegmentUtils::SetValue(std::vector<MemorySegment>* segments, int32_t offset,
+                                         const T& value) {
+    static_assert(std::is_trivially_copyable_v<T>, "T must be trivially copyable");
+    if (!InFirstSegment(*segments, offset, sizeof(T))) {
+        // Not fully inside segment 0: locate the owning segment(s).
+        SetValueToMultiSegments<T>(segments, offset, value);
+        return;
+    }
+    (*segments)[0].PutValue<T>(offset, value);
+}
+
+/// Store value at offset when it is not fully inside the first segment.
+/// The owning segment is derived from the size of segment 0, i.e. every segment
+/// is treated as having that size.
+template <typename T>
+inline void MemorySegmentUtils::SetValueToMultiSegments(std::vector<MemorySegment>* segments,
+                                                        int32_t offset, const T& value) {
+    const int32_t seg_size = (*segments)[0].Size();
+    const int32_t seg_index = offset / seg_size;
+    const int32_t seg_offset = offset % seg_size;
+
+    const bool straddles_boundary = seg_offset > seg_size - static_cast<int32_t>(sizeof(T));
+    if (straddles_boundary) {
+        // The value does not fit into the rest of the segment: write it bytewise.
+        SetValueSlowly<T>(segments, seg_size, seg_index, seg_offset, value);
+        return;
+    }
+    (*segments)[seg_index].PutValue<T>(seg_offset, value);
+}
+
+/// float specialization: a value straddling a segment boundary is written
+/// bytewise through its 32-bit pattern, because the bytewise writer relies on
+/// integer shifts.
+template <>
+inline void MemorySegmentUtils::SetValueToMultiSegments(std::vector<MemorySegment>* segments,
+                                                        int32_t offset, const float& value) {
+    const int32_t seg_size = (*segments)[0].Size();
+    const int32_t seg_index = offset / seg_size;
+    const int32_t seg_offset = offset % seg_size;
+
+    const bool straddles_boundary = seg_offset > seg_size - static_cast<int32_t>(sizeof(float));
+    if (!straddles_boundary) {
+        (*segments)[seg_index].PutValue<float>(seg_offset, value);
+        return;
+    }
+    int32_t raw_bits;
+    std::memcpy(&raw_bits, &value, sizeof(float));
+    SetValueSlowly<int32_t>(segments, seg_size, seg_index, seg_offset, raw_bits);
+}
+
+/// double specialization: a value straddling a segment boundary is written
+/// bytewise through its 64-bit pattern, because the bytewise writer relies on
+/// integer shifts.
+template <>
+inline void MemorySegmentUtils::SetValueToMultiSegments(std::vector<MemorySegment>* segments,
+                                                        int32_t offset, const double& value) {
+    const int32_t seg_size = (*segments)[0].Size();
+    const int32_t seg_index = offset / seg_size;
+    const int32_t seg_offset = offset % seg_size;
+
+    const bool straddles_boundary = seg_offset > seg_size - static_cast<int32_t>(sizeof(double));
+    if (!straddles_boundary) {
+        (*segments)[seg_index].PutValue<double>(seg_offset, value);
+        return;
+    }
+    int64_t raw_bits;
+    std::memcpy(&raw_bits, &value, sizeof(double));
+    SetValueSlowly<int64_t>(segments, seg_size, seg_index, seg_offset, raw_bits);
+}
+
+/// Store value one byte at a time, starting at seg_offset of segment seg_num and
+/// moving on to the next segment whenever seg_offset reaches seg_size.
+///
+/// The bytes are emitted in the order given by SystemByteOrder(), so the result
+/// matches what a direct PutValue<T> would have produced in contiguous memory.
+///
+/// @param segments target segments; the caller must make sure that enough
+///        segments follow seg_num to hold sizeof(T) bytes (no bounds check here).
+/// @param seg_size size used for every segment.
+/// @param seg_num index of the segment holding the first byte.
+/// @param seg_offset offset of the first byte inside that segment.
+/// @param value value to store; T must support operator>>.
+template <typename T>
+inline void MemorySegmentUtils::SetValueSlowly(std::vector<MemorySegment>* segments,
+                                               int32_t seg_size, int32_t seg_num,
+                                               int32_t seg_offset, const T& value) {
+    // Write through a pointer to the vector element instead of copying the
+    // MemorySegment handle on entry and at every boundary crossing.
+    MemorySegment* segment = &(*segments)[seg_num];
+    // The byte order cannot change while the loop runs, so query it once.
+    const bool little_endian = SystemByteOrder() == ByteOrder::PAIMON_LITTLE_ENDIAN;
+    constexpr size_t kLastByte = sizeof(T) - 1;
+    for (size_t i = 0; i < sizeof(T); i++) {
+        if (seg_offset == seg_size) {
+            segment = &(*segments)[++seg_num];
+            seg_offset = 0;
+        }
+        // Little endian stores the least significant byte first, big endian the
+        // most significant one.
+        const size_t shift = (little_endian ? i : kLastByte - i) * 8;
+        const T shifted = value >> shift;
+        segment->Put(seg_offset, static_cast<char>(shifted));
+        seg_offset++;
+    }
+}
+
+/// Read a value at offset when it is not fully inside the first segment.
+/// The owning segment is derived from the size of segment 0, i.e. every segment
+/// is treated as having that size.
+template <typename T>
+inline T MemorySegmentUtils::GetValueFromMultiSegments(const std::vector<MemorySegment>& segments,
+                                                       int32_t offset) {
+    const int32_t seg_size = segments[0].Size();
+    const int32_t seg_index = offset / seg_size;
+    const int32_t seg_offset = offset % seg_size;
+
+    const bool straddles_boundary = seg_offset > seg_size - static_cast<int32_t>(sizeof(T));
+    if (straddles_boundary) {
+        // The value continues in the next segment: assemble it bytewise.
+        return GetValueSlowly<T>(segments, seg_size, seg_index, seg_offset);
+    }
+    return segments[seg_index].GetValue<T>(seg_offset);
+}
+
+/// float specialization: a value straddling a segment boundary is assembled as
+/// a 32-bit integer and then reinterpreted, because the bytewise reader relies
+/// on integer shifts.
+template <>
+inline float MemorySegmentUtils::GetValueFromMultiSegments(
+    const std::vector<MemorySegment>& segments, int32_t offset) {
+    const int32_t seg_size = segments[0].Size();
+    const int32_t seg_index = offset / seg_size;
+    const int32_t seg_offset = offset % seg_size;
+
+    const bool straddles_boundary = seg_offset > seg_size - static_cast<int32_t>(sizeof(float));
+    if (!straddles_boundary) {
+        return segments[seg_index].GetValue<float>(seg_offset);
+    }
+    auto raw_bits = GetValueSlowly<int32_t>(segments, seg_size, seg_index, seg_offset);
+    float result;
+    std::memcpy(&result, &raw_bits, sizeof(float));
+    return result;
+}
+
+/// double specialization: a value straddling a segment boundary is assembled as
+/// a 64-bit integer and then reinterpreted, because the bytewise reader relies
+/// on integer shifts.
+template <>
+inline double MemorySegmentUtils::GetValueFromMultiSegments(
+    const std::vector<MemorySegment>& segments, int32_t offset) {
+    const int32_t seg_size = segments[0].Size();
+    const int32_t seg_index = offset / seg_size;
+    const int32_t seg_offset = offset % seg_size;
+
+    const bool straddles_boundary = seg_offset > seg_size - static_cast<int32_t>(sizeof(double));
+    if (!straddles_boundary) {
+        return segments[seg_index].GetValue<double>(seg_offset);
+    }
+    auto raw_bits = GetValueSlowly<int64_t>(segments, seg_size, seg_index, seg_offset);
+    double result;
+    std::memcpy(&result, &raw_bits, sizeof(double));
+    return result;
+}
+
+/// Assemble a value one byte at a time, starting at seg_offset of segment
+/// seg_num and moving on to the next segment whenever seg_offset reaches
+/// seg_size. Bytes are combined in the order given by SystemByteOrder().
+/// No bounds check is made on seg_num.
+template <typename T>
+inline T MemorySegmentUtils::GetValueSlowly(const std::vector<MemorySegment>& segments,
+                                            int32_t seg_size, int32_t seg_num, int32_t seg_offset) {
+    MemorySegment current = segments[seg_num];
+    T result = 0;
+    for (size_t byte_idx = 0; byte_idx < sizeof(T); ++byte_idx) {
+        if (seg_offset == seg_size) {
+            current = segments[++seg_num];
+            seg_offset = 0;
+        }
+        // Mask to the low 8 bits so a negative char does not sign-extend.
+        const T byte_value = current.Get(seg_offset) & 0xff;
+        const bool little_endian = SystemByteOrder() == ByteOrder::PAIMON_LITTLE_ENDIAN;
+        // Little endian: first byte is the least significant one; big endian: the
+        // first byte is the most significant one.
+        const size_t shift = (little_endian ? byte_idx : (sizeof(T) - 1) - byte_idx) * 8;
+        result |= (byte_value << shift);
+        ++seg_offset;
+    }
+    return result;
+}
+
+/// Copy size_in_bytes bytes, starting at offset of the segments, into target.
+///
+/// @param segments source segments, read in order.
+/// @param offset start position, counted from the beginning of the first segment.
+/// @param size_in_bytes number of bytes to copy.
+/// @param target output stream receiving the bytes.
+/// @return Status::OK() once everything was written; Status::Invalid if offset or
+///         size_in_bytes is negative, or if the segments end before
+///         size_in_bytes bytes could be copied.
+inline Status MemorySegmentUtils::CopyToStream(const std::vector<MemorySegment>& segments,
+                                               int32_t offset, int32_t size_in_bytes,
+                                               MemorySegmentOutputStream* target) {
+    // A negative offset or length would otherwise reach Write() as an out-of-range
+    // position or a negative copy size.
+    if (offset < 0 || size_in_bytes < 0) {
+        return Status::Invalid(fmt::format("Invalid copy range, offset: {}, size in bytes: {}",
+                                           offset, size_in_bytes));
+    }
+    for (const auto& source_segment : segments) {
+        int32_t cur_seg_remain = source_segment.Size() - offset;
+        if (cur_seg_remain > 0) {
+            int32_t copy_size = std::min(cur_seg_remain, size_in_bytes);
+            target->Write(source_segment, offset, copy_size);
+            size_in_bytes -= copy_size;
+            // Every following segment is read from its beginning.
+            offset = 0;
+        } else {
+            // The start position is beyond this segment: skip it entirely.
+            offset -= source_segment.Size();
+        }
+
+        if (size_in_bytes == 0) {
+            return Status::OK();
+        }
+    }
+    if (size_in_bytes != 0) {
+        return Status::Invalid(
+            fmt::format("No copy finished, this should be a bug, "
+                        "The remaining length is: {}",
+                        size_in_bytes));
+    }
+    return Status::OK();
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/memory/memory_segment_utils_test.cpp 
b/src/paimon/common/memory/memory_segment_utils_test.cpp
new file mode 100644
index 0000000..e4320b2
--- /dev/null
+++ b/src/paimon/common/memory/memory_segment_utils_test.cpp
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/memory/memory_segment_utils.h"
+
+#include <cstdlib>
+#include <string>
+
+#include "gtest/gtest.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+// Writes one value of every supported type back to back and reads each one
+// back, for segment sizes small enough that values straddle one or several
+// segment boundaries.
+TEST(MemorySegmentUtilsTest, TestSetAndGetValue) {
+    auto pool = GetDefaultPool();
+    for (const int single_segment_size : {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 128}) {
+        std::vector<MemorySegment> segments;
+        segments.reserve(50);
+        for (size_t seg_no = 0; seg_no < 50; ++seg_no) {
+            segments.push_back(
+                MemorySegment::Wrap(Bytes::AllocateBytes(single_segment_size, pool.get())));
+        }
+
+        // `cursor` always points at the first byte after the previous value.
+        int32_t cursor = 0;
+        MemorySegmentUtils::SetValue<int32_t>(&segments, cursor, 233);
+        ASSERT_EQ(233, MemorySegmentUtils::GetValue<int32_t>(segments, cursor));
+        cursor += sizeof(int32_t);
+
+        MemorySegmentUtils::SetValue<int64_t>(&segments, cursor, 23333333333);
+        ASSERT_EQ(23333333333, MemorySegmentUtils::GetValue<int64_t>(segments, cursor));
+        cursor += sizeof(int64_t);
+
+        MemorySegmentUtils::SetValue<float>(&segments, cursor, 233.3);
+        ASSERT_NEAR(233.3, MemorySegmentUtils::GetValue<float>(segments, cursor), 0.001);
+        cursor += sizeof(float);
+
+        MemorySegmentUtils::SetValue<double>(&segments, cursor, 244.3);
+        ASSERT_NEAR(244.3, MemorySegmentUtils::GetValue<double>(segments, cursor), 0.001);
+        cursor += sizeof(double);
+
+        MemorySegmentUtils::SetValue<int16_t>(&segments, cursor, 5564);
+        ASSERT_EQ(5564, MemorySegmentUtils::GetValue<int16_t>(segments, cursor));
+        cursor += sizeof(int16_t);
+
+        MemorySegmentUtils::SetValue<char>(&segments, cursor, 123);
+        ASSERT_EQ(123, MemorySegmentUtils::GetValue<char>(segments, cursor));
+        cursor += sizeof(char);
+
+        MemorySegmentUtils::SetValue<bool>(&segments, cursor, true);
+        ASSERT_EQ(true, MemorySegmentUtils::GetValue<bool>(segments, cursor));
+
+        // The very first value must have survived all later writes.
+        ASSERT_EQ(233, MemorySegmentUtils::GetValue<int32_t>(segments, 0));
+    }
+}
+
+// Round-trips random text through CopyFromBytes / CopyToBytes / GetBytes, first
+// with a single segment and then with data spread over two segments.
+TEST(MemorySegmentUtilsTest, TestCopyFromBytesAndGetBytes) {
+    auto pool = GetDefaultPool();
+    int32_t str_size = 1024;
+    std::string short_text, long_text;
+    short_text.reserve(str_size);
+    long_text.reserve(str_size);
+    for (int32_t pos = 0; pos < str_size; ++pos) {
+        short_text.push_back(static_cast<char>(paimon::test::RandomNumber(0, 25) + 'a'));
+        long_text.push_back(static_cast<char>(paimon::test::RandomNumber(0, 25) + 'a'));
+    }
+    // long_text = 1024 random chars followed by a copy of short_text.
+    long_text += short_text;
+    std::shared_ptr<Bytes> short_bytes = Bytes::AllocateBytes(short_text, pool.get());
+    std::shared_ptr<Bytes> long_bytes = Bytes::AllocateBytes(long_text, pool.get());
+
+    // single segment
+    std::vector<MemorySegment> one_seg = {MemorySegment::AllocateHeapMemory(str_size, pool.get())};
+    MemorySegmentUtils::CopyFromBytes(&one_seg, 0, *short_bytes, 0, short_text.size());
+    PAIMON_UNIQUE_PTR<Bytes> copied =
+        MemorySegmentUtils::CopyToBytes(one_seg, 0, short_text.size(), pool.get());
+    auto fetched = MemorySegmentUtils::GetBytes(one_seg, /*base_offset=*/0,
+                                                /*size_in_bytes=*/str_size, pool.get());
+    ASSERT_EQ(*fetched, *short_bytes);
+    const std::string copied_text(copied->data(), copied->size());
+    ASSERT_EQ(short_text, copied_text);
+
+    // test MultiSegments
+    std::vector<MemorySegment> two_segs = {MemorySegment::AllocateHeapMemory(str_size, pool.get()),
+                                           MemorySegment::AllocateHeapMemory(str_size, pool.get())};
+    MemorySegmentUtils::CopyFromBytes(&two_segs, 0, *long_bytes, 0, long_text.size());
+    PAIMON_UNIQUE_PTR<Bytes> copied_long =
+        MemorySegmentUtils::CopyToBytes(two_segs, 0, long_text.size(), pool.get());
+    const std::string copied_long_text(copied_long->data(), copied_long->size());
+    ASSERT_EQ(long_text, copied_long_text);
+
+    // The tail of long_text is short_text and lives in the second segment.
+    std::shared_ptr<Bytes> tail = MemorySegmentUtils::GetBytes(
+        two_segs, long_text.size() - short_text.size(), short_text.size(), pool.get());
+    const std::string tail_text(tail->data(), tail->size());
+    ASSERT_EQ(short_text, tail_text);
+}
+
+// Copies segment content into a raw buffer with CopyToUnsafe, first from a
+// single segment and then from the second of two segments.
+TEST(MemorySegmentUtilsTest, TestCopyToUnsafe) {
+    auto pool = GetDefaultPool();
+    int32_t str_size = 1024;
+    std::string short_text, long_text;
+    short_text.reserve(str_size);
+    long_text.reserve(str_size);
+    for (int32_t pos = 0; pos < str_size; ++pos) {
+        short_text.push_back(static_cast<char>(paimon::test::RandomNumber(0, 25) + 'a'));
+        long_text.push_back(static_cast<char>(paimon::test::RandomNumber(0, 25) + 'a'));
+    }
+    // long_text = 1024 random chars followed by a copy of short_text.
+    long_text += short_text;
+    std::shared_ptr<Bytes> short_bytes = Bytes::AllocateBytes(short_text, pool.get());
+    std::shared_ptr<Bytes> long_bytes = Bytes::AllocateBytes(long_text, pool.get());
+
+    // single segment
+    std::vector<MemorySegment> one_seg = {MemorySegment::AllocateHeapMemory(str_size, pool.get())};
+    MemorySegmentUtils::CopyFromBytes(&one_seg, 0, *short_bytes, 0, short_text.size());
+    std::shared_ptr<Bytes> sink = Bytes::AllocateBytes(short_text.size(), pool.get());
+    MemorySegmentUtils::CopyToUnsafe(one_seg, 0, sink->data(), short_text.size());
+    ASSERT_EQ(short_text, std::string(sink->data(), sink->size()));
+
+    // test MultiSegments
+    std::vector<MemorySegment> two_segs = {MemorySegment::AllocateHeapMemory(str_size, pool.get()),
+                                           MemorySegment::AllocateHeapMemory(str_size, pool.get())};
+    MemorySegmentUtils::CopyFromBytes(&two_segs, 0, *long_bytes, 0, long_text.size());
+    std::shared_ptr<Bytes> tail_sink = Bytes::AllocateBytes(short_text.size(), pool.get());
+    // The tail of long_text is short_text and lives in the second segment.
+    MemorySegmentUtils::CopyToUnsafe(two_segs, long_text.size() - short_text.size(),
+                                     tail_sink->data(), short_text.size());
+    ASSERT_EQ(short_text, std::string(tail_sink->data(), tail_sink->size()));
+}
+
+// Toggles one randomly chosen bit, once through the segment-list overloads with
+// a base offset pointing at the second segment, and once through the
+// single-segment overloads on the first segment.
+TEST(MemorySegmentUtilsTest, TestSetAndUnSet) {
+    auto pool = GetDefaultPool();
+    int32_t str_size = 1024;
+    std::string short_text, long_text;
+    short_text.reserve(str_size);
+    long_text.reserve(str_size);
+    for (int32_t pos = 0; pos < str_size; ++pos) {
+        short_text.push_back(static_cast<char>(paimon::test::RandomNumber(0, 25) + 'a'));
+        long_text.push_back(static_cast<char>(paimon::test::RandomNumber(0, 25) + 'a'));
+    }
+    long_text += short_text;
+    std::shared_ptr<Bytes> long_bytes = Bytes::AllocateBytes(long_text, pool.get());
+
+    std::vector<MemorySegment> segs = {MemorySegment::AllocateHeapMemory(str_size, pool.get()),
+                                       MemorySegment::AllocateHeapMemory(str_size, pool.get())};
+    MemorySegmentUtils::CopyFromBytes(&segs, 0, *long_bytes, 0, long_text.size());
+    int32_t bit = paimon::test::RandomNumber(0, str_size - 1);
+
+    // segment-list overloads, base offset == size of the first segment
+    MemorySegmentUtils::BitUnSet(&segs, str_size, bit);
+    ASSERT_FALSE(MemorySegmentUtils::BitGet(segs, str_size, bit));
+    MemorySegmentUtils::BitSet(&segs, str_size, bit);
+    ASSERT_TRUE(MemorySegmentUtils::BitGet(segs, str_size, bit));
+
+    // single-segment overloads on the first segment
+    MemorySegment& first = segs[0];
+    MemorySegmentUtils::BitSet(&first, /*base_offset=*/0, bit);
+    ASSERT_TRUE(MemorySegmentUtils::BitGet(first, /*base_offset=*/0, bit));
+    MemorySegmentUtils::BitUnSet(&first, /*base_offset=*/0, bit);
+    ASSERT_FALSE(MemorySegmentUtils::BitGet(first, /*base_offset=*/0, bit));
+}
+
+// Exercises the segment-list bit operations on a list holding exactly one
+// segment, with and without a base offset.
+TEST(MemorySegmentUtilsTest, TestBitSetAndUnSetSingleSegment) {
+    auto pool = GetDefaultPool();
+    int32_t segment_size = 128;
+    std::vector<MemorySegment> segs = {MemorySegment::AllocateHeapMemory(segment_size, pool.get())};
+
+    // Shorthand for reading a bit through the segment-list overload.
+    auto bit_at = [&segs](int32_t base, int32_t idx) {
+        return MemorySegmentUtils::BitGet(segs, base, idx);
+    };
+
+    // initially all bits should be 0 after allocation
+    // NOTE(review): this relies on AllocateHeapMemory handing out zeroed memory --
+    // confirm.
+    for (int32_t idx = 0; idx < 16; ++idx) {
+        ASSERT_FALSE(bit_at(/*base_offset=*/0, idx));
+    }
+
+    // set bits at various indices and verify
+    MemorySegmentUtils::BitSet(&segs, /*base_offset=*/0, /*index=*/0);
+    ASSERT_TRUE(bit_at(/*base_offset=*/0, 0));
+
+    MemorySegmentUtils::BitSet(&segs, /*base_offset=*/0, /*index=*/7);
+    ASSERT_TRUE(bit_at(/*base_offset=*/0, 7));
+
+    MemorySegmentUtils::BitSet(&segs, /*base_offset=*/0, /*index=*/15);
+    ASSERT_TRUE(bit_at(/*base_offset=*/0, 15));
+
+    // unset and verify
+    MemorySegmentUtils::BitUnSet(&segs, /*base_offset=*/0, /*index=*/0);
+    ASSERT_FALSE(bit_at(/*base_offset=*/0, 0));
+
+    MemorySegmentUtils::BitUnSet(&segs, /*base_offset=*/0, /*index=*/7);
+    ASSERT_FALSE(bit_at(/*base_offset=*/0, 7));
+
+    // bit 15 should still be set
+    ASSERT_TRUE(bit_at(/*base_offset=*/0, 15));
+
+    MemorySegmentUtils::BitUnSet(&segs, /*base_offset=*/0, /*index=*/15);
+    ASSERT_FALSE(bit_at(/*base_offset=*/0, 15));
+
+    // test with non-zero base_offset
+    int32_t base_offset = 10;
+    MemorySegmentUtils::BitSet(&segs, base_offset, /*index=*/3);
+    ASSERT_TRUE(bit_at(base_offset, 3));
+    ASSERT_FALSE(bit_at(base_offset, 2));
+
+    MemorySegmentUtils::BitUnSet(&segs, base_offset, /*index=*/3);
+    ASSERT_FALSE(bit_at(base_offset, 3));
+}
+
+// Writes a 6-byte payload into two 10-byte segments at several offsets and
+// reads it back:
+//   offset 3  -> bytes 3..8, entirely inside the first segment,
+//   offset 7  -> bytes 7..12, straddling the boundary between both segments,
+//   offset 12 -> bytes 12..17, entirely inside the second segment.
+// The straddling case is the one the multi-segment copy exists for; without it
+// the boundary handling would not be exercised by this test.
+TEST(MemorySegmentUtilsTest, TestCopyMultiSegmentsFromBytes) {
+    auto pool = GetDefaultPool();
+    std::shared_ptr<Bytes> bytes = Bytes::AllocateBytes("abcdef", pool.get());
+    int32_t segment_size = 10;
+    std::vector<MemorySegment> segs = {MemorySegment::AllocateHeapMemory(segment_size, pool.get()),
+                                       MemorySegment::AllocateHeapMemory(segment_size, pool.get())};
+    for (int32_t offset : {3, 7, 12}) {
+        MemorySegmentUtils::CopyMultiSegmentsFromBytes(&segs, offset, *bytes,
+                                                       /*bytes_offset=*/0,
+                                                       /*num_bytes=*/bytes->size());
+        auto result_bytes =
+            MemorySegmentUtils::CopyToBytes(segs, offset,
+                                            /*num_bytes=*/bytes->size(), pool.get());
+        ASSERT_EQ(*bytes, *result_bytes) << "offset: " << offset;
+    }
+}
+
+// Copies ranges of the two 3-byte segments "abc" + "def" into an output stream:
+// the whole content, a range crossing the segment boundary, and a range that
+// starts inside the second segment.
+TEST(MemorySegmentUtilsTest, TestCopyToStream) {
+    auto pool = GetDefaultPool();
+    int32_t segment_size = 3;
+    std::shared_ptr<Bytes> head = Bytes::AllocateBytes("abc", pool.get());
+    std::shared_ptr<Bytes> tail = Bytes::AllocateBytes("def", pool.get());
+    std::vector<MemorySegment> segs = {MemorySegment::Wrap(head), MemorySegment::Wrap(tail)};
+    {
+        // everything
+        MemorySegmentOutputStream out(/*segment_size=*/segment_size, pool);
+        ASSERT_OK(MemorySegmentUtils::CopyToStream(segs, /*offset=*/0,
+                                                   /*size_in_bytes=*/segment_size * 2, &out));
+        std::shared_ptr<Bytes> expected = Bytes::AllocateBytes("abcdef", pool.get());
+        auto actual =
+            MemorySegmentUtils::CopyToBytes(out.Segments(), 0, out.CurrentSize(), pool.get());
+        ASSERT_EQ(*expected, *actual);
+    }
+    {
+        // starts in the first segment, ends in the second one
+        MemorySegmentOutputStream out(/*segment_size=*/segment_size, pool);
+        ASSERT_OK(MemorySegmentUtils::CopyToStream(segs, /*offset=*/2, /*size_in_bytes=*/4, &out));
+        std::shared_ptr<Bytes> expected = Bytes::AllocateBytes("cdef", pool.get());
+        auto actual =
+            MemorySegmentUtils::CopyToBytes(out.Segments(), 0, out.CurrentSize(), pool.get());
+        ASSERT_EQ(*expected, *actual);
+    }
+    {
+        // the first segment is skipped completely
+        MemorySegmentOutputStream out(/*segment_size=*/segment_size, pool);
+        ASSERT_OK(MemorySegmentUtils::CopyToStream(segs, /*offset=*/4, /*size_in_bytes=*/2, &out));
+        std::shared_ptr<Bytes> expected = Bytes::AllocateBytes("ef", pool.get());
+        auto actual =
+            MemorySegmentUtils::CopyToBytes(out.Segments(), 0, out.CurrentSize(), pool.get());
+        ASSERT_EQ(*expected, *actual);
+    }
+}
+
+// Searches for byte sequences taken from "adef" + "ghi" inside "abc" + "def".
+TEST(MemorySegmentUtilsTest, TestFind) {
+    auto pool = GetDefaultPool();
+    std::shared_ptr<Bytes> hay_a = Bytes::AllocateBytes("abc", pool.get());
+    std::shared_ptr<Bytes> hay_b = Bytes::AllocateBytes("def", pool.get());
+    std::shared_ptr<Bytes> needle_a = Bytes::AllocateBytes("adef", pool.get());
+    std::shared_ptr<Bytes> needle_b = Bytes::AllocateBytes("ghi", pool.get());
+    // abcdef
+    std::vector<MemorySegment> segs1 = {MemorySegment::Wrap(hay_a), MemorySegment::Wrap(hay_b)};
+    // adefghi
+    std::vector<MemorySegment> segs2 = {MemorySegment::Wrap(needle_a),
+                                        MemorySegment::Wrap(needle_b)};
+
+    // find "": the empty sequence is reported at the start offset
+    ASSERT_EQ(1, MemorySegmentUtils::Find(segs1, /*offset1=*/1, /*num_bytes1=*/5, segs2,
+                                          /*offset2=*/0, /*num_bytes2=*/0));
+    // find "def"
+    ASSERT_EQ(3, MemorySegmentUtils::Find(segs1, /*offset1=*/0, /*num_bytes1=*/6, segs2,
+                                          /*offset2=*/1, /*num_bytes2=*/3));
+    // find "defg"
+    ASSERT_EQ(-1, MemorySegmentUtils::Find(segs1, /*offset1=*/0, /*num_bytes1=*/6, segs2,
+                                           /*offset2=*/1, /*num_bytes2=*/4));
+    // find "de" in "abc" of segs1
+    ASSERT_EQ(-1, MemorySegmentUtils::Find(segs1, /*offset1=*/0, /*num_bytes1=*/3, segs2,
+                                           /*offset2=*/1, /*num_bytes2=*/2));
+    // find "a" in "abc" of segs1
+    ASSERT_EQ(0, MemorySegmentUtils::Find(segs1, /*offset1=*/0, /*num_bytes1=*/3, segs2,
+                                          /*offset2=*/0, /*num_bytes2=*/1));
+}
+
+}  // namespace paimon::test


Reply via email to