This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new e75d84fc1 [CELEBORN-1772][CIP-14] Add memory module to cppClient
e75d84fc1 is described below
commit e75d84fc195cfdc9a6ef7004edda88db903b5fc1
Author: HolyLow <[email protected]>
AuthorDate: Tue Dec 17 17:52:38 2024 +0800
[CELEBORN-1772][CIP-14] Add memory module to cppClient
### What changes were proposed in this pull request?
Add memory module to cppClient to provide ByteBuffer functionality.
### Why are the changes needed?
The memory module is added to provide ByteBuffer functionality, which would
be used across the data parsing layers.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Compilation and UTs.
Closes #2996 from
HolyLow/issue/celeborn-1772-add-memory-module-to-cppClient.
Authored-by: HolyLow <[email protected]>
Signed-off-by: mingji <[email protected]>
---
cpp/celeborn/CMakeLists.txt | 1 +
cpp/celeborn/memory/ByteBuffer.cpp | 76 +++++++++
cpp/celeborn/memory/ByteBuffer.h | 180 ++++++++++++++++++++
cpp/celeborn/{ => memory}/CMakeLists.txt | 17 +-
cpp/celeborn/memory/tests/ByteBufferTest.cpp | 221 +++++++++++++++++++++++++
cpp/celeborn/{ => memory/tests}/CMakeLists.txt | 17 +-
6 files changed, 508 insertions(+), 4 deletions(-)
diff --git a/cpp/celeborn/CMakeLists.txt b/cpp/celeborn/CMakeLists.txt
index 70e335101..e4ba4be75 100644
--- a/cpp/celeborn/CMakeLists.txt
+++ b/cpp/celeborn/CMakeLists.txt
@@ -14,3 +14,5 @@
 # limitations under the License.
 add_subdirectory(utils)
 add_subdirectory(proto)
