This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 534b67d feat: introduce common/memory module (#11)
534b67d is described below
commit 534b67ddea2de969098d58aee5d6ad89e2998ac6
Author: lxy <[email protected]>
AuthorDate: Tue May 26 13:38:20 2026 +0800
feat: introduce common/memory module (#11)
---
include/paimon/io/byte_order.h | 57 ++
src/paimon/common/memory/memory_segment.cpp | 84 +++
src/paimon/common/memory/memory_segment.h | 211 ++++++
src/paimon/common/memory/memory_segment_test.cpp | 760 +++++++++++++++++++++
src/paimon/common/memory/memory_segment_utils.cpp | 334 +++++++++
src/paimon/common/memory/memory_segment_utils.h | 480 +++++++++++++
.../common/memory/memory_segment_utils_test.cpp | 295 ++++++++
7 files changed, 2221 insertions(+)
diff --git a/include/paimon/io/byte_order.h b/include/paimon/io/byte_order.h
new file mode 100644
index 0000000..299244c
--- /dev/null
+++ b/include/paimon/io/byte_order.h
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+namespace paimon {
+
+// Compile-time endianness detection. After this ladder PAIMON_LITTLEENDIAN is defined as
+// 1 for little-endian targets and 0 for big-endian targets.
+// s390x is treated as big-endian without probing anything else.
+#if defined(__s390x__)
+#define PAIMON_LITTLEENDIAN 0
+#endif // __s390x__
+// Probe the compiler only when PAIMON_LITTLEENDIAN has not been defined yet, so a value
+// supplied earlier (for example by the build system) is left alone.
+#if !defined(PAIMON_LITTLEENDIAN)
+// GCC / Clang / IAR ARM: big-endian when __BIG_ENDIAN__ is defined or when __BYTE_ORDER__
+// equals __ORDER_BIG_ENDIAN__; little-endian otherwise.
+#if defined(__GNUC__) || defined(__clang__) || defined(__ICCARM__)
+#if (defined(__BIG_ENDIAN__) || (defined(__BYTE_ORDER__) && __BYTE_ORDER__ ==
__ORDER_BIG_ENDIAN__))
+#define PAIMON_LITTLEENDIAN 0
+#else
+#define PAIMON_LITTLEENDIAN 1
+#endif // __BIG_ENDIAN__
+// MSVC: only PowerPC targets (_M_PPC) are treated as big-endian.
+#elif defined(_MSC_VER)
+#if defined(_M_PPC)
+#define PAIMON_LITTLEENDIAN 0
+#else
+#define PAIMON_LITTLEENDIAN 1
+#endif
+// Any other compiler: fail the build and ask for an explicit definition.
+#else
+#error Unable to determine endianness, define PAIMON_LITTLEENDIAN.
+#endif
+#endif // !defined(PAIMON_LITTLEENDIAN)
+
+/// Tag for a byte order. The underlying type is int8_t; PAIMON_BIG_ENDIAN has the value 1
+/// and PAIMON_LITTLE_ENDIAN has the value 2.
+enum class ByteOrder : int8_t { PAIMON_BIG_ENDIAN = 1, PAIMON_LITTLE_ENDIAN =
2 };
+
+/// Reports the byte order selected at compile time through PAIMON_LITTLEENDIAN.
+///
+/// @return ByteOrder::PAIMON_LITTLE_ENDIAN when PAIMON_LITTLEENDIAN is non-zero,
+///         ByteOrder::PAIMON_BIG_ENDIAN otherwise.
+constexpr ByteOrder SystemByteOrder() {
+    return PAIMON_LITTLEENDIAN ? ByteOrder::PAIMON_LITTLE_ENDIAN : ByteOrder::PAIMON_BIG_ENDIAN;
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/memory/memory_segment.cpp
b/src/paimon/common/memory/memory_segment.cpp
new file mode 100644
index 0000000..be4a8d5
--- /dev/null
+++ b/src/paimon/common/memory/memory_segment.cpp
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/memory/memory_segment.h"
+
+#include <algorithm>
+
+namespace paimon {
+
+/// Lexicographically compares `len` bytes of this segment starting at `offset1` with `len`
+/// bytes of `seg2` starting at `offset2`, treating every byte as unsigned.
+///
+/// @return 0 when the ranges are identical. When the first difference is found in the
+///         8-byte-word phase the result is exactly -1 or 1; when it is found in the trailing
+///         byte phase the result is the (non-zero) difference of the two unsigned bytes.
+/// No bounds checks are performed here.
+int32_t MemorySegment::Compare(const MemorySegment& seg2, int32_t offset1, int32_t offset2,
+                               int32_t len) const {
+    int32_t remaining = len;
+
+    // Word phase: reading each 8-byte group as a big-endian integer makes the integer order
+    // agree with byte-by-byte order.
+    for (; remaining >= 8; remaining -= 8, offset1 += 8, offset2 += 8) {
+        const uint64_t lhs_word = GetLongBigEndian(offset1);
+        const uint64_t rhs_word = seg2.GetLongBigEndian(offset2);
+        if (lhs_word < rhs_word) {
+            return -1;
+        }
+        if (lhs_word > rhs_word) {
+            return 1;
+        }
+    }
+
+    // Tail phase: fewer than 8 bytes left, compare them one at a time as unsigned values.
+    for (; remaining > 0; --remaining, ++offset1, ++offset2) {
+        const int32_t lhs_byte = Get(offset1) & 0xff;
+        const int32_t rhs_byte = seg2.Get(offset2) & 0xff;
+        if (lhs_byte != rhs_byte) {
+            return lhs_byte - rhs_byte;
+        }
+    }
+    return 0;
+}
+
+/// Compares two byte ranges of possibly different lengths: the first min(len1, len2) bytes
+/// are compared with the 4-argument Compare, and if that prefix is equal the result is
+/// `len1 - len2`, so the shorter range sorts first.
+int32_t MemorySegment::Compare(const MemorySegment& seg2, int32_t offset1, int32_t offset2,
+                               int32_t len1, int32_t len2) const {
+    const int32_t shared_len = (len2 < len1) ? len2 : len1;
+    const int32_t prefix_order = Compare(seg2, offset1, offset2, shared_len);
+    if (prefix_order != 0) {
+        return prefix_order;
+    }
+    return len1 - len2;
+}
+
+/// Checks whether `length` bytes of this segment starting at `offset1` are identical to
+/// `length` bytes of `seg2` starting at `offset2`.
+///
+/// @return true when every byte matches (trivially true when `length` is not positive).
+/// No bounds checks are performed here.
+bool MemorySegment::EqualTo(const MemorySegment& seg2, int32_t offset1, int32_t offset2,
+                            int32_t length) const {
+    int32_t pos = 0;
+
+    // Compare 8 bytes at a time while a full word still fits in the range.
+    for (; pos <= length - 8; pos += 8) {
+        const int64_t lhs_word = GetValue<int64_t>(offset1 + pos);
+        const int64_t rhs_word = seg2.GetValue<int64_t>(offset2 + pos);
+        if (lhs_word != rhs_word) {
+            return false;
+        }
+    }
+
+    // Compare the remaining (length % 8) bytes individually.
+    for (; pos < length; ++pos) {
+        if (Get(offset1 + pos) != seg2.Get(offset2 + pos)) {
+            return false;
+        }
+    }
+    return true;
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/memory/memory_segment.h
b/src/paimon/common/memory/memory_segment.h
new file mode 100644
index 0000000..a98e5d8
--- /dev/null
+++ b/src/paimon/common/memory/memory_segment.h
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cassert>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <type_traits>
+
+#include "paimon/common/utils/math.h"
+#include "paimon/io/byte_order.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+class MemoryPool;
+
+/// This class represents a piece of memory.
+///
+/// Supports two modes:
+///   - Owning mode: holds a shared_ptr<Bytes> for lifetime management.
+///   - Non-owning (view) mode: holds a raw pointer to external data.
+///     The caller must ensure the underlying memory outlives this segment.
+///
+/// Offsets and lengths are validated only by debug-build asserts in some accessors and not
+/// at all in others, so callers must pass ranges that fit inside the segment.
+class PAIMON_EXPORT MemorySegment {
+ public:
+    /// Creates an empty segment: null data pointer and size 0.
+    MemorySegment() : data_(nullptr), size_(0) {}
+
+    /// Wrap a shared_ptr<Bytes> to create an owning segment.
+    /// `buffer` is expected to be non-null (asserted in debug builds).
+    static MemorySegment Wrap(const std::shared_ptr<Bytes>& buffer) {
+        return MemorySegment(buffer);
+    }
+
+    /// Create a non-owning segment that references external memory.
+    /// The caller must guarantee that `data` remains valid for the lifetime of this segment.
+    static MemorySegment WrapView(const char* data, int32_t size) {
+        return MemorySegment(data, size);
+    }
+
+    /// Allocates `size` bytes through Bytes::AllocateBytes using `pool` and wraps the result
+    /// in an owning segment. `pool` must not be null (asserted in debug builds).
+    static MemorySegment AllocateHeapMemory(int32_t size, MemoryPool* pool) {
+        assert(pool);
+        return Wrap(Bytes::AllocateBytes(size, pool));
+    }
+
+    MemorySegment(const MemorySegment& other) = default;
+    MemorySegment& operator=(const MemorySegment& other) = default;
+
+    /// Content equality. Segments are equal when they share the same pointer and size, or
+    /// when both have non-null data, equal sizes and identical bytes. A segment with null
+    /// data never equals one with non-null data.
+    bool operator==(const MemorySegment& other) const {
+        if (this == &other) {
+            return true;
+        }
+        if (data_ == other.data_ && size_ == other.size_) {
+            return true;
+        }
+        if (!data_ || !other.data_) {
+            return false;
+        }
+        if (size_ != other.size_) {
+            return false;
+        }
+        return std::memcmp(data_, other.data_, size_) == 0;
+    }
+
+    /// Returns the segment size in bytes.
+    inline int32_t Size() const {
+        return size_;
+    }
+
+    /// Returns the raw data pointer (valid for both owning and non-owning segments).
+    inline const char* Data() const {
+        return data_;
+    }
+
+    /// Returns the byte at `index`. No bounds check.
+    inline char Get(int32_t index) const {
+        return *(data_ + index);
+    }
+
+    /// Returns a mutable pointer to the data. Use with caution on non-owning segments: the
+    /// constness of the stored pointer is cast away, so the referenced memory must really
+    /// be writable.
+    inline char* MutableData() {
+        return const_cast<char*>(data_);
+    }
+
+    /// Stores byte `b` at `index`. No bounds check.
+    inline void Put(int32_t index, char b) {
+        MutableData()[index] = b;
+    }
+
+    /// Fills all of `dst` (dst->size() bytes) with the bytes starting at `index`.
+    inline void Get(int32_t index, Bytes* dst) const {
+        return Get(index, dst, /*offset=*/0, dst->size());
+    }
+
+    /// Copies all of `src` (src.size() bytes) into this segment starting at `index`.
+    inline void Put(int32_t index, const Bytes& src) {
+        return Put(index, src, /*offset=*/0, src.size());
+    }
+
+    /// Copies `length` bytes starting at `index` of this segment into `dst`, beginning at
+    /// `offset` inside `dst`. `T` must provide size() and data(). Range checks are
+    /// debug-only asserts.
+    template <typename T>
+    inline void Get(int32_t index, T* dst, int32_t offset, int32_t length) const {
+        assert(static_cast<int32_t>(dst->size()) >= (offset + length));
+        assert(size_ >= (index + length));
+        std::memcpy(const_cast<char*>(dst->data()) + offset, data_ + index, length);
+    }
+
+    /// Copies `length` bytes from `src`, beginning at `offset` inside `src`, into this
+    /// segment starting at `index`. `T` must provide size() and data(). Range checks are
+    /// debug-only asserts.
+    template <typename T>
+    inline void Put(int32_t index, const T& src, int32_t offset, int32_t length) {
+        assert(static_cast<int32_t>(src.size()) >= (offset + length));
+        assert(size_ >= (index + length));
+        std::memcpy(MutableData() + index, src.data() + offset, length);
+    }
+
+    /// Reads a trivially copyable `T` from the sizeof(T) bytes at `index`, in host byte
+    /// order. memcpy is used, so `index` does not need to be aligned. No bounds check.
+    template <typename T>
+    T GetValue(int32_t index) const {
+        static_assert(std::is_trivially_copyable_v<T>, "T must be trivially copyable");
+        T value;
+        std::memcpy(&value, data_ + index, sizeof(T));
+        return value;
+    }
+
+    /// Writes the sizeof(T) bytes of `value` at `index`, in host byte order. No bounds
+    /// check.
+    template <typename T>
+    void PutValue(int32_t index, const T& value) {
+        static_assert(std::is_trivially_copyable_v<T>, "T must be trivially copyable");
+
+        std::memcpy(MutableData() + index, &value, sizeof(T));
+    }
+
+    /// Reads the 8 bytes at `index` as a big-endian unsigned 64-bit integer: on
+    /// little-endian builds the host-order value is passed through EndianSwapValue, on
+    /// big-endian builds it is returned as read.
+    inline uint64_t GetLongBigEndian(int32_t index) const {
+        auto value = GetValue<uint64_t>(index);
+        if constexpr (SystemByteOrder() == ByteOrder::PAIMON_LITTLE_ENDIAN) {
+            return EndianSwapValue(value);
+        }
+        return value;
+    }
+
+    /// Copies `num_bytes` bytes starting at `offset` of this segment into `target`, starting
+    /// at `target_offset`. Only non-negativity of the arguments is asserted (debug builds).
+    void CopyTo(int32_t offset, MemorySegment* target, int32_t target_offset,
+                int32_t num_bytes) const {
+        assert(offset >= 0);
+        assert(target_offset >= 0);
+        assert(num_bytes >= 0);
+
+        std::memcpy(target->MutableData() + target_offset, data_ + offset, num_bytes);
+    }
+
+    /// Same copy as CopyTo but into raw memory at `target + target_offset`, with no checks
+    /// at all.
+    void CopyToUnsafe(int32_t offset, void* target, int32_t target_offset,
+                      int32_t num_bytes) const {
+        std::memcpy(static_cast<char*>(target) + target_offset, data_ + offset, num_bytes);
+    }
+
+    int32_t Compare(const MemorySegment& seg2, int32_t offset1, int32_t offset2, int32_t len) const;
+
+    int32_t Compare(const MemorySegment& seg2, int32_t offset1, int32_t offset2, int32_t len1,
+                    int32_t len2) const;
+
+    bool EqualTo(const MemorySegment& seg2, int32_t offset1, int32_t offset2, int32_t length) const;
+
+    /// Returns the backing Bytes of an owning segment. For a non-owning segment a fresh
+    /// Bytes of size_ bytes is created with `pool` and filled with a copy of the data; the
+    /// copy is not cached, so every call produces a new one. Returns nullptr when the
+    /// segment has no data.
+    std::shared_ptr<Bytes> GetOrCreateHeapMemory(MemoryPool* pool) const {
+        if (heap_memory_) {
+            return heap_memory_;
+        }
+        if (!data_) {
+            return nullptr;
+        }
+        auto copy = std::make_shared<Bytes>(size_, pool);
+        std::memcpy(const_cast<char*>(copy->data()), data_, size_);
+        return copy;
+    }
+
+ private:
+    /// Owning constructor.
+    ///
+    /// The member initializers are null-safe so that a null `heap_memory` reaches the assert
+    /// below instead of being dereferenced first; with asserts disabled a null argument
+    /// yields an empty segment.
+    explicit MemorySegment(const std::shared_ptr<Bytes>& heap_memory)
+        : heap_memory_(heap_memory),
+          data_(heap_memory ? heap_memory->data() : nullptr),
+          size_(heap_memory ? static_cast<int32_t>(heap_memory->size()) : 0) {
+        assert(heap_memory_);
+    }
+
+    /// Non-owning constructor. A null `data` is only accepted together with size 0.
+    MemorySegment(const char* data, int32_t size)
+        : heap_memory_(nullptr), data_(data), size_(size) {
+        assert(data != nullptr || size == 0);
+    }
+
+    std::shared_ptr<Bytes> heap_memory_;  // set only in owning mode
+    const char* data_;                    // start of the bytes, owned or borrowed
+    int32_t size_;                        // number of bytes reachable through data_
+};
+
+/// bool specialization of GetValue: reads the single byte at `index` and reports whether it
+/// is non-zero.
+template <>
+inline bool MemorySegment::GetValue(int32_t index) const {
+    const char stored = Get(index);
+    return stored != 0;
+}
+
+/// bool specialization of PutValue: stores exactly one byte at `index`, 1 for true and 0
+/// for false.
+template <>
+inline void MemorySegment::PutValue(int32_t index, const bool& value) {
+    const char encoded = value ? char{1} : char{0};
+    Put(index, encoded);
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/memory/memory_segment_test.cpp
b/src/paimon/common/memory/memory_segment_test.cpp
new file mode 100644
index 0000000..0501295
--- /dev/null
+++ b/src/paimon/common/memory/memory_segment_test.cpp
@@ -0,0 +1,760 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/memory/memory_segment.h"
+
+#include <climits>
+#include <cstdlib>
+#include <cstring>
+#include <limits>
+#include <memory>
+#include <string>
+
+#include "gtest/gtest.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+// Single-byte Put/Get round trip: first sequentially over the whole segment, then at
+// pseudo-random, non-repeating positions.
+TEST(MemorySegmentTest, TestByteAccess) {
+    auto pool = paimon::GetDefaultPool();
+    int32_t page_size = 64 * 1024;
+    MemorySegment segment = MemorySegment::AllocateHeapMemory(page_size, pool.get());
+
+    // Re-seeding with the same value replays the same std::rand() sequence; that is how the
+    // read-back loops know which values to expect.
+    int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs();
+    std::srand(seed);
+    for (int32_t i = 0; i < page_size; i++) {
+        segment.Put(i, static_cast<char>(std::rand()));
+    }
+    std::srand(seed);
+    for (int32_t i = 0; i < page_size; i++) {
+        ASSERT_EQ(segment.Get(i), static_cast<char>(std::rand()))
+            << "seed: " << seed << ", idx: " << i;
+    }
+
+    // test expected correct behavior, random access
+
+    std::srand(seed);
+    // make_unique<bool[]> value-initializes the flags to false and releases the array even
+    // when a fatal ASSERT_* below returns from the test early.
+    auto occupied = std::make_unique<bool[]>(page_size);
+    for (int32_t i = 0; i < 1000; i++) {
+        int32_t pos = std::rand() % page_size;
+        if (occupied[pos]) {
+            continue;
+        }
+        occupied[pos] = true;
+        segment.Put(pos, static_cast<char>(std::rand()));
+    }
+
+    std::srand(seed);
+    occupied = std::make_unique<bool[]>(page_size);
+
+    for (int32_t i = 0; i < 1000; i++) {
+        int32_t pos = std::rand() % page_size;
+        if (occupied[pos]) {
+            continue;
+        }
+        occupied[pos] = true;
+
+        ASSERT_EQ(segment.Get(pos), static_cast<char>(std::rand()))
+            << "seed: " << seed << ", idx: " << pos;
+    }
+}
+
+// bool PutValue/GetValue round trip at pseudo-random, non-repeating positions.
+TEST(MemorySegmentTest, TestBooleanAccess) {
+    auto pool = paimon::GetDefaultPool();
+    int32_t page_size = 64 * 1024;
+    MemorySegment segment = MemorySegment::AllocateHeapMemory(page_size, pool.get());
+
+    int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs();
+    std::srand(seed);
+    // make_unique<bool[]> value-initializes the flags to false and releases the array even
+    // when a fatal ASSERT_* below returns from the test early.
+    auto occupied = std::make_unique<bool[]>(page_size);
+    for (int32_t i = 0; i < 1000; i++) {
+        int32_t pos = std::rand() % page_size;
+        if (occupied[pos]) {
+            continue;
+        }
+        occupied[pos] = true;
+        segment.PutValue<bool>(pos, static_cast<bool>(std::rand() % 2));
+    }
+
+    // Replay the same random sequence to recompute positions and expected values.
+    std::srand(seed);
+    occupied = std::make_unique<bool[]>(page_size);
+
+    for (int32_t i = 0; i < 1000; i++) {
+        int32_t pos = std::rand() % page_size;
+        if (occupied[pos]) {
+            continue;
+        }
+        occupied[pos] = true;
+
+        ASSERT_EQ(segment.GetValue<bool>(pos), static_cast<bool>(std::rand() % 2))
+            << "seed: " << seed << ", idx: " << pos;
+    }
+}
+
+// EqualTo over a 9-byte window: a difference in the first byte (word phase) and in the
+// ninth byte (tail phase) must both be detected.
+TEST(MemorySegmentTest, TestEqualTo) {
+    auto pool = paimon::GetDefaultPool();
+    const int32_t page_size = 64 * 1024;
+    MemorySegment lhs = MemorySegment::AllocateHeapMemory(page_size, pool.get());
+    MemorySegment rhs = MemorySegment::AllocateHeapMemory(page_size, pool.get());
+
+    // Give both segments identical contents.
+    // NOTE(review): the middle assertion below restores a byte to 0, so it presumably relies
+    // on a freshly constructed Bytes being zero-filled; confirm.
+    Bytes reference_array(page_size, pool.get());
+    lhs.Put(0, reference_array);
+    rhs.Put(0, reference_array);
+
+    // Upper bound keeps start + 8 inside the segment.
+    const int32_t start = paimon::test::RandomNumber(0, (page_size - 8) - 1);
+
+    lhs.Put(start, static_cast<char>(10));
+    ASSERT_FALSE(lhs.EqualTo(rhs, start, start, 9)) << "rand value:" << start;
+
+    lhs.Put(start, static_cast<char>(0));
+    ASSERT_TRUE(lhs.EqualTo(rhs, start, start, 9)) << "rand value:" << start;
+
+    lhs.Put(start + 8, static_cast<char>(10));
+    ASSERT_FALSE(lhs.EqualTo(rhs, start, start, 9)) << "rand value:" << start;
+}
+
+// Compare: sign of the result for differences in the word phase and the tail phase, plus
+// byte-order (big-endian) semantics inside one 8-byte word.
+TEST(MemorySegmentTest, TestCompare) {
+    auto pool = paimon::GetDefaultPool();
+    const int32_t page_size = 64 * 1024;
+    MemorySegment lhs = MemorySegment::AllocateHeapMemory(page_size, pool.get());
+    MemorySegment rhs = MemorySegment::AllocateHeapMemory(page_size, pool.get());
+
+    // Give both segments identical contents.
+    Bytes reference_array(page_size, pool.get());
+    lhs.Put(0, reference_array);
+    rhs.Put(0, reference_array);
+
+    // Upper bound keeps start + 8 inside the segment.
+    const int32_t start = paimon::test::RandomNumber(0, (page_size - 8) - 1);
+
+    lhs.Put(start, static_cast<char>(10));
+    ASSERT_GT(lhs.Compare(rhs, start, start, 9, 9), 0);
+
+    lhs.Put(start, static_cast<char>(0));
+    ASSERT_EQ(lhs.Compare(rhs, start, start, 9, 9), 0);
+
+    // A difference in the ninth byte is invisible to a 7-byte compare but decides a 9-byte
+    // one.
+    rhs.Put(start + 8, static_cast<char>(10));
+    ASSERT_EQ(lhs.Compare(rhs, start, start, 7, 7), 0);
+    ASSERT_LT(lhs.Compare(rhs, start, start, 9, 9), 0);
+
+    // Verify big-endian byte-order comparison semantics within a single 8-byte block.
+    // On little-endian machines, naive native-endian uint64 comparison would give wrong
+    // results.
+    MemorySegment first = MemorySegment::AllocateHeapMemory(16, pool.get());
+    MemorySegment second = MemorySegment::AllocateHeapMemory(16, pool.get());
+    Bytes zeros(16, pool.get());
+    first.Put(0, zeros);
+    second.Put(0, zeros);
+
+    // first:  [0x00, 0x01, 0, 0, 0, 0, 0, 0] at offset 0
+    // second: [0x01, 0x00, 0, 0, 0, 0, 0, 0] at offset 0
+    // Lexicographic (byte-order) comparison: first byte 0x00 < 0x01, so first < second.
+    first.Put(1, static_cast<char>(0x01));
+    second.Put(0, static_cast<char>(0x01));
+    ASSERT_LT(first.Compare(second, 0, 0, 8), 0);
+    ASSERT_GT(second.Compare(first, 0, 0, 8), 0);
+
+    // first:  [0x01, 0x02, 0, 0, 0, 0, 0, 0]
+    // second: [0x01, 0x01, 0, 0, 0, 0, 0, 0]
+    // First bytes equal (0x01 == 0x01), second byte 0x02 > 0x01, so first > second.
+    first.Put(0, static_cast<char>(0x01));
+    first.Put(1, static_cast<char>(0x02));
+    second.Put(0, static_cast<char>(0x01));
+    second.Put(1, static_cast<char>(0x01));
+    ASSERT_GT(first.Compare(second, 0, 0, 8), 0);
+    ASSERT_LT(second.Compare(first, 0, 0, 8), 0);
+}
+
+// char16_t PutValue/GetValue round trip: sequentially at every even offset, then at
+// pseudo-random non-overlapping 2-byte positions.
+TEST(MemorySegmentTest, TestCharAccess) {
+    auto pool = paimon::GetDefaultPool();
+    int32_t page_size = 64 * 1024;
+    MemorySegment segment = MemorySegment::AllocateHeapMemory(page_size, pool.get());
+
+    // Claims [pos, pos + width) in `used`; returns false and claims nothing when any byte of
+    // the range is already claimed, so random writes never overlap.
+    auto claim = [](bool* used, int32_t pos, int32_t width) {
+        for (int32_t k = 0; k < width; k++) {
+            if (used[pos + k]) {
+                return false;
+            }
+        }
+        for (int32_t k = 0; k < width; k++) {
+            used[pos + k] = true;
+        }
+        return true;
+    };
+
+    // Re-seeding with the same value replays the same std::rand() sequence for read-back.
+    int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs();
+    std::srand(seed);
+
+    for (int32_t i = 0; i <= page_size - 2; i += 2) {
+        segment.PutValue<char16_t>(i, static_cast<char>(std::rand() % (CHAR_MAX)));
+    }
+
+    std::srand(seed);
+    for (int32_t i = 0; i <= page_size - 2; i += 2) {
+        ASSERT_EQ(segment.GetValue<char16_t>(i), static_cast<char>(std::rand() % (CHAR_MAX)))
+            << "seed: " << seed << ", idx: " << i;
+    }
+
+    // test expected correct behavior, random access
+
+    std::srand(seed);
+    // make_unique<bool[]> value-initializes the flags to false and releases the array even
+    // when a fatal ASSERT_* below returns from the test early.
+    auto occupied = std::make_unique<bool[]>(page_size);
+    for (int32_t i = 0; i < 1000; i++) {
+        int32_t pos = std::rand() % (page_size - 1);
+        if (!claim(occupied.get(), pos, 2)) {
+            continue;
+        }
+        segment.PutValue<char16_t>(pos, static_cast<char>(std::rand() % (CHAR_MAX)));
+    }
+
+    std::srand(seed);
+    occupied = std::make_unique<bool[]>(page_size);
+
+    for (int32_t i = 0; i < 1000; i++) {
+        int32_t pos = std::rand() % (page_size - 1);
+        if (!claim(occupied.get(), pos, 2)) {
+            continue;
+        }
+
+        ASSERT_EQ(segment.GetValue<char16_t>(pos), static_cast<char>(std::rand() % (CHAR_MAX)))
+            << "seed: " << seed << ", idx:" << pos;
+    }
+}
+
+// int16_t PutValue/GetValue round trip: sequentially at every even offset, then at
+// pseudo-random non-overlapping 2-byte positions.
+TEST(MemorySegmentTest, TestShortAccess) {
+    auto pool = paimon::GetDefaultPool();
+    int32_t page_size = 64 * 1024;
+    MemorySegment segment = MemorySegment::AllocateHeapMemory(page_size, pool.get());
+
+    // Claims [pos, pos + width) in `used`; returns false and claims nothing when any byte of
+    // the range is already claimed, so random writes never overlap.
+    auto claim = [](bool* used, int32_t pos, int32_t width) {
+        for (int32_t k = 0; k < width; k++) {
+            if (used[pos + k]) {
+                return false;
+            }
+        }
+        for (int32_t k = 0; k < width; k++) {
+            used[pos + k] = true;
+        }
+        return true;
+    };
+
+    // Re-seeding with the same value replays the same std::rand() sequence for read-back.
+    int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs();
+    std::srand(seed);
+
+    for (int32_t i = 0; i <= page_size - 2; i += 2) {
+        segment.PutValue<int16_t>(i, static_cast<int16_t>(std::rand()));
+    }
+
+    std::srand(seed);
+    for (int32_t i = 0; i <= page_size - 2; i += 2) {
+        ASSERT_EQ(segment.GetValue<int16_t>(i), static_cast<int16_t>(std::rand()))
+            << "seed: " << seed << ", idx:" << i;
+    }
+
+    // test expected correct behavior, random access
+
+    std::srand(seed);
+    // make_unique<bool[]> value-initializes the flags to false and releases the array even
+    // when a fatal ASSERT_* below returns from the test early.
+    auto occupied = std::make_unique<bool[]>(page_size);
+    for (int32_t i = 0; i < 1000; i++) {
+        int32_t pos = std::rand() % (page_size - 1);
+        if (!claim(occupied.get(), pos, 2)) {
+            continue;
+        }
+        segment.PutValue<int16_t>(pos, static_cast<int16_t>(std::rand()));
+    }
+
+    std::srand(seed);
+    occupied = std::make_unique<bool[]>(page_size);
+
+    for (int32_t i = 0; i < 1000; i++) {
+        int32_t pos = std::rand() % (page_size - 1);
+        if (!claim(occupied.get(), pos, 2)) {
+            continue;
+        }
+
+        ASSERT_EQ(segment.GetValue<int16_t>(pos), static_cast<int16_t>(std::rand()))
+            << "seed: " << seed << ", idx:" << pos;
+    }
+}
+
+// int32_t PutValue/GetValue round trip: sequentially at every 4-byte offset, then at
+// pseudo-random non-overlapping 4-byte positions.
+TEST(MemorySegmentTest, TestIntAccess) {
+    auto pool = paimon::GetDefaultPool();
+    int32_t page_size = 64 * 1024;
+    MemorySegment segment = MemorySegment::AllocateHeapMemory(page_size, pool.get());
+
+    // Claims [pos, pos + width) in `used`; returns false and claims nothing when any byte of
+    // the range is already claimed, so random writes never overlap.
+    auto claim = [](bool* used, int32_t pos, int32_t width) {
+        for (int32_t k = 0; k < width; k++) {
+            if (used[pos + k]) {
+                return false;
+            }
+        }
+        for (int32_t k = 0; k < width; k++) {
+            used[pos + k] = true;
+        }
+        return true;
+    };
+
+    // Re-seeding with the same value replays the same std::rand() sequence for read-back.
+    int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs();
+    std::srand(seed);
+
+    for (int32_t i = 0; i <= page_size - 4; i += 4) {
+        segment.PutValue<int32_t>(i, static_cast<int32_t>(std::rand()));
+    }
+
+    std::srand(seed);
+    for (int32_t i = 0; i <= page_size - 4; i += 4) {
+        ASSERT_EQ(segment.GetValue<int32_t>(i), static_cast<int32_t>(std::rand()))
+            << "seed: " << seed << ", idx:" << i;
+    }
+
+    // test expected correct behavior, random access
+
+    std::srand(seed);
+    // make_unique<bool[]> value-initializes the flags to false and releases the array even
+    // when a fatal ASSERT_* below returns from the test early.
+    auto occupied = std::make_unique<bool[]>(page_size);
+    for (int32_t i = 0; i < 1000; i++) {
+        int32_t pos = std::rand() % (page_size - 3);
+        if (!claim(occupied.get(), pos, 4)) {
+            continue;
+        }
+        segment.PutValue<int32_t>(pos, static_cast<int32_t>(std::rand()));
+    }
+
+    std::srand(seed);
+    occupied = std::make_unique<bool[]>(page_size);
+
+    for (int32_t i = 0; i < 1000; i++) {
+        int32_t pos = std::rand() % (page_size - 3);
+        if (!claim(occupied.get(), pos, 4)) {
+            continue;
+        }
+
+        ASSERT_EQ(segment.GetValue<int32_t>(pos), static_cast<int32_t>(std::rand()))
+            << "seed: " << seed << ", idx:" << pos;
+    }
+}
+
+// int64_t PutValue/GetValue round trip: sequentially at every 8-byte offset, then at
+// pseudo-random non-overlapping 8-byte positions.
+TEST(MemorySegmentTest, TestLongAccess) {
+    auto pool = paimon::GetDefaultPool();
+    // Builds a 64-bit value from two std::rand() draws. The draws are taken in separate
+    // statements so their order is fixed rather than left to operand evaluation order.
+    auto lrand = []() -> int64_t {
+        const int64_t high = std::rand();
+        const int64_t low = std::rand();
+        return (high << (sizeof(int32_t) * 8)) | low;
+    };
+    // Claims [pos, pos + width) in `used`; returns false and claims nothing when any byte of
+    // the range is already claimed, so random writes never overlap.
+    auto claim = [](bool* used, int32_t pos, int32_t width) {
+        for (int32_t k = 0; k < width; k++) {
+            if (used[pos + k]) {
+                return false;
+            }
+        }
+        for (int32_t k = 0; k < width; k++) {
+            used[pos + k] = true;
+        }
+        return true;
+    };
+    int32_t page_size = 64 * 1024;
+    MemorySegment segment = MemorySegment::AllocateHeapMemory(page_size, pool.get());
+
+    // Re-seeding with the same value replays the same std::rand() sequence for read-back.
+    int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs();
+    std::srand(seed);
+
+    for (int32_t i = 0; i <= page_size - 8; i += 8) {
+        segment.PutValue<int64_t>(i, lrand());
+    }
+
+    std::srand(seed);
+    for (int32_t i = 0; i <= page_size - 8; i += 8) {
+        ASSERT_EQ(segment.GetValue<int64_t>(i), lrand()) << "seed: " << seed << ", idx:" << i;
+    }
+
+    // test expected correct behavior, random access
+
+    std::srand(seed);
+    // make_unique<bool[]> value-initializes the flags to false and releases the array even
+    // when a fatal ASSERT_* below returns from the test early.
+    auto occupied = std::make_unique<bool[]>(page_size);
+    for (int32_t i = 0; i < 1000; i++) {
+        int32_t pos = std::rand() % (page_size - 7);
+        if (!claim(occupied.get(), pos, 8)) {
+            continue;
+        }
+        segment.PutValue<int64_t>(pos, lrand());
+    }
+
+    std::srand(seed);
+    occupied = std::make_unique<bool[]>(page_size);
+
+    for (int32_t i = 0; i < 1000; i++) {
+        int32_t pos = std::rand() % (page_size - 7);
+        if (!claim(occupied.get(), pos, 8)) {
+            continue;
+        }
+
+        ASSERT_EQ(segment.GetValue<int64_t>(pos), lrand()) << "seed: " << seed << ", idx:" << pos;
+    }
+}
+
+// float PutValue/GetValue round trip: sequentially at every 4-byte offset, then at
+// pseudo-random non-overlapping 4-byte positions.
+TEST(MemorySegmentTest, TestFloatAccess) {
+    auto pool = paimon::GetDefaultPool();
+    // Pseudo-random float in [0, 1]. The return type must be float: returning an integer
+    // type would truncate almost every sample to 0 and leave the test with nothing but
+    // zeros to store.
+    auto frand = []() -> float {
+        return (static_cast<float>(std::rand()) / static_cast<float>(RAND_MAX));
+    };
+    // Claims [pos, pos + width) in `used`; returns false and claims nothing when any byte of
+    // the range is already claimed, so random writes never overlap.
+    auto claim = [](bool* used, int32_t pos, int32_t width) {
+        for (int32_t k = 0; k < width; k++) {
+            if (used[pos + k]) {
+                return false;
+            }
+        }
+        for (int32_t k = 0; k < width; k++) {
+            used[pos + k] = true;
+        }
+        return true;
+    };
+
+    int32_t page_size = 64 * 1024;
+    MemorySegment segment = MemorySegment::AllocateHeapMemory(page_size, pool.get());
+
+    // Re-seeding with the same value replays the same std::rand() sequence for read-back.
+    int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs();
+    std::srand(seed);
+
+    for (int32_t i = 0; i <= page_size - 4; i += 4) {
+        segment.PutValue<float>(i, frand());
+    }
+
+    std::srand(seed);
+    for (int32_t i = 0; i <= page_size - 4; i += 4) {
+        ASSERT_EQ(segment.GetValue<float>(i), frand()) << "seed: " << seed << ", idx:" << i;
+    }
+
+    // test expected correct behavior, random access
+
+    std::srand(seed);
+    // make_unique<bool[]> value-initializes the flags to false and releases the array even
+    // when a fatal ASSERT_* below returns from the test early.
+    auto occupied = std::make_unique<bool[]>(page_size);
+    for (int32_t i = 0; i < 1000; i++) {
+        int32_t pos = std::rand() % (page_size - 3);
+        if (!claim(occupied.get(), pos, 4)) {
+            continue;
+        }
+        segment.PutValue<float>(pos, frand());
+    }
+
+    std::srand(seed);
+    occupied = std::make_unique<bool[]>(page_size);
+
+    for (int32_t i = 0; i < 1000; i++) {
+        int32_t pos = std::rand() % (page_size - 3);
+        if (!claim(occupied.get(), pos, 4)) {
+            continue;
+        }
+
+        ASSERT_EQ(segment.GetValue<float>(pos), frand()) << "seed: " << seed << ", idx:" << pos;
+    }
+}
+
+// double PutValue/GetValue round trip: sequentially at every 8-byte offset, then at
+// pseudo-random non-overlapping 8-byte positions.
+TEST(MemorySegmentTest, TestDoubleAccess) {
+    auto pool = paimon::GetDefaultPool();
+    // Builds a 64-bit value from two std::rand() draws. The draws are taken in separate
+    // statements so their order is fixed rather than left to operand evaluation order.
+    auto lrand = []() -> int64_t {
+        const int64_t high = std::rand();
+        const int64_t low = std::rand();
+        return (high << (sizeof(int32_t) * 8)) | low;
+    };
+    // Pseudo-random double in [0, 1]. The return type must be double: returning an integer
+    // type would truncate almost every sample to 0 and leave the test with nothing but
+    // zeros to store.
+    auto drand = [&]() -> double {
+        return (static_cast<double>(lrand()) /
+                static_cast<double>(std::numeric_limits<int64_t>::max()));
+    };
+    // Claims [pos, pos + width) in `used`; returns false and claims nothing when any byte of
+    // the range is already claimed, so random writes never overlap.
+    auto claim = [](bool* used, int32_t pos, int32_t width) {
+        for (int32_t k = 0; k < width; k++) {
+            if (used[pos + k]) {
+                return false;
+            }
+        }
+        for (int32_t k = 0; k < width; k++) {
+            used[pos + k] = true;
+        }
+        return true;
+    };
+
+    int32_t page_size = 64 * 1024;
+    MemorySegment segment = MemorySegment::AllocateHeapMemory(page_size, pool.get());
+
+    // Re-seeding with the same value replays the same std::rand() sequence for read-back.
+    int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs();
+    std::srand(seed);
+
+    for (int32_t i = 0; i <= page_size - 8; i += 8) {
+        segment.PutValue<double>(i, drand());
+    }
+
+    std::srand(seed);
+    for (int32_t i = 0; i <= page_size - 8; i += 8) {
+        ASSERT_EQ(segment.GetValue<double>(i), drand()) << "seed: " << seed << ", idx:" << i;
+    }
+
+    // test expected correct behavior, random access
+
+    std::srand(seed);
+    // make_unique<bool[]> value-initializes the flags to false and releases the array even
+    // when a fatal ASSERT_* below returns from the test early.
+    auto occupied = std::make_unique<bool[]>(page_size);
+    for (int32_t i = 0; i < 1000; i++) {
+        int32_t pos = std::rand() % (page_size - 7);
+        if (!claim(occupied.get(), pos, 8)) {
+            continue;
+        }
+        segment.PutValue<double>(pos, drand());
+    }
+
+    std::srand(seed);
+    occupied = std::make_unique<bool[]>(page_size);
+
+    for (int32_t i = 0; i < 1000; i++) {
+        int32_t pos = std::rand() % (page_size - 7);
+        if (!claim(occupied.get(), pos, 8)) {
+            continue;
+        }
+
+        ASSERT_EQ(segment.GetValue<double>(pos), drand()) << "seed: " << seed << ", idx:" << pos;
+    }
+}
+
+// ------------------------------------------------------------------------
+// Bulk Byte Movements
+// ------------------------------------------------------------------------
+
+// Bulk Put/Get of Bytes buffers: whole-buffer overloads, explicit offset/length overloads,
+// and randomized lengths and positions in both directions.
+TEST(MemorySegmentTest, TestBulkByteAccess) {
+    auto pool = paimon::GetDefaultPool();
+    // Allocates `size` bytes and fills each one from std::rand(), so re-seeding reproduces
+    // the same buffer.
+    auto make_random_bytes = [&](int32_t size) -> std::shared_ptr<Bytes> {
+        auto buffer = Bytes::AllocateBytes(size, pool.get());
+        const int32_t count = static_cast<int32_t>(buffer->size());
+        for (int32_t k = 0; k < count; k++) {
+            (*buffer)[k] = static_cast<char>(std::rand());
+        }
+        return buffer;
+    };
+
+    // test expected correct behavior with default offset / length
+    {
+        const int32_t page_size = 64 * 1024;
+        const int32_t chunk = page_size / 8;
+        MemorySegment segment = MemorySegment::AllocateHeapMemory(page_size, pool.get());
+
+        const int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs();
+        std::srand(seed);
+        for (int32_t i = 0; i < 8; i++) {
+            auto src = make_random_bytes(chunk);
+            segment.Put(i * chunk, *src);
+        }
+
+        // Replay the random sequence to regenerate each chunk for comparison.
+        std::srand(seed);
+        for (int32_t i = 0; i < 8; i++) {
+            std::shared_ptr<Bytes> expected = make_random_bytes(chunk);
+            std::shared_ptr<Bytes> actual = Bytes::AllocateBytes(chunk, pool.get());
+            segment.Get(i * chunk, actual.get());
+            ASSERT_EQ(expected->size(), actual->size()) << "seed: " << seed << ", i:" << i;
+            ASSERT_EQ(std::memcmp(expected->data(), actual->data(), expected->size()), 0)
+                << "seed: " << seed << ", i:" << i;
+        }
+    }
+
+    // test expected correct behavior with specific offset / length
+    {
+        const int32_t page_size = 64 * 1024;
+        const int32_t slice = page_size / 16;
+        MemorySegment segment = MemorySegment::AllocateHeapMemory(page_size, pool.get());
+
+        const int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs();
+        std::srand(seed);
+        std::shared_ptr<Bytes> expected = make_random_bytes(page_size);
+
+        // Write the buffer slice by slice, then read it back slice by slice.
+        for (int32_t i = 0; i < 16; i++) {
+            segment.Put(i * slice, *expected, i * slice, slice);
+        }
+        auto actual = Bytes::AllocateBytes(page_size, pool.get());
+        for (int32_t i = 0; i < 16; i++) {
+            segment.Get(i * slice, actual.get(), i * slice, slice);
+        }
+        ASSERT_EQ(expected->size(), actual->size()) << "seed: " << seed;
+        ASSERT_EQ(std::memcmp(expected->data(), actual->data(), expected->size()), 0)
+            << "seed: " << seed;
+    }
+
+    // put segments of various lengths to various positions
+    {
+        const int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs();
+        std::srand(seed);
+        const int32_t page_size = 64 * 1024;
+        MemorySegment segment = MemorySegment::AllocateHeapMemory(page_size, pool.get());
+        // `expected` mirrors the segment: start both from the same contents, then apply
+        // every write to both.
+        auto expected = Bytes::AllocateBytes(page_size, pool.get());
+        segment.Put(0, *expected, 0, page_size);
+        for (int32_t i = 0; i < 200; i++) {
+            const int32_t num_bytes = std::rand() % (page_size - 10) + 1;
+            const int32_t pos = std::rand() % (page_size - num_bytes + 1);
+            // Source buffer is 1x to 3x the copy length so a non-zero start offset is possible.
+            const int32_t multiplier = std::rand() % 3 + 1;
+            std::shared_ptr<Bytes> data = make_random_bytes(multiplier * num_bytes);
+            const int32_t data_start_pos = std::rand() % (data->size() - num_bytes + 1);
+
+            // copy to the expected
+            std::memcpy(expected->data() + pos, data->data() + data_start_pos, num_bytes);
+
+            // put to the memory segment
+            segment.Put(pos, *data, data_start_pos, num_bytes);
+        }
+
+        auto validation = Bytes::AllocateBytes(page_size, pool.get());
+        segment.Get(0, validation.get());
+        ASSERT_EQ(std::memcmp(validation->data(), expected->data(), expected->size()), 0)
+            << "seed: " << seed;
+    }
+
+    // get segments with various contents
+    {
+        const int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs();
+        std::srand(seed);
+        const int32_t page_size = 64 * 1024;
+        MemorySegment segment = MemorySegment::AllocateHeapMemory(page_size, pool.get());
+        std::shared_ptr<Bytes> contents = make_random_bytes(page_size);
+        segment.Put(0, *contents);
+
+        for (int32_t i = 0; i < 200; i++) {
+            const int32_t num_bytes = std::rand() % (page_size / 8) + 1;
+            const int32_t pos = std::rand() % (page_size - num_bytes + 1);
+            // Destination buffer is 1x to 3x the copy length so a non-zero start offset is
+            // possible.
+            const int32_t multiplier = std::rand() % 3 + 1;
+            std::shared_ptr<Bytes> data = make_random_bytes(multiplier * num_bytes);
+            const int32_t data_start_pos = std::rand() % (data->size() - num_bytes + 1);
+
+            segment.Get(pos, data.get(), data_start_pos, num_bytes);
+
+            // Compare the window that Get wrote against the same window of the source.
+            Bytes expected(num_bytes, pool.get());
+            std::memcpy(expected.data(), contents->data() + pos, num_bytes);
+            Bytes validation(num_bytes, pool.get());
+            std::memcpy(validation.data(), data->data() + data_start_pos, num_bytes);
+            ASSERT_EQ(std::memcmp(validation.data(), expected.data(), expected.size()), 0)
+                << "seed: " << seed << ", i:" << i;
+        }
+    }
+}
+
+// operator== on MemorySegment: a segment equals itself and any segment wrapping the same
+// byte content, and differs (in both argument orders) from one whose content differs.
+TEST(MemorySegmentTest, TestEqual) {
+    auto pool = paimon::GetDefaultPool();
+    // Builds an owning segment around a fresh Bytes copy of `text`.
+    auto wrap_text = [&pool](const std::string& text) {
+        return MemorySegment::Wrap(std::make_shared<Bytes>(text, pool.get()));
+    };
+    auto abcd = wrap_text("abcd");
+    auto abce = wrap_text("abce");
+    auto abcd_again = wrap_text("abcd");
+
+    ASSERT_EQ(abcd, abcd);
+    ASSERT_EQ(abcd, abcd_again);
+    ASSERT_FALSE(abcd == abce);
+    ASSERT_FALSE(abce == abcd);
+}
+
+// Exercises the MemorySegment API on a non-owning segment created with WrapView.
+// The steps are order dependent: several of them mutate the shared buffer and later
+// assertions rely on the state left behind by earlier ones.
+TEST(MemorySegmentTest, TestNonOwningWrapView) {
+ // Prepare owning data as source
+ auto pool = paimon::GetDefaultPool();
+ std::string source_data = "Hello, WrapView MemorySegment!";
+ auto owning_bytes = std::make_shared<Bytes>(source_data, pool.get());
+ const char* raw_ptr = owning_bytes->data();
+ auto raw_size = static_cast<int32_t>(owning_bytes->size());
+
+ // Create non-owning segment via WrapView
+ // (owning_bytes stays alive for the whole test, so raw_ptr remains valid).
+ auto seg = MemorySegment::WrapView(raw_ptr, raw_size);
+
+ // --- Data() / Size() ---
+ // The view must expose exactly the pointer and length it was given.
+ ASSERT_EQ(seg.Data(), raw_ptr);
+ ASSERT_EQ(seg.Size(), raw_size);
+
+ // --- Get(index) ---
+ for (int32_t i = 0; i < raw_size; ++i) {
+ ASSERT_EQ(seg.Get(i), source_data[i]);
+ }
+
+ // --- GetValue<T> ---
+ // Expected values are built with memcpy from the same bytes, so the comparison
+ // does not depend on the host byte order.
+ // Read first 4 bytes as int32
+ int32_t expected_int;
+ std::memcpy(&expected_int, raw_ptr, sizeof(int32_t));
+ ASSERT_EQ(seg.GetValue<int32_t>(0), expected_int);
+
+ // Read first 8 bytes as int64
+ int64_t expected_long;
+ std::memcpy(&expected_long, raw_ptr, sizeof(int64_t));
+ ASSERT_EQ(seg.GetValue<int64_t>(0), expected_long);
+
+ // --- MutableData() + Put(index, char) ---
+ char original_char = seg.Get(0);
+ seg.Put(0, 'X');
+ ASSERT_EQ(seg.Get(0), 'X');
+ seg.Put(0, original_char); // restore
+ ASSERT_EQ(seg.Get(0), original_char);
+
+ // --- Put(index, src, offset, length) ---
+ // From here on the first bytes of the buffer no longer match source_data.
+ std::string patch = "AB";
+ seg.Put(0, patch, 0, 2);
+ ASSERT_EQ(seg.Get(0), 'A');
+ ASSERT_EQ(seg.Get(1), 'B');
+
+ // --- PutValue<T> ---
+ // Overwrites the two bytes written by the patch above.
+ int16_t val16 = 0x1234;
+ seg.PutValue<int16_t>(0, val16);
+ ASSERT_EQ(seg.GetValue<int16_t>(0), val16);
+
+ // --- Compare ---
+ auto seg2 = MemorySegment::WrapView(raw_ptr, raw_size);
+ // Both point to same data (we mutated seg's underlying data, seg2 sees it
+ // too)
+ ASSERT_EQ(seg.Compare(seg2, 0, 0, raw_size), 0);
+
+ // --- EqualTo ---
+ ASSERT_TRUE(seg.EqualTo(seg2, 2, 2, raw_size - 2));
+
+ // --- CopyTo owning target ---
+ auto target = MemorySegment::AllocateHeapMemory(raw_size, pool.get());
+ seg.CopyTo(0, &target, 0, raw_size);
+ ASSERT_EQ(seg.Compare(target, 0, 0, raw_size), 0);
+
+ // --- CopyToUnsafe ---
+ std::vector<char> buf(raw_size);
+ seg.CopyToUnsafe(0, buf.data(), 0, raw_size);
+ ASSERT_EQ(std::memcmp(buf.data(), seg.Data(), raw_size), 0);
+
+ // --- GetOrCreateHeapMemory on non-owning: should copy ---
+ auto heap = seg.GetOrCreateHeapMemory(pool.get());
+ ASSERT_NE(heap, nullptr);
+ ASSERT_EQ(static_cast<int32_t>(heap->size()), raw_size);
+ // Returned copy should be independent: modifying seg shouldn't affect heap
+ // (byte 0 currently holds one byte of 0x1234, which is not 'Z' in either byte order).
+ seg.Put(0, 'Z');
+ ASSERT_NE((*heap)[0], 'Z');
+
+ // --- operator== ---
+ // seg3 views the same memory as seg, so the two must compare equal.
+ auto seg3 = MemorySegment::WrapView(seg.Data(), seg.Size());
+ ASSERT_EQ(seg, seg3);
+}
+} // namespace paimon::test
diff --git a/src/paimon/common/memory/memory_segment_utils.cpp
b/src/paimon/common/memory/memory_segment_utils.cpp
new file mode 100644
index 0000000..2801854
--- /dev/null
+++ b/src/paimon/common/memory/memory_segment_utils.cpp
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/memory/memory_segment_utils.h"
+
+#include <cassert>
+
+#include "paimon/common/utils/murmurhash_utils.h"
+
+namespace paimon {
+/// Forwards to Bytes::AllocateBytes and hands the buffer back as a shared_ptr.
+std::shared_ptr<Bytes> MemorySegmentUtils::AllocateBytes(int32_t length, MemoryPool* pool) {
+    std::shared_ptr<Bytes> allocated = Bytes::AllocateBytes(length, pool);
+    return allocated;
+}
+
+/// Copies `num_bytes` bytes of `bytes` (starting at `bytes_offset`) into `segments`
+/// starting at logical `offset`. A vector holding exactly one segment is written
+/// directly; every other case goes through the multi-segment path.
+void MemorySegmentUtils::CopyFromBytes(std::vector<MemorySegment>* segments, int32_t offset,
+                                       const Bytes& bytes, int32_t bytes_offset,
+                                       int32_t num_bytes) {
+    if (segments->size() != 1) {
+        CopyMultiSegmentsFromBytes(segments, offset, bytes, bytes_offset, num_bytes);
+        return;
+    }
+    MemorySegment& only_segment = segments->front();
+    only_segment.Put(offset, bytes, bytes_offset, num_bytes);
+}
+
+/// Writes `num_bytes` bytes of `bytes` into consecutive segments, starting at the
+/// logical `offset` measured from the beginning of the first segment.
+/// Segments that lie entirely before `offset` are skipped. If the segments run out
+/// before everything is written, the function returns without signalling it.
+void MemorySegmentUtils::CopyMultiSegmentsFromBytes(std::vector<MemorySegment>* segments,
+                                                    int32_t offset, const Bytes& bytes,
+                                                    int32_t bytes_offset, int32_t num_bytes) {
+    int32_t copied = 0;
+    for (auto& segment : *segments) {
+        const int32_t available = segment.Size() - offset;
+        if (available <= 0) {
+            // Start position is past this segment: carry the rest of the offset over.
+            offset = -available;
+            continue;
+        }
+        const int32_t chunk = std::min(available, num_bytes - copied);
+        segment.Put(offset, bytes, copied + bytes_offset, chunk);
+        copied += chunk;
+        // Every following segment is written from its beginning.
+        offset = 0;
+        if (copied == num_bytes) {
+            return;
+        }
+    }
+}
+
+/// Allocates `num_bytes` bytes from `pool` and fills them with the content of
+/// `segments` starting at logical `offset`. `pool` must not be null (asserted).
+PAIMON_UNIQUE_PTR<Bytes> MemorySegmentUtils::CopyToBytes(const std::vector<MemorySegment>& segments,
+                                                         int32_t offset, int32_t num_bytes,
+                                                         MemoryPool* pool) {
+    assert(pool);
+    auto result = Bytes::AllocateBytes(num_bytes, pool);
+    Bytes* destination = result.get();
+    CopyToBytes(segments, offset, destination, 0, num_bytes);
+    return result;
+}
+
+/// Copies `num_bytes` bytes starting at logical `offset` of `segments` into the raw
+/// memory at `target`, using the first segment alone when the range fits in it.
+void MemorySegmentUtils::CopyToUnsafe(const std::vector<MemorySegment>& segments, int32_t offset,
+                                      void* target, int32_t num_bytes) {
+    if (!InFirstSegment(segments, offset, num_bytes)) {
+        CopyMultiSegmentsToUnsafe(segments, offset, target, num_bytes);
+        return;
+    }
+    const MemorySegment& first = segments[0];
+    first.CopyToUnsafe(offset, target, 0, num_bytes);
+}
+
+/// Copies `num_bytes` bytes that may span several segments into the raw memory at
+/// `target`. `offset` is measured from the beginning of the first segment; segments
+/// that end before it are skipped. Returns silently if the segments run out first.
+void MemorySegmentUtils::CopyMultiSegmentsToUnsafe(const std::vector<MemorySegment>& segments,
+                                                   int32_t offset, void* target,
+                                                   int32_t num_bytes) {
+    int32_t copied = 0;
+    for (const auto& segment : segments) {
+        const int32_t available = segment.Size() - offset;
+        if (available <= 0) {
+            // Not there yet: reduce the offset by this segment's size and move on.
+            offset = -available;
+            continue;
+        }
+        const int32_t chunk = std::min(available, num_bytes - copied);
+        segment.CopyToUnsafe(offset, target, copied, chunk);
+        copied += chunk;
+        // Subsequent segments are read from their first byte.
+        offset = 0;
+        if (copied == num_bytes) {
+            return;
+        }
+    }
+}
+
+/// Returns `size_in_bytes` bytes of `segments` starting at `base_offset`.
+///
+/// When `segments` holds exactly one segment, `base_offset` is 0 and that segment's
+/// heap memory has exactly `size_in_bytes` bytes, the heap memory itself is returned
+/// and no new buffer is allocated here. In every other case a new buffer is allocated
+/// from `pool` and filled by copying.
+///
+/// @param segments source segments.
+/// @param base_offset logical start offset inside `segments`.
+/// @param size_in_bytes number of bytes to return.
+/// @param pool pool used for any allocation.
+std::shared_ptr<Bytes> MemorySegmentUtils::GetBytes(const std::vector<MemorySegment>& segments,
+                                                    int32_t base_offset, int32_t size_in_bytes,
+                                                    MemoryPool* pool) {
+    if (segments.size() != 1) {
+        std::shared_ptr<Bytes> bytes = Bytes::AllocateBytes(size_in_bytes, pool);
+        CopyMultiSegmentsToBytes(segments, base_offset, bytes.get(), 0, size_in_bytes);
+        return bytes;
+    }
+
+    const MemorySegment& segment = segments[0];
+    // Only ask for the heap memory when it can actually be handed out: the result is
+    // unusable for a non-zero base offset, so requesting it there is wasted work.
+    if (base_offset == 0) {
+        std::shared_ptr<Bytes> heap_memory = segment.GetOrCreateHeapMemory(pool);
+        if (heap_memory != nullptr &&
+            static_cast<int32_t>(heap_memory->size()) == size_in_bytes) {
+            return heap_memory;
+        }
+    }
+
+    std::shared_ptr<Bytes> bytes = Bytes::AllocateBytes(size_in_bytes, pool);
+    segment.Get(base_offset, bytes.get(), 0, size_in_bytes);
+    return bytes;
+}
+
+/// Tells whether the range [offset, offset + num_bytes) ends inside the first segment.
+/// `segments` must not be empty.
+///
+/// The end position is computed in 64 bits so that a large `offset` plus `num_bytes`
+/// cannot overflow `int32_t` (signed overflow is undefined behaviour).
+bool MemorySegmentUtils::InFirstSegment(const std::vector<MemorySegment>& segments, int32_t offset,
+                                        int32_t num_bytes) {
+    const int64_t end = static_cast<int64_t>(offset) + static_cast<int64_t>(num_bytes);
+    return end <= static_cast<int64_t>(segments[0].Size());
+}
+
+/// Maps a bit index to the index of the byte containing it (logical shift right by
+/// ADDRESS_BITS_PER_WORD, performed on the unsigned representation).
+int32_t MemorySegmentUtils::ByteIndex(int32_t bit_index) {
+    const auto unsigned_index = static_cast<uint32_t>(bit_index);
+    return unsigned_index >> ADDRESS_BITS_PER_WORD;
+}
+
+/// Clears bit `index` (counted from byte `base_offset`) inside `segment`.
+void MemorySegmentUtils::BitUnSet(MemorySegment* segment, int32_t base_offset, int32_t index) {
+    const int32_t byte_pos = base_offset + ByteIndex(index);
+    const int32_t bit_mask = 1 << (index & BIT_BYTE_INDEX_MASK);
+    char updated = segment->Get(byte_pos);
+    updated &= ~bit_mask;
+    segment->Put(byte_pos, updated);
+}
+
+/// Sets bit `index` (counted from byte `base_offset`) inside `segment`.
+void MemorySegmentUtils::BitSet(MemorySegment* segment, int32_t base_offset, int32_t index) {
+    const int32_t byte_pos = base_offset + ByteIndex(index);
+    const int32_t bit_mask = 1 << (index & BIT_BYTE_INDEX_MASK);
+    char updated = segment->Get(byte_pos);
+    updated |= bit_mask;
+    segment->Put(byte_pos, updated);
+}
+
+/// Reads bit `index` (counted from byte `base_offset`) of `segment`.
+bool MemorySegmentUtils::BitGet(const MemorySegment& segment, int32_t base_offset, int32_t index) {
+    const int32_t byte_pos = base_offset + ByteIndex(index);
+    const int32_t bit_mask = 1 << (index & BIT_BYTE_INDEX_MASK);
+    const char stored = segment.Get(byte_pos);
+    return (stored & bit_mask) != 0;
+}
+
+/// Sets bit `index` (counted from byte `base_offset`) in `segments`.
+/// A vector with exactly one segment is handled by the single-segment overload.
+void MemorySegmentUtils::BitSet(std::vector<MemorySegment>* segments, int32_t base_offset,
+                                int32_t index) {
+    if (segments->size() != 1) {
+        BitSetMultiSegments(segments, base_offset, index);
+        return;
+    }
+    MemorySegment* only_segment = &segments->front();
+    BitSet(only_segment, base_offset, index);
+}
+
+/// Sets bit `index` (counted from byte `base_offset`) when the target byte may live in
+/// any segment. The segment is located by dividing by the size of the first segment.
+void MemorySegmentUtils::BitSetMultiSegments(std::vector<MemorySegment>* segments,
+                                             int32_t base_offset, int32_t index) {
+    const int32_t byte_pos = base_offset + ByteIndex(index);
+    const int32_t segment_size = segments->front().Size();
+    const int32_t segment_index = byte_pos / segment_size;
+    const int32_t pos_in_segment = byte_pos % segment_size;
+    const int32_t bit_mask = 1 << (index & BIT_BYTE_INDEX_MASK);
+
+    MemorySegment& target = (*segments)[segment_index];
+    char updated = target.Get(pos_in_segment);
+    updated |= bit_mask;
+    target.Put(pos_in_segment, updated);
+}
+
+/// Reads bit `index` (counted from byte `base_offset`) of `segments`; the byte is
+/// fetched through GetValue<char>.
+bool MemorySegmentUtils::BitGet(const std::vector<MemorySegment>& segments, int32_t base_offset,
+                                int32_t index) {
+    const int32_t byte_pos = base_offset + ByteIndex(index);
+    const int32_t bit_mask = 1 << (index & BIT_BYTE_INDEX_MASK);
+    const char stored = GetValue<char>(segments, byte_pos);
+    return (stored & bit_mask) != 0;
+}
+
+/// Clears bit `index` (counted from byte `base_offset`) in `segments`.
+/// A vector with exactly one segment is handled by the single-segment overload.
+void MemorySegmentUtils::BitUnSet(std::vector<MemorySegment>* segments, int32_t base_offset,
+                                  int32_t index) {
+    if (segments->size() != 1) {
+        BitUnSetMultiSegments(segments, base_offset, index);
+        return;
+    }
+    MemorySegment* only_segment = &segments->front();
+    BitUnSet(only_segment, base_offset, index);
+}
+
+/// Clears bit `index` (counted from byte `base_offset`) when the target byte may live
+/// in any segment. The segment is located by dividing by the size of the first segment.
+void MemorySegmentUtils::BitUnSetMultiSegments(std::vector<MemorySegment>* segments,
+                                               int32_t base_offset, int32_t index) {
+    const int32_t byte_pos = base_offset + ByteIndex(index);
+    const int32_t segment_size = segments->front().Size();
+    const int32_t segment_index = byte_pos / segment_size;
+    const int32_t pos_in_segment = byte_pos % segment_size;
+    const int32_t bit_mask = 1 << (index & BIT_BYTE_INDEX_MASK);
+
+    MemorySegment& target = (*segments)[segment_index];
+    char updated = target.Get(pos_in_segment);
+    updated &= ~bit_mask;
+    target.Put(pos_in_segment, updated);
+}
+
+/// Compares `len` bytes of `segments1` at `offset1` with `len` bytes of `segments2`
+/// at `offset2`. Uses a direct first-segment comparison when both ranges fit there.
+bool MemorySegmentUtils::Equals(const std::vector<MemorySegment>& segments1, int32_t offset1,
+                                const std::vector<MemorySegment>& segments2, int32_t offset2,
+                                int32_t len) {
+    const bool both_in_first =
+        InFirstSegment(segments1, offset1, len) && InFirstSegment(segments2, offset2, len);
+    if (!both_in_first) {
+        return EqualsMultiSegments(segments1, offset1, segments2, offset2, len);
+    }
+    const MemorySegment& left = segments1[0];
+    const MemorySegment& right = segments2[0];
+    return left.EqualTo(right, offset1, offset2, len);
+}
+
+/// Compares `len` bytes of two segment lists whose ranges may cross segment borders.
+/// Each list is addressed using the size of its own first segment. The comparison
+/// walks both lists in lock step, one chunk at a time, where a chunk ends at the
+/// nearest segment border of either list.
+bool MemorySegmentUtils::EqualsMultiSegments(const std::vector<MemorySegment>& segments1,
+                                             int32_t offset1,
+                                             const std::vector<MemorySegment>& segments2,
+                                             int32_t offset2, int32_t len) {
+    // Nothing to compare; also keeps us from dividing by a zero segment size.
+    if (len == 0) {
+        return true;
+    }
+
+    const int32_t size1 = segments1[0].Size();
+    const int32_t size2 = segments2[0].Size();
+
+    // Translate the logical offsets into (segment index, position inside segment).
+    int32_t index1 = offset1 / size1;
+    int32_t index2 = offset2 / size2;
+    int32_t pos1 = offset1 % size1;
+    int32_t pos2 = offset2 % size2;
+
+    for (int32_t remaining = len; remaining > 0;) {
+        const int32_t chunk = std::min(std::min(remaining, size1 - pos1), size2 - pos2);
+        if (!segments1[index1].EqualTo(segments2[index2], pos1, pos2, chunk)) {
+            return false;
+        }
+        remaining -= chunk;
+
+        pos1 += chunk;
+        if (pos1 == size1) {
+            pos1 = 0;
+            ++index1;
+        }
+
+        pos2 += chunk;
+        if (pos2 == size2) {
+            pos2 = 0;
+            ++index2;
+        }
+    }
+    return true;
+}
+
+/// Searches the `num_bytes1` bytes of `segments1` starting at `offset1` for the
+/// `num_bytes2` bytes of `segments2` starting at `offset2`.
+///
+/// @return the offset in `segments1` of the first match, `offset1` when
+///         `num_bytes2` is 0, or -1 when there is no match.
+int32_t MemorySegmentUtils::Find(const std::vector<MemorySegment>& segments1, int32_t offset1,
+                                 int32_t num_bytes1, const std::vector<MemorySegment>& segments2,
+                                 int32_t offset2, int32_t num_bytes2) {
+    // An empty pattern matches right at the start.
+    if (num_bytes2 == 0) {
+        return offset1;
+    }
+
+    const bool both_in_first = InFirstSegment(segments1, offset1, num_bytes1) &&
+                               InFirstSegment(segments2, offset2, num_bytes2);
+    if (!both_in_first) {
+        return FindInMultiSegments(segments1, offset1, num_bytes1, segments2, offset2,
+                                   num_bytes2);
+    }
+
+    const MemorySegment& haystack = segments1[0];
+    const MemorySegment& needle = segments2[0];
+    const char first_byte = needle.Get(offset2);
+    const int32_t last_start = num_bytes1 - num_bytes2 + offset1;
+    for (int32_t pos = offset1; pos <= last_start; ++pos) {
+        // Cheap rejection on the first byte before the full comparison.
+        if (haystack.Get(pos) != first_byte) {
+            continue;
+        }
+        if (haystack.EqualTo(needle, pos, offset2, num_bytes2)) {
+            return pos;
+        }
+    }
+    return -1;
+}
+/// Multi-segment variant of Find: tries every start offset in
+/// [offset1, offset1 + num_bytes1 - num_bytes2] and returns the first one at which
+/// EqualsMultiSegments reports a match, or -1 if none does.
+int32_t MemorySegmentUtils::FindInMultiSegments(const std::vector<MemorySegment>& segments1,
+                                                int32_t offset1, int32_t num_bytes1,
+                                                const std::vector<MemorySegment>& segments2,
+                                                int32_t offset2, int32_t num_bytes2) {
+    const int32_t last_start = num_bytes1 - num_bytes2 + offset1;
+    int32_t candidate = offset1;
+    while (candidate <= last_start) {
+        if (EqualsMultiSegments(segments1, candidate, segments2, offset2, num_bytes2)) {
+            return candidate;
+        }
+        ++candidate;
+    }
+    return -1;
+}
+
+/// Hashes `num_bytes` bytes of `segments` starting at `offset`. Ranges that do not
+/// fit in the first segment are hashed through HashMultiSeg, which uses `pool`.
+int32_t MemorySegmentUtils::Hash(const std::vector<MemorySegment>& segments, int32_t offset,
+                                 int32_t num_bytes, MemoryPool* pool) {
+    if (!InFirstSegment(segments, offset, num_bytes)) {
+        return HashMultiSeg(segments, offset, num_bytes, pool);
+    }
+    return MurmurHashUtils::HashBytes(segments[0], offset, num_bytes);
+}
+
+/// Word-wise hash of `num_bytes` bytes of `segments` starting at `offset`. Ranges that
+/// do not fit in the first segment go through HashMultiSegByWords, which uses `pool`.
+int32_t MemorySegmentUtils::HashByWords(const std::vector<MemorySegment>& segments, int32_t offset,
+                                        int32_t num_bytes, MemoryPool* pool) {
+    if (!InFirstSegment(segments, offset, num_bytes)) {
+        return HashMultiSegByWords(segments, offset, num_bytes, pool);
+    }
+    return MurmurHashUtils::HashBytesByWords(segments[0], offset, num_bytes);
+}
+
+/// Word-wise hash of a range spanning several segments: the range is first gathered
+/// into one contiguous buffer allocated from `pool`, then hashed.
+int32_t MemorySegmentUtils::HashMultiSegByWords(const std::vector<MemorySegment>& segments,
+                                                int32_t offset, int32_t num_bytes,
+                                                MemoryPool* pool) {
+    std::shared_ptr<Bytes> gathered = AllocateBytes(num_bytes, pool);
+    Bytes* buffer = gathered.get();
+    CopyMultiSegmentsToBytes(segments, offset, buffer, 0, num_bytes);
+    void* raw = static_cast<void*>(gathered->data());
+    return MurmurHashUtils::HashUnsafeBytesByWords(raw, 0, num_bytes);
+}
+
+/// Hash of a range spanning several segments: the range is first gathered into one
+/// contiguous buffer allocated from `pool`, then hashed.
+int32_t MemorySegmentUtils::HashMultiSeg(const std::vector<MemorySegment>& segments, int32_t offset,
+                                         int32_t num_bytes, MemoryPool* pool) {
+    std::shared_ptr<Bytes> gathered = AllocateBytes(num_bytes, pool);
+    Bytes* buffer = gathered.get();
+    CopyMultiSegmentsToBytes(segments, offset, buffer, 0, num_bytes);
+    void* raw = static_cast<void*>(gathered->data());
+    return MurmurHashUtils::HashUnsafeBytes(raw, 0, num_bytes);
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/memory/memory_segment_utils.h
b/src/paimon/common/memory/memory_segment_utils.h
new file mode 100644
index 0000000..6208ff5
--- /dev/null
+++ b/src/paimon/common/memory/memory_segment_utils.h
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <algorithm>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <type_traits>
+#include <vector>
+
+#include "fmt/format.h"
+#include "paimon/common/io/memory_segment_output_stream.h"
+#include "paimon/common/memory/memory_segment.h"
+#include "paimon/io/byte_order.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/status.h"
+#include "paimon/type_fwd.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+
+/// Static helpers for reading, writing, comparing, searching and hashing data stored
+/// in one MemorySegment or spread over a std::vector<MemorySegment>.
+/// Offsets into a vector of segments are logical offsets counted from the start of
+/// the first segment. The class cannot be instantiated.
+class PAIMON_EXPORT MemorySegmentUtils {
+ public:
+ MemorySegmentUtils() = delete;
+ ~MemorySegmentUtils() = delete;
+
+ /// Allocate `length` bytes from `pool`.
+ static std::shared_ptr<Bytes> AllocateBytes(int32_t length, MemoryPool*
pool);
+
+ /// Copy from a source byte[] into target segments.
+ ///
+ /// @param segments target segments.
+ /// @param offset target segments offset.
+ /// @param bytes source byte[].
+ /// @param bytes_offset source byte[] offset.
+ /// @param num_bytes the number of bytes to copy.
+ static void CopyFromBytes(std::vector<MemorySegment>* segments, int32_t
offset,
+ const Bytes& bytes, int32_t bytes_offset,
int32_t num_bytes);
+
+ /// Copy segments to a new byte[].
+ ///
+ /// @param segments Source segments.
+ /// @param offset Source segments offset.
+ /// @param num_bytes the number of bytes to copy.
+ /// @param pool pool the new byte[] is allocated from; must not be null.
+ static PAIMON_UNIQUE_PTR<Bytes> CopyToBytes(const
std::vector<MemorySegment>& segments,
+ int32_t offset, int32_t
num_bytes,
+ MemoryPool* pool);
+
+ /// Copy segments to target byte[].
+ ///
+ /// @param segments Source segments.
+ /// @param offset Source segments offset.
+ /// @param bytes target byte[].
+ /// @param bytes_offset target byte[] offset.
+ /// @param num_bytes the number of bytes to copy.
+ template <typename T>
+ static void CopyToBytes(const std::vector<MemorySegment>& segments,
int32_t offset, T* bytes,
+ int32_t bytes_offset, int32_t num_bytes);
+
+ /// Copy bytes of segments to output stream.
+ ///
+ /// @note It just copies the data in, not including the length.
+ ///
+ /// @param segments source segments
+ /// @param offset offset for segments
+ /// @param size_in_bytes size in bytes
+ /// @param target target output stream
+ /// @return Status::Invalid if the segments end before `size_in_bytes` bytes were
+ ///         written, Status::OK otherwise.
+ static Status CopyToStream(const std::vector<MemorySegment>& segments,
int32_t offset,
+ int32_t size_in_bytes,
MemorySegmentOutputStream* target);
+
+ /// Copy segments to target unsafe pointer.
+ ///
+ /// @param segments Source segments.
+ /// @param offset The position in these memory segments where reading starts.
+ /// @param target The unsafe memory to copy the bytes to.
+ /// @param num_bytes the number of bytes to copy.
+ static void CopyToUnsafe(const std::vector<MemorySegment>& segments,
int32_t offset,
+ void* target, int32_t num_bytes);
+
+ /// Copy a range that may span several segments to target byte[].
+ /// Segments lying entirely before `offset` are skipped.
+ ///
+ /// @param segments Source segments.
+ /// @param offset Source segments offset.
+ /// @param bytes target byte[].
+ /// @param bytes_offset target byte[] offset.
+ /// @param num_bytes the number of bytes to copy.
+ template <typename T>
+ static void CopyMultiSegmentsToBytes(const std::vector<MemorySegment>&
segments, int32_t offset,
+ T* bytes, int32_t bytes_offset,
int32_t num_bytes);
+
+ /// Get `size_in_bytes` bytes of the segments starting at `base_offset`.
+ /// With a single segment, `base_offset` 0 and heap memory of exactly
+ /// `size_in_bytes` bytes, that heap memory is returned; otherwise the bytes are
+ /// copied into a buffer allocated from `pool`.
+ static std::shared_ptr<Bytes> GetBytes(const std::vector<MemorySegment>&
segments,
+ int32_t base_offset, int32_t
size_in_bytes,
+ MemoryPool* pool);
+
+ /// Whether the range [offset, offset + num_bytes) lies within the first
+ /// MemorySegment, in which case callers can take a fast path.
+ static bool InFirstSegment(const std::vector<MemorySegment>& segments,
int32_t offset,
+ int32_t num_bytes);
+
+ /// unset bit.
+ ///
+ /// @param segment target segment.
+ /// @param base_offset bits base offset.
+ /// @param index bit index from base offset.
+ static void BitUnSet(MemorySegment* segment, int32_t base_offset, int32_t
index);
+
+ /// set bit.
+ ///
+ /// @param segment target segment.
+ /// @param base_offset bits base offset.
+ /// @param index bit index from base offset.
+ static void BitSet(MemorySegment* segment, int32_t base_offset, int32_t
index);
+
+ /// read bit.
+ ///
+ /// @param segment target segment.
+ /// @param base_offset bits base offset.
+ /// @param index bit index from base offset.
+ static bool BitGet(const MemorySegment& segment, int32_t base_offset,
int32_t index);
+
+ /// set bit from segments.
+ ///
+ /// @param segments target segments.
+ /// @param base_offset bits base offset.
+ /// @param index bit index from base offset.
+ static void BitSet(std::vector<MemorySegment>* segments, int32_t
base_offset, int32_t index);
+
+ /// read bit from segments.
+ ///
+ /// @param segments target segments.
+ /// @param base_offset bits base offset.
+ /// @param index bit index from base offset.
+ static bool BitGet(const std::vector<MemorySegment>& segments, int32_t
base_offset,
+ int32_t index);
+
+ /// unset bit from segments.
+ ///
+ /// @param segments target segments.
+ /// @param base_offset bits base offset.
+ /// @param index bit index from base offset.
+ static void BitUnSet(std::vector<MemorySegment>* segments, int32_t
base_offset, int32_t index);
+
+ /// get value from segments. Only supports: bool, char, int16_t, int32_t,
+ /// int64_t, double, float
+ ///
+ /// @param segments target segments.
+ /// @param offset value offset.
+ template <typename T>
+ static T GetValue(const std::vector<MemorySegment>& segments, int32_t
offset);
+
+ /// set value to segments. Only supports: bool, char, int16_t, int32_t,
+ /// int64_t, double, float
+ ///
+ /// @param segments target segments.
+ /// @param offset value offset.
+ /// @param value the value to store.
+ template <typename T>
+ static void SetValue(std::vector<MemorySegment>* segments, int32_t offset,
const T& value);
+
+ /// Compare two memory segment regions for equality.
+ ///
+ /// @param segments1 Segments 1
+ /// @param offset1 Offset of segments1 to start comparing
+ /// @param segments2 Segments 2
+ /// @param offset2 Offset of segments2 to start comparing
+ /// @param len Length of the compared memory region
+ /// @return true if equal, false otherwise
+ static bool Equals(const std::vector<MemorySegment>& segments1, int32_t
offset1,
+ const std::vector<MemorySegment>& segments2, int32_t
offset2, int32_t len);
+
+ /// Same contract as Equals, for regions that may cross segment borders.
+ /// Returns true immediately when `len` is 0.
+ static bool EqualsMultiSegments(const std::vector<MemorySegment>&
segments1, int32_t offset1,
+ const std::vector<MemorySegment>&
segments2, int32_t offset2,
+ int32_t len);
+
+ /// Find the bytes of segments2 inside segments1.
+ ///
+ /// @param segments1 segs to search in.
+ /// @param offset1 start offset of the searched region in segments1.
+ /// @param num_bytes1 length of the searched region.
+ /// @param segments2 sub segs to look for.
+ /// @param offset2 start offset of the pattern in segments2.
+ /// @param num_bytes2 length of the pattern.
+ /// @return the offset in segments1 where the pattern was found (offset1 when
+ ///         num_bytes2 is 0), or -1 if it was not found.
+ static int32_t Find(const std::vector<MemorySegment>& segments1, int32_t
offset1,
+ int32_t num_bytes1, const std::vector<MemorySegment>&
segments2,
+ int32_t offset2, int32_t num_bytes2);
+
+ /// hash segments to int, num_bytes must be aligned to 4 bytes.
+ ///
+ /// @param segments Source segments.
+ /// @param offset Source segments offset.
+ /// @param num_bytes the number of bytes to hash.
+ /// @param pool pool used for a temporary buffer when the range spans segments.
+ static int32_t HashByWords(const std::vector<MemorySegment>& segments,
int32_t offset,
+ int32_t num_bytes, MemoryPool* pool);
+
+ /// hash segments to int.
+ ///
+ /// @param segments Source segments.
+ /// @param offset Source segments offset.
+ /// @param num_bytes the number of bytes to hash.
+ /// @param pool pool used for a temporary buffer when the range spans segments.
+ static int32_t Hash(const std::vector<MemorySegment>& segments, int32_t
offset,
+ int32_t num_bytes, MemoryPool* pool);
+
+ private:
+ /// Shift that turns a bit index into a byte index (8 bits per byte).
+ static constexpr int32_t ADDRESS_BITS_PER_WORD = 3;
+ /// Mask that extracts the position of a bit inside its byte.
+ static constexpr int32_t BIT_BYTE_INDEX_MASK = 7;
+ // NOTE(review): the two limits below are not referenced by any function of this
+ // class in this header or its .cpp; confirm whether they are still needed.
+ static constexpr int32_t MAX_BYTES_LENGTH = 1024 * 64;
+ static constexpr int32_t MAX_CHARS_LENGTH = 1024 * 32;
+
+ private:
+ /// Store `value` at logical `offset`; the target segment is located using the
+ /// size of the first segment. Falls back to SetValueSlowly when the value
+ /// straddles a segment border.
+ template <typename T>
+ static void SetValueToMultiSegments(std::vector<MemorySegment>* segments,
int32_t offset,
+ const T& value);
+
+ /// Store `value` byte by byte, in the system byte order, starting at
+ /// `seg_offset` of segment `seg_num` and continuing into the next segment(s).
+ template <typename T>
+ static void SetValueSlowly(std::vector<MemorySegment>* segments, int32_t
seg_size,
+ int32_t seg_num, int32_t seg_offset, const T&
value);
+ /// Read a value at logical `offset`; the source segment is located using the
+ /// size of the first segment. Falls back to GetValueSlowly when the value
+ /// straddles a segment border.
+ template <typename T>
+ static T GetValueFromMultiSegments(const std::vector<MemorySegment>&
segments, int32_t offset);
+
+ /// Read a value byte by byte, in the system byte order, starting at
+ /// `seg_offset` of segment `seg_num` and continuing into the next segment(s).
+ template <typename T>
+ static T GetValueSlowly(const std::vector<MemorySegment>& segments,
int32_t seg_size,
+ int32_t seg_num, int32_t seg_offset);
+
+ /// Multi-segment implementation behind CopyFromBytes.
+ static void CopyMultiSegmentsFromBytes(std::vector<MemorySegment>*
segments, int32_t offset,
+ const Bytes& bytes, int32_t
bytes_offset,
+ int32_t num_bytes);
+
+ /// Multi-segment implementation behind CopyToUnsafe.
+ static void CopyMultiSegmentsToUnsafe(const std::vector<MemorySegment>&
segments,
+ int32_t offset, void* target,
int32_t num_bytes);
+
+ /// Multi-segment implementation behind Find.
+ static int32_t FindInMultiSegments(const std::vector<MemorySegment>&
segments1, int32_t offset1,
+ int32_t num_bytes1,
+ const std::vector<MemorySegment>&
segments2, int32_t offset2,
+ int32_t num_bytes2);
+
+ /// Multi-segment implementation behind Hash: copies the range into a buffer
+ /// allocated from `pool` and hashes that buffer.
+ static int32_t HashMultiSeg(const std::vector<MemorySegment>& segments,
int32_t offset,
+ int32_t num_bytes, MemoryPool* pool);
+
+ /// Multi-segment implementation behind HashByWords: copies the range into a
+ /// buffer allocated from `pool` and hashes that buffer.
+ static int32_t HashMultiSegByWords(const std::vector<MemorySegment>&
segments, int32_t offset,
+ int32_t num_bytes, MemoryPool* pool);
+
+ /// Given a bit index, return the byte index containing it.
+ ///
+ /// @param bit_index the bit index.
+ /// @return the byte index.
+ static int32_t ByteIndex(int32_t bit_index);
+
+ /// Multi-segment implementation behind BitSet.
+ static void BitSetMultiSegments(std::vector<MemorySegment>* segments,
int32_t base_offset,
+ int32_t index);
+
+ /// Multi-segment implementation behind BitUnSet.
+ static void BitUnSetMultiSegments(std::vector<MemorySegment>* segments,
int32_t base_offset,
+ int32_t index);
+};
+
+/// Copies `num_bytes` bytes of `segments` starting at `offset` into `bytes` at
+/// `bytes_offset`, reading the first segment directly when the range fits in it.
+template <typename T>
+inline void MemorySegmentUtils::CopyToBytes(const std::vector<MemorySegment>& segments,
+                                            int32_t offset, T* bytes, int32_t bytes_offset,
+                                            int32_t num_bytes) {
+    if (!InFirstSegment(segments, offset, num_bytes)) {
+        CopyMultiSegmentsToBytes(segments, offset, bytes, bytes_offset, num_bytes);
+        return;
+    }
+    const MemorySegment& first = segments[0];
+    first.Get(offset, bytes, bytes_offset, num_bytes);
+}
+
+/// Copies `num_bytes` bytes that may span several segments into `bytes` starting at
+/// `bytes_offset`. `offset` is measured from the beginning of the first segment;
+/// segments that end before it are skipped. Returns silently if the segments run
+/// out before everything was copied.
+template <typename T>
+inline void MemorySegmentUtils::CopyMultiSegmentsToBytes(const std::vector<MemorySegment>& segments,
+                                                         int32_t offset, T* bytes,
+                                                         int32_t bytes_offset, int32_t num_bytes) {
+    int32_t copied = 0;
+    for (const auto& segment : segments) {
+        const int32_t available = segment.Size() - offset;
+        if (available <= 0) {
+            // Start position is past this segment: carry the rest of the offset over.
+            offset = -available;
+            continue;
+        }
+        const int32_t chunk = std::min(available, num_bytes - copied);
+        segment.Get(offset, bytes, copied + bytes_offset, chunk);
+        copied += chunk;
+        // Every following segment is read from its beginning.
+        offset = 0;
+        if (copied == num_bytes) {
+            return;
+        }
+    }
+}
+
+/// Reads a `T` at logical `offset`. Values fully inside the first segment are read
+/// directly; everything else goes through GetValueFromMultiSegments.
+template <typename T>
+inline T MemorySegmentUtils::GetValue(const std::vector<MemorySegment>& segments, int32_t offset) {
+    static_assert(std::is_trivially_copyable_v<T>, "T must be trivially copyable");
+    if (!InFirstSegment(segments, offset, sizeof(T))) {
+        return GetValueFromMultiSegments<T>(segments, offset);
+    }
+    const MemorySegment& first = segments[0];
+    return first.GetValue<T>(offset);
+}
+
+/// Stores `value` at logical `offset`. Values fully inside the first segment are
+/// written directly; everything else goes through SetValueToMultiSegments.
+template <typename T>
+inline void MemorySegmentUtils::SetValue(std::vector<MemorySegment>* segments, int32_t offset,
+                                         const T& value) {
+    static_assert(std::is_trivially_copyable_v<T>, "T must be trivially copyable");
+    if (!InFirstSegment(*segments, offset, sizeof(T))) {
+        SetValueToMultiSegments<T>(segments, offset, value);
+        return;
+    }
+    MemorySegment& first = (*segments)[0];
+    first.PutValue<T>(offset, value);
+}
+
+/// Stores `value` at logical `offset`, locating the segment with the size of the
+/// first segment. A value that would cross the end of its segment is written byte
+/// by byte via SetValueSlowly.
+template <typename T>
+inline void MemorySegmentUtils::SetValueToMultiSegments(std::vector<MemorySegment>* segments,
+                                                        int32_t offset, const T& value) {
+    const int32_t segment_size = (*segments)[0].Size();
+    const int32_t segment_index = offset / segment_size;
+    const int32_t pos_in_segment = offset % segment_size;
+    const auto value_size = static_cast<int32_t>(sizeof(T));
+
+    if (pos_in_segment > segment_size - value_size) {
+        SetValueSlowly<T>(segments, segment_size, segment_index, pos_in_segment, value);
+        return;
+    }
+    (*segments)[segment_index].PutValue<T>(pos_in_segment, value);
+}
+
+/// float specialization: a value crossing a segment border is written through its
+/// 32-bit integer bit pattern, because the byte-wise path works on integers.
+template <>
+inline void MemorySegmentUtils::SetValueToMultiSegments(std::vector<MemorySegment>* segments,
+                                                        int32_t offset, const float& value) {
+    const int32_t segment_size = (*segments)[0].Size();
+    const int32_t segment_index = offset / segment_size;
+    const int32_t pos_in_segment = offset % segment_size;
+    const auto value_size = static_cast<int32_t>(sizeof(float));
+
+    if (pos_in_segment > segment_size - value_size) {
+        int32_t bit_pattern;
+        std::memcpy(&bit_pattern, &value, sizeof(float));
+        SetValueSlowly<int32_t>(segments, segment_size, segment_index, pos_in_segment,
+                                bit_pattern);
+        return;
+    }
+    (*segments)[segment_index].PutValue<float>(pos_in_segment, value);
+}
+
+/// double specialization: a value crossing a segment border is written through its
+/// 64-bit integer bit pattern, because the byte-wise path works on integers.
+template <>
+inline void MemorySegmentUtils::SetValueToMultiSegments(std::vector<MemorySegment>* segments,
+                                                        int32_t offset, const double& value) {
+    const int32_t segment_size = (*segments)[0].Size();
+    const int32_t segment_index = offset / segment_size;
+    const int32_t pos_in_segment = offset % segment_size;
+    const auto value_size = static_cast<int32_t>(sizeof(double));
+
+    if (pos_in_segment > segment_size - value_size) {
+        int64_t bit_pattern;
+        std::memcpy(&bit_pattern, &value, sizeof(double));
+        SetValueSlowly<int64_t>(segments, segment_size, segment_index, pos_in_segment,
+                                bit_pattern);
+        return;
+    }
+    (*segments)[segment_index].PutValue<double>(pos_in_segment, value);
+}
+
+/// Writes the integral `value` one byte at a time, in the system byte order, starting
+/// at `seg_offset` inside segment `seg_num` and moving on to the following segment
+/// whenever the end of the current one is reached.
+///
+/// Bytes are written through a pointer to the vector element, so no MemorySegment is
+/// copied and the write does not rely on copies sharing the underlying buffer.
+///
+/// @param segments target segments.
+/// @param seg_size size used to detect the end of a segment.
+/// @param seg_num index of the segment holding the first byte.
+/// @param seg_offset position of the first byte inside that segment.
+/// @param value value to store; T must be an integral type.
+template <typename T>
+inline void MemorySegmentUtils::SetValueSlowly(std::vector<MemorySegment>* segments,
+                                               int32_t seg_size, int32_t seg_num,
+                                               int32_t seg_offset, const T& value) {
+    static_assert(std::is_integral_v<T>, "SetValueSlowly splits the value with shifts");
+    // The byte order does not change while we run: decide it once.
+    const bool little_endian = SystemByteOrder() == ByteOrder::PAIMON_LITTLE_ENDIAN;
+    MemorySegment* segment = &(*segments)[seg_num];
+    for (size_t i = 0; i < sizeof(T); i++) {
+        if (seg_offset == seg_size) {
+            segment = &(*segments)[++seg_num];
+            seg_offset = 0;
+        }
+        // Little endian stores the least significant byte first, big endian the most.
+        const size_t shift_bits = (little_endian ? i : sizeof(T) - 1 - i) * 8;
+        segment->Put(seg_offset, static_cast<char>(value >> shift_bits));
+        seg_offset++;
+    }
+}
+
+/// Reads a `T` at logical `offset`, locating the segment with the size of the first
+/// segment. A value that crosses the end of its segment is read byte by byte via
+/// GetValueSlowly.
+template <typename T>
+inline T MemorySegmentUtils::GetValueFromMultiSegments(const std::vector<MemorySegment>& segments,
+                                                       int32_t offset) {
+    const int32_t segment_size = segments[0].Size();
+    const int32_t segment_index = offset / segment_size;
+    const int32_t pos_in_segment = offset % segment_size;
+    const auto value_size = static_cast<int32_t>(sizeof(T));
+
+    if (pos_in_segment > segment_size - value_size) {
+        return GetValueSlowly<T>(segments, segment_size, segment_index, pos_in_segment);
+    }
+    return segments[segment_index].GetValue<T>(pos_in_segment);
+}
+
+/// float specialization: a value crossing a segment border is read as a 32-bit
+/// integer bit pattern and then reinterpreted with memcpy.
+template <>
+inline float MemorySegmentUtils::GetValueFromMultiSegments(
+    const std::vector<MemorySegment>& segments, int32_t offset) {
+    const int32_t segment_size = segments[0].Size();
+    const int32_t segment_index = offset / segment_size;
+    const int32_t pos_in_segment = offset % segment_size;
+    const auto value_size = static_cast<int32_t>(sizeof(float));
+
+    if (pos_in_segment > segment_size - value_size) {
+        const auto bit_pattern =
+            GetValueSlowly<int32_t>(segments, segment_size, segment_index, pos_in_segment);
+        float result;
+        std::memcpy(&result, &bit_pattern, sizeof(float));
+        return result;
+    }
+    return segments[segment_index].GetValue<float>(pos_in_segment);
+}
+
+/// double specialization: a value crossing a segment border is read as a 64-bit
+/// integer bit pattern and then reinterpreted with memcpy.
+template <>
+inline double MemorySegmentUtils::GetValueFromMultiSegments(
+    const std::vector<MemorySegment>& segments, int32_t offset) {
+    const int32_t segment_size = segments[0].Size();
+    const int32_t segment_index = offset / segment_size;
+    const int32_t pos_in_segment = offset % segment_size;
+    const auto value_size = static_cast<int32_t>(sizeof(double));
+
+    if (pos_in_segment > segment_size - value_size) {
+        const auto bit_pattern =
+            GetValueSlowly<int64_t>(segments, segment_size, segment_index, pos_in_segment);
+        double result;
+        std::memcpy(&result, &bit_pattern, sizeof(double));
+        return result;
+    }
+    return segments[segment_index].GetValue<double>(pos_in_segment);
+}
+
+/// Reads an integral `T` one byte at a time, in the system byte order, starting at
+/// `seg_offset` inside segment `seg_num` and moving on to the following segment
+/// whenever the end of the current one is reached.
+///
+/// Segments are accessed through a pointer (no MemorySegment copy), and the bytes are
+/// combined in an unsigned 64-bit accumulator so that no shift is performed on a
+/// signed value; the accumulator is converted to `T` once at the end.
+///
+/// @param segments source segments.
+/// @param seg_size size used to detect the end of a segment.
+/// @param seg_num index of the segment holding the first byte.
+/// @param seg_offset position of the first byte inside that segment.
+/// @return the assembled value.
+template <typename T>
+inline T MemorySegmentUtils::GetValueSlowly(const std::vector<MemorySegment>& segments,
+                                            int32_t seg_size, int32_t seg_num,
+                                            int32_t seg_offset) {
+    static_assert(std::is_integral_v<T> && sizeof(T) <= sizeof(uint64_t),
+                  "GetValueSlowly assembles integral values of at most 8 bytes");
+    // The byte order does not change while we run: decide it once.
+    const bool little_endian = SystemByteOrder() == ByteOrder::PAIMON_LITTLE_ENDIAN;
+    const MemorySegment* segment = &segments[seg_num];
+    uint64_t assembled = 0;
+    for (size_t i = 0; i < sizeof(T); i++) {
+        if (seg_offset == seg_size) {
+            segment = &segments[++seg_num];
+            seg_offset = 0;
+        }
+        // Go through unsigned char so a byte >= 0x80 is not sign-extended.
+        const auto byte =
+            static_cast<uint64_t>(static_cast<unsigned char>(segment->Get(seg_offset)));
+        // Little endian stores the least significant byte first, big endian the most.
+        const size_t shift_bits = (little_endian ? i : sizeof(T) - 1 - i) * 8;
+        assembled |= byte << shift_bits;
+        seg_offset++;
+    }
+    return static_cast<T>(assembled);
+}
+
+/// Writes `size_in_bytes` bytes of `segments`, starting at logical `offset`, to
+/// `target`. Segments lying entirely before `offset` are skipped.
+///
+/// @return Status::OK once everything was written; Status::Invalid when the
+///         segments end while bytes are still outstanding.
+inline Status MemorySegmentUtils::CopyToStream(const std::vector<MemorySegment>& segments,
+                                               int32_t offset, int32_t size_in_bytes,
+                                               MemorySegmentOutputStream* target) {
+    int32_t outstanding = size_in_bytes;
+    for (const auto& source_segment : segments) {
+        const int32_t available = source_segment.Size() - offset;
+        if (available <= 0) {
+            // Start position is past this segment: consume its size from the offset.
+            offset -= source_segment.Size();
+        } else {
+            const int32_t chunk = std::min(available, outstanding);
+            target->Write(source_segment, offset, chunk);
+            outstanding -= chunk;
+            offset = 0;
+        }
+
+        if (outstanding == 0) {
+            return Status::OK();
+        }
+    }
+    if (outstanding == 0) {
+        return Status::OK();
+    }
+    return Status::Invalid(
+        fmt::format("No copy finished, this should be a bug, "
+                    "The remaining length is: {}",
+                    outstanding));
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/memory/memory_segment_utils_test.cpp
b/src/paimon/common/memory/memory_segment_utils_test.cpp
new file mode 100644
index 0000000..e4320b2
--- /dev/null
+++ b/src/paimon/common/memory/memory_segment_utils_test.cpp
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/memory/memory_segment_utils.h"
+
+#include <cstdlib>
+#include <string>
+
+#include "gtest/gtest.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+// Writes one value of each primitive type back to back and reads it again, for a range of
+// segment sizes starting at a single byte per segment.
+TEST(MemorySegmentUtilsTest, TestSetAndGetValue) {
+    auto pool = GetDefaultPool();
+    constexpr size_t kSegmentCount = 50;
+    for (auto single_segment_size : {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 128}) {
+        std::vector<MemorySegment> segments;
+        segments.reserve(kSegmentCount);
+        while (segments.size() < kSegmentCount) {
+            segments.push_back(
+                MemorySegment::Wrap(Bytes::AllocateBytes(single_segment_size, pool.get())));
+        }
+
+        // `cursor` always points at the first byte that has not been written yet.
+        int32_t cursor = 0;
+        MemorySegmentUtils::SetValue<int32_t>(&segments, cursor, 233);
+        const int32_t int_read = MemorySegmentUtils::GetValue<int32_t>(segments, cursor);
+        ASSERT_EQ(233, int_read);
+        cursor += sizeof(int32_t);
+
+        MemorySegmentUtils::SetValue<int64_t>(&segments, cursor, 23333333333);
+        const int64_t long_read = MemorySegmentUtils::GetValue<int64_t>(segments, cursor);
+        ASSERT_EQ(23333333333, long_read);
+        cursor += sizeof(int64_t);
+
+        MemorySegmentUtils::SetValue<float>(&segments, cursor, 233.3);
+        const float float_read = MemorySegmentUtils::GetValue<float>(segments, cursor);
+        ASSERT_NEAR(233.3, float_read, 0.001);
+        cursor += sizeof(float);
+
+        MemorySegmentUtils::SetValue<double>(&segments, cursor, 244.3);
+        const double double_read = MemorySegmentUtils::GetValue<double>(segments, cursor);
+        ASSERT_NEAR(244.3, double_read, 0.001);
+        cursor += sizeof(double);
+
+        MemorySegmentUtils::SetValue<int16_t>(&segments, cursor, 5564);
+        const int16_t short_read = MemorySegmentUtils::GetValue<int16_t>(segments, cursor);
+        ASSERT_EQ(5564, short_read);
+        cursor += sizeof(int16_t);
+
+        MemorySegmentUtils::SetValue<char>(&segments, cursor, 123);
+        const char char_read = MemorySegmentUtils::GetValue<char>(segments, cursor);
+        ASSERT_EQ(123, char_read);
+        cursor += sizeof(char);
+
+        MemorySegmentUtils::SetValue<bool>(&segments, cursor, true);
+        const bool bool_read = MemorySegmentUtils::GetValue<bool>(segments, cursor);
+        ASSERT_EQ(true, bool_read);
+
+        // The very first value must have survived all later writes.
+        ASSERT_EQ(233, MemorySegmentUtils::GetValue<int32_t>(segments, 0));
+    }
+}
+
+// Copies random text into one segment and into two segments, then reads it back through
+// CopyToBytes and GetBytes.
+TEST(MemorySegmentUtilsTest, TestCopyFromBytesAndGetBytes) {
+    auto pool = GetDefaultPool();
+    int32_t str_size = 1024;
+    std::string short_text;
+    std::string long_text;
+    short_text.reserve(str_size);
+    long_text.reserve(str_size);
+    for (int32_t n = 0; n < str_size; ++n) {
+        short_text.push_back(static_cast<char>(paimon::test::RandomNumber(0, 25) + 'a'));
+        long_text.push_back(static_cast<char>(paimon::test::RandomNumber(0, 25) + 'a'));
+    }
+    // `long_text` is twice the segment size and ends with `short_text`.
+    long_text.append(short_text);
+    std::shared_ptr<Bytes> short_bytes = Bytes::AllocateBytes(short_text, pool.get());
+    std::shared_ptr<Bytes> long_bytes = Bytes::AllocateBytes(long_text, pool.get());
+
+    // single segment
+    std::vector<MemorySegment> one_segment = {
+        MemorySegment::AllocateHeapMemory(str_size, pool.get())};
+    MemorySegmentUtils::CopyFromBytes(&one_segment, 0, *short_bytes, 0, short_text.size());
+    PAIMON_UNIQUE_PTR<Bytes> copied =
+        MemorySegmentUtils::CopyToBytes(one_segment, 0, short_text.size(), pool.get());
+    auto fetched = MemorySegmentUtils::GetBytes(one_segment, /*base_offset=*/0,
+                                                /*size_in_bytes=*/str_size, pool.get());
+    ASSERT_EQ(*fetched, *short_bytes);
+    const std::string copied_text(copied->data(), copied->size());
+    ASSERT_EQ(short_text, copied_text);
+
+    // test MultiSegments
+    std::vector<MemorySegment> two_segments = {
+        MemorySegment::AllocateHeapMemory(str_size, pool.get()),
+        MemorySegment::AllocateHeapMemory(str_size, pool.get())};
+    MemorySegmentUtils::CopyFromBytes(&two_segments, 0, *long_bytes, 0, long_text.size());
+    PAIMON_UNIQUE_PTR<Bytes> copied_long =
+        MemorySegmentUtils::CopyToBytes(two_segments, 0, long_text.size(), pool.get());
+    const std::string copied_long_text(copied_long->data(), copied_long->size());
+    ASSERT_EQ(long_text, copied_long_text);
+
+    // The trailing part of the long text is exactly the short text.
+    std::shared_ptr<Bytes> fetched_tail = MemorySegmentUtils::GetBytes(
+        two_segments, long_text.size() - short_text.size(), short_text.size(), pool.get());
+    const std::string fetched_tail_text(fetched_tail->data(), fetched_tail->size());
+    ASSERT_EQ(short_text, fetched_tail_text);
+}
+
+// Copies segment content into a raw destination buffer with CopyToUnsafe, for a single segment
+// and for a range located in the second of two segments.
+TEST(MemorySegmentUtilsTest, TestCopyToUnsafe) {
+    auto pool = GetDefaultPool();
+    int32_t str_size = 1024;
+    std::string short_text;
+    std::string long_text;
+    short_text.reserve(str_size);
+    long_text.reserve(str_size);
+    for (int32_t n = 0; n < str_size; ++n) {
+        short_text.push_back(static_cast<char>(paimon::test::RandomNumber(0, 25) + 'a'));
+        long_text.push_back(static_cast<char>(paimon::test::RandomNumber(0, 25) + 'a'));
+    }
+    // `long_text` ends with `short_text`.
+    long_text.append(short_text);
+    std::shared_ptr<Bytes> short_bytes = Bytes::AllocateBytes(short_text, pool.get());
+    std::shared_ptr<Bytes> long_bytes = Bytes::AllocateBytes(long_text, pool.get());
+
+    // single segment
+    std::vector<MemorySegment> one_segment = {
+        MemorySegment::AllocateHeapMemory(str_size, pool.get())};
+    MemorySegmentUtils::CopyFromBytes(&one_segment, 0, *short_bytes, 0, short_text.size());
+    std::shared_ptr<Bytes> destination = Bytes::AllocateBytes(short_text.size(), pool.get());
+    MemorySegmentUtils::CopyToUnsafe(one_segment, 0, destination->data(), short_text.size());
+    ASSERT_EQ(short_text, std::string(destination->data(), destination->size()));
+
+    // test MultiSegments
+    std::vector<MemorySegment> two_segments = {
+        MemorySegment::AllocateHeapMemory(str_size, pool.get()),
+        MemorySegment::AllocateHeapMemory(str_size, pool.get())};
+    MemorySegmentUtils::CopyFromBytes(&two_segments, 0, *long_bytes, 0, long_text.size());
+    std::shared_ptr<Bytes> tail_destination =
+        Bytes::AllocateBytes(short_text.size(), pool.get());
+    MemorySegmentUtils::CopyToUnsafe(two_segments, long_text.size() - short_text.size(),
+                                     tail_destination->data(), short_text.size());
+    ASSERT_EQ(short_text, std::string(tail_destination->data(), tail_destination->size()));
+}
+
+// Flips one randomly chosen bit through the multi-segment overloads and through the
+// single-segment overloads, checking BitGet after every change.
+TEST(MemorySegmentUtilsTest, TestSetAndUnSet) {
+    auto pool = GetDefaultPool();
+    int32_t str_size = 1024;
+    std::string suffix;
+    std::string payload;
+    suffix.reserve(str_size);
+    payload.reserve(str_size);
+    for (int32_t n = 0; n < str_size; ++n) {
+        suffix.push_back(static_cast<char>(paimon::test::RandomNumber(0, 25) + 'a'));
+        payload.push_back(static_cast<char>(paimon::test::RandomNumber(0, 25) + 'a'));
+    }
+    payload.append(suffix);
+    std::shared_ptr<Bytes> payload_bytes = Bytes::AllocateBytes(payload, pool.get());
+
+    std::vector<MemorySegment> segs = {MemorySegment::AllocateHeapMemory(str_size, pool.get()),
+                                       MemorySegment::AllocateHeapMemory(str_size, pool.get())};
+    MemorySegmentUtils::CopyFromBytes(&segs, 0, *payload_bytes, 0, payload.size());
+    int32_t bit_index = paimon::test::RandomNumber(0, str_size - 1);
+
+    // multi-segment overloads, with `str_size` as base offset
+    MemorySegmentUtils::BitUnSet(&segs, str_size, bit_index);
+    ASSERT_FALSE(MemorySegmentUtils::BitGet(segs, str_size, bit_index));
+    MemorySegmentUtils::BitSet(&segs, str_size, bit_index);
+    ASSERT_TRUE(MemorySegmentUtils::BitGet(segs, str_size, bit_index));
+
+    // single-segment overloads on the first segment
+    MemorySegmentUtils::BitSet(&segs[0], /*base_offset=*/0, bit_index);
+    ASSERT_TRUE(MemorySegmentUtils::BitGet(segs[0], /*base_offset=*/0, bit_index));
+    MemorySegmentUtils::BitUnSet(&segs[0], /*base_offset=*/0, bit_index);
+    ASSERT_FALSE(MemorySegmentUtils::BitGet(segs[0], /*base_offset=*/0, bit_index));
+}
+
+// Sets and clears individual bits in a vector that holds a single segment, with a zero and a
+// non-zero base offset.
+TEST(MemorySegmentUtilsTest, TestBitSetAndUnSetSingleSegment) {
+    auto pool = GetDefaultPool();
+    int32_t segment_size = 128;
+    std::vector<MemorySegment> segs = {
+        MemorySegment::AllocateHeapMemory(segment_size, pool.get())};
+
+    // initially all bits should be 0 after allocation
+    for (int32_t bit = 0; bit < 16; ++bit) {
+        ASSERT_FALSE(MemorySegmentUtils::BitGet(segs, /*base_offset=*/0, bit));
+    }
+
+    // set bits at various indices and verify
+    for (int32_t bit : {0, 7, 15}) {
+        MemorySegmentUtils::BitSet(&segs, /*base_offset=*/0, /*index=*/bit);
+        ASSERT_TRUE(MemorySegmentUtils::BitGet(segs, /*base_offset=*/0, bit));
+    }
+
+    // unset and verify
+    for (int32_t bit : {0, 7}) {
+        MemorySegmentUtils::BitUnSet(&segs, /*base_offset=*/0, /*index=*/bit);
+        ASSERT_FALSE(MemorySegmentUtils::BitGet(segs, /*base_offset=*/0, bit));
+    }
+
+    // bit 15 should still be set
+    ASSERT_TRUE(MemorySegmentUtils::BitGet(segs, /*base_offset=*/0, 15));
+
+    MemorySegmentUtils::BitUnSet(&segs, /*base_offset=*/0, /*index=*/15);
+    ASSERT_FALSE(MemorySegmentUtils::BitGet(segs, /*base_offset=*/0, 15));
+
+    // test with non-zero base_offset
+    int32_t base_offset = 10;
+    MemorySegmentUtils::BitSet(&segs, base_offset, /*index=*/3);
+    const bool third_bit_set = MemorySegmentUtils::BitGet(segs, base_offset, 3);
+    ASSERT_TRUE(third_bit_set);
+    const bool second_bit_set = MemorySegmentUtils::BitGet(segs, base_offset, 2);
+    ASSERT_FALSE(second_bit_set);
+
+    MemorySegmentUtils::BitUnSet(&segs, base_offset, /*index=*/3);
+    ASSERT_FALSE(MemorySegmentUtils::BitGet(segs, base_offset, 3));
+}
+
+// Writes "abcdef" into two 10-byte segments at offset 3 and then at offset 12, reading it back
+// from the same offset each time.
+TEST(MemorySegmentUtilsTest, TestCopyMultiSegmentsFromBytes) {
+    auto pool = GetDefaultPool();
+    std::shared_ptr<Bytes> bytes = Bytes::AllocateBytes("abcdef", pool.get());
+    int32_t segment_size = 10;
+    std::vector<MemorySegment> segs = {
+        MemorySegment::AllocateHeapMemory(segment_size, pool.get()),
+        MemorySegment::AllocateHeapMemory(segment_size, pool.get())};
+    for (int start : {3, 12}) {
+        MemorySegmentUtils::CopyMultiSegmentsFromBytes(&segs, /*offset=*/start, *bytes,
+                                                       /*bytes_offset=*/0,
+                                                       /*num_bytes=*/bytes->size());
+        auto result_bytes = MemorySegmentUtils::CopyToBytes(
+            segs, /*offset=*/start, /*num_bytes=*/bytes->size(), pool.get());
+        ASSERT_EQ(*bytes, *result_bytes);
+    }
+}
+
+// Copies three different ranges of the segments "abc" + "def" into a fresh output stream and
+// compares the stream content with the expected text.
+TEST(MemorySegmentUtilsTest, TestCopyToStream) {
+    auto pool = GetDefaultPool();
+    int32_t segment_size = 3;
+    std::shared_ptr<Bytes> bytes1 = Bytes::AllocateBytes("abc", pool.get());
+    std::shared_ptr<Bytes> bytes2 = Bytes::AllocateBytes("def", pool.get());
+    std::vector<MemorySegment> segs = {MemorySegment::Wrap(bytes1), MemorySegment::Wrap(bytes2)};
+
+    struct CopyCase {
+        int32_t offset;
+        int32_t size_in_bytes;
+        std::string expected;
+    };
+    const CopyCase cases[] = {
+        {0, segment_size * 2, "abcdef"},  // everything
+        {2, 4, "cdef"},                   // starts inside the first segment
+        {4, 2, "ef"},                     // starts inside the second segment
+    };
+    for (const CopyCase& copy_case : cases) {
+        MemorySegmentOutputStream out(/*segment_size=*/segment_size, pool);
+        ASSERT_OK(MemorySegmentUtils::CopyToStream(segs, /*offset=*/copy_case.offset,
+                                                   /*size_in_bytes=*/copy_case.size_in_bytes,
+                                                   &out));
+        std::shared_ptr<Bytes> expected_bytes =
+            Bytes::AllocateBytes(copy_case.expected, pool.get());
+        auto bytes =
+            MemorySegmentUtils::CopyToBytes(out.Segments(), 0, out.CurrentSize(), pool.get());
+        ASSERT_EQ(*expected_bytes, *bytes);
+    }
+}
+
+// Searches for byte ranges taken from "adefghi" inside ranges of "abcdef"; both texts are
+// spread over two segments.
+TEST(MemorySegmentUtilsTest, TestFind) {
+    auto pool = GetDefaultPool();
+    std::shared_ptr<Bytes> bytes1 = Bytes::AllocateBytes("abc", pool.get());
+    std::shared_ptr<Bytes> bytes2 = Bytes::AllocateBytes("def", pool.get());
+    std::shared_ptr<Bytes> bytes3 = Bytes::AllocateBytes("adef", pool.get());
+    std::shared_ptr<Bytes> bytes4 = Bytes::AllocateBytes("ghi", pool.get());
+    // abcdef
+    std::vector<MemorySegment> haystack = {MemorySegment::Wrap(bytes1),
+                                           MemorySegment::Wrap(bytes2)};
+    // adefghi
+    std::vector<MemorySegment> needle = {MemorySegment::Wrap(bytes3),
+                                         MemorySegment::Wrap(bytes4)};
+
+    // find ""
+    const auto empty_pos = MemorySegmentUtils::Find(haystack, /*offset1=*/1, /*num_bytes1=*/5,
+                                                    needle, /*offset2=*/0, /*num_bytes2=*/0);
+    ASSERT_EQ(1, empty_pos);
+
+    // find "def"
+    const auto def_pos = MemorySegmentUtils::Find(haystack, /*offset1=*/0, /*num_bytes1=*/6,
+                                                  needle, /*offset2=*/1, /*num_bytes2=*/3);
+    ASSERT_EQ(3, def_pos);
+
+    // find "defg"
+    const auto defg_pos = MemorySegmentUtils::Find(haystack, /*offset1=*/0, /*num_bytes1=*/6,
+                                                   needle, /*offset2=*/1, /*num_bytes2=*/4);
+    ASSERT_EQ(-1, defg_pos);
+
+    // find "de" in "abc" of the haystack
+    const auto de_pos = MemorySegmentUtils::Find(haystack, /*offset1=*/0, /*num_bytes1=*/3,
+                                                 needle, /*offset2=*/1, /*num_bytes2=*/2);
+    ASSERT_EQ(-1, de_pos);
+
+    // find "a" in "abc" of the haystack
+    const auto a_pos = MemorySegmentUtils::Find(haystack, /*offset1=*/0, /*num_bytes1=*/3,
+                                                needle, /*offset2=*/0, /*num_bytes2=*/1);
+    ASSERT_EQ(0, a_pos);
+}
+
+} // namespace paimon::test