+# memory: ByteBuffer utilities built on folly::IOBuf.
+add_subdirectory(memory)
diff --git a/cpp/celeborn/memory/ByteBuffer.cpp b/cpp/celeborn/memory/ByteBuffer.cpp
new file mode 100644
index 000000000..0e049ea67
--- /dev/null
+++ b/cpp/celeborn/memory/ByteBuffer.cpp
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "celeborn/memory/ByteBuffer.h"
+
+#include <cassert>
+#include <memory>
+#include <utility>
+
+namespace celeborn {
+std::unique_ptr<WriteOnlyByteBuffer> ByteBuffer::createWriteOnly(
+    size_t initialCapacity,
+    bool isBigEndian) {
+  return std::make_unique<WriteOnlyByteBuffer>(initialCapacity, isBigEndian);
+}
+
+std::unique_ptr<ReadOnlyByteBuffer> ByteBuffer::createReadOnly(
+    std::unique_ptr<folly::IOBuf>&& data,
+    bool isBigEndian) {
+  return std::make_unique<ReadOnlyByteBuffer>(std::move(data), isBigEndian);
+}
+
+// Steals the IOBuf chain owned by `buffer` and wraps it in a new
+// ReadOnlyByteBuffer whose cursor starts at the beginning of the chain. If
+// `buffer` is itself a ReadOnlyByteBuffer, its read position is not kept.
+std::unique_ptr<ReadOnlyByteBuffer> ByteBuffer::toReadOnly(
+    std::unique_ptr<ByteBuffer>&& buffer) {
+  assert(buffer);
+  return std::make_unique<ReadOnlyByteBuffer>(
+      std::move(buffer->data_), buffer->isBigEndian_);
+}
+
+// Returns a new ReadOnlyByteBuffer holding the unread bytes of `left`
+// followed by the unread bytes of `right`. Neither argument is modified; the
+// result shares their memory through IOBuf::clone() instead of copying it.
+std::unique_ptr<ReadOnlyByteBuffer> ByteBuffer::concat(
+    const ReadOnlyByteBuffer& left,
+    const ReadOnlyByteBuffer& right) {
+  // Both sides must agree on byte order; the result inherits it.
+  assert(left.isBigEndian_ == right.isBigEndian_);
+  const bool isBigEndian = left.isBigEndian_;
+  // When one side has nothing left to read, a copy of the other side (which
+  // keeps its read position) already is the concatenation.
+  if (left.remainingSize() == 0) {
+    return std::make_unique<ReadOnlyByteBuffer>(right);
+  }
+  if (right.remainingSize() == 0) {
+    return std::make_unique<ReadOnlyByteBuffer>(left);
+  }
+
+  auto leftData = trimBuffer(left);
+  auto rightData = trimBuffer(right);
+  assert(leftData);
+  assert(rightData);
+  leftData->appendToChain(std::move(rightData));
+  return createReadOnly(std::move(leftData), isBigEndian);
+}
+
+// Returns a clone of `buffer`'s IOBuf chain with the already-read prefix
+// removed: fully consumed IOBufs are dropped from the front of the chain and
+// the first partially consumed one is trimmed. The result may be null when
+// nothing is left to read, so callers check remainingSize() first (as
+// concat() does).
+std::unique_ptr<folly::IOBuf> ByteBuffer::trimBuffer(
+    const ReadOnlyByteBuffer& buffer) {
+  auto data = buffer.data_->clone();
+  size_t pos = buffer.cursor_->getCurrentPosition();
+  while (pos > 0 && data) {
+    const size_t headLength = data->length();
+    if (pos < headLength) {
+      data->trimStart(pos);
+      break;
+    }
+    // pop() unlinks the head and returns the rest of the chain; assigning it
+    // back to `data` frees the fully consumed head.
+    data = data->pop();
+    pos -= headLength;
+  }
+  // Returned by name: std::move here would only inhibit copy elision.
+  return data;
+}
+}  // namespace celeborn
diff --git a/cpp/celeborn/memory/ByteBuffer.h b/cpp/celeborn/memory/ByteBuffer.h
new file mode 100644
index 000000000..2c27ec9e8
--- /dev/null
+++ b/cpp/celeborn/memory/ByteBuffer.h
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cassert>
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include <folly/io/Cursor.h>
+#include <folly/io/IOBuf.h>
+
+namespace celeborn {
+class ReadOnlyByteBuffer;
+class WriteOnlyByteBuffer;
+
+// Common base of ReadOnlyByteBuffer and WriteOnlyByteBuffer: owns a (possibly
+// chained) folly::IOBuf together with the default byte order used by the
+// subclasses' read<T>() / write<T>() helpers. Also hosts the static factory
+// and conversion helpers.
+class ByteBuffer {
+ public:
+  // Subclass instances are deleted through ByteBuffer pointers (toReadOnly()
+  // takes a std::unique_ptr<ByteBuffer>), so the destructor must be virtual
+  // for the subclass members (cursor / appender) to be destroyed correctly.
+  virtual ~ByteBuffer() = default;
+
+  // Creates an empty write buffer backed by a single IOBuf of
+  // `initialCapacity` bytes.
+  static std::unique_ptr<WriteOnlyByteBuffer> createWriteOnly(
+      size_t initialCapacity,
+      bool isBigEndian = true);
+
+  // Wraps the non-null chain `data` for reading from its beginning.
+  static std::unique_ptr<ReadOnlyByteBuffer> createReadOnly(
+      std::unique_ptr<folly::IOBuf>&& data,
+      bool isBigEndian = true);
+
+  // Takes over the IOBuf chain of `buffer` (consuming it) and exposes it for
+  // reading from its beginning.
+  static std::unique_ptr<ReadOnlyByteBuffer> toReadOnly(
+      std::unique_ptr<ByteBuffer>&& buffer);
+
+  // Returns a buffer holding the unread bytes of `left` followed by the
+  // unread bytes of `right`. Both must use the same byte order.
+  static std::unique_ptr<ReadOnlyByteBuffer> concat(
+      const ReadOnlyByteBuffer& left,
+      const ReadOnlyByteBuffer& right);
+
+ protected:
+  ByteBuffer(std::unique_ptr<folly::IOBuf> data, bool isBigEndian)
+      : data_(std::move(data)), isBigEndian_(isBigEndian) {
+    assert(data_);
+  }
+
+  // The user-declared destructor suppresses the implicit move operations;
+  // restore them so subclasses stay movable. Protected to prevent slicing.
+  ByteBuffer(ByteBuffer&&) noexcept = default;
+  ByteBuffer& operator=(ByteBuffer&&) noexcept = default;
+
+  std::unique_ptr<folly::IOBuf> data_;
+  bool isBigEndian_;
+
+ private:
+  static std::unique_ptr<folly::IOBuf> trimBuffer(
+      const ReadOnlyByteBuffer& buffer);
+};
+
+// Reader over an IOBuf chain. Each read consumes bytes at the read cursor;
+// skip() / retreat() move the cursor without returning data.
+//
+// NOTE(review): skip(), retreat(), readToString() and readToBuffer() are
+// declared const although they move the cursor (cursor_ is held through a
+// pointer, so constness is shallow). Kept as-is for source compatibility.
+class ReadOnlyByteBuffer : public ByteBuffer {
+ public:
+  friend class ByteBuffer;
+
+  // Returns a buffer that holds no bytes.
+  static std::unique_ptr<ReadOnlyByteBuffer> createEmptyBuffer() {
+    return createReadOnly(folly::IOBuf::create(0));
+  }
+
+  // Creates an independent reader over the same bytes as `other`, positioned
+  // at `other`'s read position. The bytes are shared via IOBuf::clone().
+  ReadOnlyByteBuffer(const ReadOnlyByteBuffer& other)
+      : ByteBuffer(other.data_->clone(), other.isBigEndian_),
+        cursor_(std::make_unique<folly::io::Cursor>(data_.get())) {
+    cursor_->skip(other.cursor_->getCurrentPosition());
+    assert(other.remainingSize() == remainingSize());
+  }
+
+  // Wraps `data` with the read cursor at the start of the chain.
+  ReadOnlyByteBuffer(std::unique_ptr<folly::IOBuf>&& data, bool isBigEndian)
+      : ByteBuffer(std::move(data), isBigEndian),
+        cursor_(std::make_unique<folly::io::Cursor>(data_.get())) {}
+
+  // Returns an independent copy at the same read position; *this is not
+  // modified, so this is callable on const buffers.
+  std::unique_ptr<ReadOnlyByteBuffer> clone() const {
+    return std::make_unique<ReadOnlyByteBuffer>(*this);
+  }
+
+  // Reads a T in this buffer's default byte order.
+  template <typename T>
+  T read() {
+    if (isBigEndian_) {
+      return readBE<T>();
+    }
+    return readLE<T>();
+  }
+
+  // Reads a big-endian T.
+  template <typename T>
+  T readBE() {
+    return cursor_->readBE<T>();
+  }
+
+  // Reads a little-endian T.
+  template <typename T>
+  T readLE() {
+    return cursor_->readLE<T>();
+  }
+
+  // Moves the read cursor forward by `len` bytes.
+  void skip(size_t len) const {
+    cursor_->skip(len);
+  }
+
+  // Moves the read cursor back by `len` bytes.
+  void retreat(size_t len) const {
+    cursor_->retreat(len);
+  }
+
+  // Total number of bytes in the buffer, whether read or not.
+  size_t size() const {
+    return data_->computeChainDataLength();
+  }
+
+  // Number of bytes from the read cursor to the end of the buffer.
+  // TODO: this interface is called rapidly. maybe need optimization.
+  size_t remainingSize() const {
+    return cursor_->totalLength();
+  }
+
+  // Reads every remaining byte into a string.
+  std::string readToString() const {
+    return readToString(remainingSize());
+  }
+
+  // Reads exactly `len` bytes into a string.
+  std::string readToString(size_t len) const {
+    return cursor_->readFixedString(len);
+  }
+
+  // Copies at most `len` bytes into `buf`; returns the number copied.
+  size_t readToBuffer(void* buf, size_t len) const {
+    return cursor_->pullAtMost(buf, len);
+  }
+
+  // Returns a clone of the whole chain, regardless of the read position.
+  std::unique_ptr<folly::IOBuf> getData() const {
+    return data_->clone();
+  }
+
+ private:
+  std::unique_ptr<folly::io::Cursor> cursor_;
+};
+
+// Append-only buffer: each write lands after the bytes already present.
+//
+// NOTE(review): writeFromString() is declared const although it appends
+// (appender_ is held through a pointer). Kept as-is for source compatibility.
+class WriteOnlyByteBuffer : public ByteBuffer {
+ public:
+  friend class ByteBuffer;
+
+  // Allocates `initialCapacity` bytes up front.
+  // TODO: currently the appender is not allowed to grow (growth == 0), so
+  // writes beyond the initial capacity fail.
+  WriteOnlyByteBuffer(size_t initialCapacity, bool isBigEndian)
+      : ByteBuffer(folly::IOBuf::createCombined(initialCapacity), isBigEndian),
+        appender_(std::make_unique<folly::io::Appender>(data_.get(), 0)) {}
+
+  // Appends after the existing contents of `data`; same growth limitation.
+  WriteOnlyByteBuffer(std::unique_ptr<folly::IOBuf> data, bool isBigEndian)
+      : ByteBuffer(std::move(data), isBigEndian),
+        appender_(std::make_unique<folly::io::Appender>(data_.get(), 0)) {}
+
+  // Writes `value` in this buffer's default byte order.
+  template <class T>
+  void write(T value) {
+    if (isBigEndian_) {
+      writeBE(value);
+      return;
+    }
+    writeLE(value);
+  }
+
+  // Writes `value` in big-endian byte order.
+  template <class T>
+  void writeBE(T value) {
+    appender_->writeBE(value);
+  }
+
+  // Writes `value` in little-endian byte order.
+  template <class T>
+  void writeLE(T value) {
+    appender_->writeLE(value);
+  }
+
+  // Appends the raw bytes of `data` (no length prefix, no terminator).
+  void writeFromString(const std::string& data) const {
+    appender_->push(
+        reinterpret_cast<const uint8_t*>(data.data()), data.size());
+  }
+
+  // Number of bytes held by the chain; for a buffer created with an initial
+  // capacity this is the number of bytes written so far.
+  size_t size() const {
+    return data_->computeChainDataLength();
+  }
+
+ private:
+  std::unique_ptr<folly::io::Appender> appender_;
+};
+}  // namespace celeborn
diff --git a/cpp/celeborn/CMakeLists.txt b/cpp/celeborn/memory/CMakeLists.txt
similarity index 76%
copy from cpp/celeborn/CMakeLists.txt
copy to cpp/celeborn/memory/CMakeLists.txt
index 70e335101..ff6a331c7 100644
--- a/cpp/celeborn/CMakeLists.txt
+++ b/cpp/celeborn/memory/CMakeLists.txt
@@ -12,5 +12,18 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-add_subdirectory(utils)
-add_subdirectory(proto)
+
+# ByteBuffer utilities built on folly::IOBuf.
+add_library(
+  memory
+  ByteBuffer.cpp)
+
+target_link_libraries(
+  memory
+  ${FOLLY_WITH_DEPENDENCIES}
+  ${GLOG}
+  ${GFLAGS_LIBRARIES})
+
+if(CELEBORN_BUILD_TESTS)
+  add_subdirectory(tests)
+endif()
diff --git a/cpp/celeborn/memory/tests/ByteBufferTest.cpp b/cpp/celeborn/memory/tests/ByteBufferTest.cpp
new file mode 100644
index 000000000..26865add7
--- /dev/null
+++ b/cpp/celeborn/memory/tests/ByteBufferTest.cpp
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include <cstring>
+#include <memory>
+#include <string>
+#include <type_traits>
+#include <vector>
+
+#include "celeborn/memory/ByteBuffer.h"
+
+using namespace celeborn;
+
+namespace {
+// Serializes `t` into sizeof(T) bytes at `data`, most significant byte
+// first, and returns the position just past the written bytes. The value is
+// converted to its unsigned counterpart so every shift is well defined,
+// including for negative inputs.
+template <typename T>
+uint8_t* writeBigEndian(T t, uint8_t* data) {
+  using U = std::make_unsigned_t<T>;
+  const U bits = static_cast<U>(t);
+  constexpr size_t size = sizeof(T);
+  for (size_t i = 0; i < size; i++) {
+    data[i] = static_cast<uint8_t>(bits >> ((size - 1 - i) * 8));
+  }
+  return data + size;
+}
+
+// Like writeBigEndian(), but least significant byte first.
+template <typename T>
+uint8_t* writeLittleEndian(T t, uint8_t* data) {
+  using U = std::make_unsigned_t<T>;
+  const U bits = static_cast<U>(t);
+  constexpr size_t size = sizeof(T);
+  for (size_t i = 0; i < size; i++) {
+    data[i] = static_cast<uint8_t>(bits >> (i * 8));
+  }
+  return data + size;
+}
+
+const std::string strPayload = "this is a test";
+// The hex literals do not fit the signed types; the explicit casts make the
+// intended bit patterns explicit instead of relying on implicit narrowing.
+const int16_t int16Payload = static_cast<int16_t>(0xBEEF);
+const int32_t int32Payload = static_cast<int32_t>(0xBAADBEEF);
+const int64_t int64Payload = static_cast<int64_t>(0xBAADBEEFBAADBEEF);
+
+// Byte length of strPayload plus every integer payload in both byte orders.
+const size_t testSize = strPayload.size() +
+    (sizeof(int16_t) + sizeof(int32_t) + sizeof(int64_t)) * 2;
+
+// Hand-serializes the reference layout: strPayload, the integer payloads
+// big-endian, then the integer payloads little-endian. Sets `size` to the
+// number of bytes produced.
+std::unique_ptr<uint8_t[]> createRawData(size_t& size) {
+  size = testSize;
+  // Array form, so the storage is released with delete[] (matching new[]).
+  auto data = std::make_unique<uint8_t[]>(size);
+
+  uint8_t* curr = data.get();
+  memcpy(curr, strPayload.c_str(), strPayload.size());
+  curr += strPayload.size();
+  curr = writeBigEndian(int16Payload, curr);
+  curr = writeBigEndian(int32Payload, curr);
+  curr = writeBigEndian(int64Payload, curr);
+  curr = writeLittleEndian(int16Payload, curr);
+  curr = writeLittleEndian(int32Payload, curr);
+  curr = writeLittleEndian(int64Payload, curr);
+  EXPECT_EQ(curr, data.get() + size);
+  return data;
+}
+
+// Produces the same layout as createRawData() through the
+// WriteOnlyByteBuffer API. Sets `size` to the number of bytes written.
+std::unique_ptr<WriteOnlyByteBuffer> createWriteOnlyBuffer(size_t& size) {
+  size = testSize;
+  auto writeBuffer = ByteBuffer::createWriteOnly(size);
+  EXPECT_EQ(writeBuffer->size(), 0);
+  writeBuffer->writeFromString(strPayload);
+  EXPECT_EQ(writeBuffer->size(), strPayload.size());
+  writeBuffer->writeBE(int16Payload);
+  writeBuffer->writeBE(int32Payload);
+  writeBuffer->writeBE(int64Payload);
+  writeBuffer->writeLE(int16Payload);
+  writeBuffer->writeLE(int32Payload);
+  writeBuffer->writeLE(int64Payload);
+  EXPECT_EQ(writeBuffer->size(), size);
+  return writeBuffer;
+}
+
+// Reads back the reference layout from `readBuffer`, checking
+// remainingSize() after every step, including retreat(), skip() and a read
+// past the end (which must throw).
+void testReadData(ReadOnlyByteBuffer* readBuffer, size_t size) {
+  EXPECT_EQ(size, testSize);
+  size_t remainingSize = size;
+  EXPECT_EQ(readBuffer->size(), size);
+  EXPECT_EQ(readBuffer->remainingSize(), remainingSize);
+
+  // Test read string.
+  auto strRead = readBuffer->readToString(strPayload.size());
+  EXPECT_EQ(strRead, strPayload);
+  remainingSize -= strPayload.size();
+  EXPECT_EQ(readBuffer->remainingSize(), remainingSize);
+
+  // Test read BigEndian.
+  EXPECT_EQ(readBuffer->readBE<int16_t>(), int16Payload);
+  remainingSize -= sizeof(int16_t);
+  EXPECT_EQ(readBuffer->remainingSize(), remainingSize);
+  EXPECT_EQ(readBuffer->readBE<int32_t>(), int32Payload);
+  remainingSize -= sizeof(int32_t);
+  EXPECT_EQ(readBuffer->remainingSize(), remainingSize);
+  EXPECT_EQ(readBuffer->readBE<int64_t>(), int64Payload);
+  remainingSize -= sizeof(int64_t);
+  EXPECT_EQ(readBuffer->remainingSize(), remainingSize);
+
+  // Test read LittleEndian.
+  EXPECT_EQ(readBuffer->readLE<int16_t>(), int16Payload);
+  remainingSize -= sizeof(int16_t);
+  EXPECT_EQ(readBuffer->remainingSize(), remainingSize);
+  EXPECT_EQ(readBuffer->readLE<int32_t>(), int32Payload);
+  remainingSize -= sizeof(int32_t);
+  EXPECT_EQ(readBuffer->remainingSize(), remainingSize);
+  EXPECT_EQ(readBuffer->readLE<int64_t>(), int64Payload);
+  remainingSize -= sizeof(int64_t);
+  EXPECT_EQ(readBuffer->remainingSize(), remainingSize);
+
+  // Test retreat and skip.
+  const auto retreatSize = sizeof(int32_t) + sizeof(int64_t);
+  remainingSize += retreatSize;
+  readBuffer->retreat(retreatSize);
+  EXPECT_EQ(readBuffer->remainingSize(), remainingSize);
+  EXPECT_EQ(readBuffer->readLE<int32_t>(), int32Payload);
+  remainingSize -= sizeof(int32_t);
+  EXPECT_EQ(readBuffer->remainingSize(), remainingSize);
+  readBuffer->skip(sizeof(int64_t));
+  remainingSize -= sizeof(int64_t);
+  EXPECT_EQ(readBuffer->remainingSize(), remainingSize);
+
+  // Test read end.
+  EXPECT_EQ(readBuffer->size(), size);
+  EXPECT_EQ(readBuffer->remainingSize(), 0);
+  EXPECT_THROW(readBuffer->readLE<int64_t>(), std::exception);
+}
+}  // namespace
+
+TEST(ByteBufferTest, continuousBufferRead) {
+  size_t size = 0;
+  auto data = createRawData(size);
+  auto ioBuf = folly::IOBuf::wrapBuffer(data.get(), size);
+
+  auto readBuffer = ByteBuffer::createReadOnly(std::move(ioBuf));
+  testReadData(readBuffer.get(), size);
+}
+
+TEST(ByteBufferTest, segmentedBufferRead) {
+  size_t size = 0;
+  auto data = createRawData(size);
+
+  // Split the reference bytes into unevenly sized segments so that reads
+  // cross IOBuf boundaries.
+  std::vector<size_t> sizes = {7, 11, 17};
+  sizes.push_back(size - sizes[0] - sizes[1] - sizes[2]);
+  EXPECT_GT(sizes[3], 0);
+  std::vector<std::unique_ptr<uint8_t[]>> segments;
+  std::unique_ptr<folly::IOBuf> ioBuf;
+  // Create segmented IOBuf source data.
+  size_t accSize = 0;
+  for (const size_t segmentSize : sizes) {
+    segments.push_back(std::make_unique<uint8_t[]>(segmentSize));
+    uint8_t* segment = segments.back().get();
+    memcpy(segment, data.get() + accSize, segmentSize);
+    auto segmentBuf = folly::IOBuf::wrapBuffer(segment, segmentSize);
+    if (ioBuf) {
+      ioBuf->appendToChain(std::move(segmentBuf));
+    } else {
+      ioBuf = std::move(segmentBuf);
+    }
+    accSize += segmentSize;
+  }
+
+  auto readBuffer = ByteBuffer::createReadOnly(std::move(ioBuf));
+  testReadData(readBuffer.get(), size);
+}
+
+TEST(ByteBufferTest, writeBufferAndRead) {
+  size_t size = 0;
+  auto writeBuffer = createWriteOnlyBuffer(size);
+  auto readBuffer = WriteOnlyByteBuffer::toReadOnly(std::move(writeBuffer));
+  testReadData(readBuffer.get(), size);
+}
+
+TEST(ByteBufferTest, concatReadBuffer) {
+  size_t size = 0;
+  auto writeBuffer1 = createWriteOnlyBuffer(size);
+  auto readBuffer1 = WriteOnlyByteBuffer::toReadOnly(std::move(writeBuffer1));
+  testReadData(readBuffer1.get(), size);
+  auto writeBuffer2 = createWriteOnlyBuffer(size);
+  auto readBuffer2 = WriteOnlyByteBuffer::toReadOnly(std::move(writeBuffer2));
+  testReadData(readBuffer2.get(), size);
+  auto retreatSize1 = sizeof(int32_t) + sizeof(int64_t);
+  auto retreatSize2 = sizeof(int64_t);
+  readBuffer1->retreat(retreatSize1);
+  readBuffer2->retreat(retreatSize2);
+  auto concatedReadBuffer = ByteBuffer::concat(*readBuffer1, *readBuffer2);
+  auto remainingSize = retreatSize1 + retreatSize2;
+  EXPECT_EQ(concatedReadBuffer->remainingSize(), remainingSize);
+  // concat() must leave the read positions of its inputs untouched.
+  EXPECT_EQ(readBuffer1->remainingSize(), retreatSize1);
+  EXPECT_EQ(readBuffer2->remainingSize(), retreatSize2);
+
+  // Read content of original readBuffer1.
+  EXPECT_EQ(concatedReadBuffer->readLE<int32_t>(), int32Payload);
+  remainingSize -= sizeof(int32_t);
+  EXPECT_EQ(concatedReadBuffer->remainingSize(), remainingSize);
+  EXPECT_EQ(concatedReadBuffer->readLE<int64_t>(), int64Payload);
+  remainingSize -= sizeof(int64_t);
+  EXPECT_EQ(concatedReadBuffer->remainingSize(), remainingSize);
+
+  // Read content of original readBuffer2.
+  EXPECT_EQ(concatedReadBuffer->readLE<int64_t>(), int64Payload);
+  remainingSize -= sizeof(int64_t);
+  EXPECT_EQ(concatedReadBuffer->remainingSize(), remainingSize);
+  EXPECT_EQ(concatedReadBuffer->remainingSize(), 0);
+}
diff --git a/cpp/celeborn/CMakeLists.txt b/cpp/celeborn/memory/tests/CMakeLists.txt
similarity index 68%
copy from cpp/celeborn/CMakeLists.txt
copy to cpp/celeborn/memory/tests/CMakeLists.txt
index 70e335101..5d87cec47 100644
--- a/cpp/celeborn/CMakeLists.txt
+++ b/cpp/celeborn/memory/tests/CMakeLists.txt
@@ -12,5 +12,19 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-add_subdirectory(utils)
-add_subdirectory(proto)
+
+# Unit tests for the memory module.
+add_executable(celeborn_memory_test ByteBufferTest.cpp)
+
+add_test(NAME celeborn_memory_test COMMAND celeborn_memory_test)
+
+target_link_libraries(
+  celeborn_memory_test
+  PRIVATE
+  memory
+  ${FOLLY_WITH_DEPENDENCIES}
+  ${GLOG}
+  ${GFLAGS_LIBRARIES}
+  GTest::gtest
+  GTest::gmock
+  GTest::gtest_main)