This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new eb950c82e [CELEBORN-1827][CIP-14] Add messageDecoder to cppClient
eb950c82e is described below
commit eb950c82e5d7840c131a5a886db114c606a821c0
Author: HolyLow <[email protected]>
AuthorDate: Fri Jan 10 16:42:31 2025 +0800
[CELEBORN-1827][CIP-14] Add messageDecoder to cppClient
### What changes were proposed in this pull request?
This PR adds MessageDecoder to cppClient.
### Why are the changes needed?
MessageDecoder is the underlying decoding stack for the wangle network message
processing framework.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Compilation and UTs.
Closes #3060 from
HolyLow/issue/celeborn-1827-add-message-decoder-to-cpp-client.
Authored-by: HolyLow <[email protected]>
Signed-off-by: mingji <[email protected]>
---
cpp/celeborn/CMakeLists.txt | 1 +
cpp/celeborn/{ => network}/CMakeLists.txt | 9 +-
cpp/celeborn/network/FrameDecoder.h | 84 ++++++++++++++++
cpp/celeborn/{ => network/tests}/CMakeLists.txt | 28 +++++-
cpp/celeborn/network/tests/FrameDecoderTest.cpp | 123 ++++++++++++++++++++++++
5 files changed, 235 insertions(+), 10 deletions(-)
diff --git a/cpp/celeborn/CMakeLists.txt b/cpp/celeborn/CMakeLists.txt
index d22bca3f1..c5fe93782 100644
--- a/cpp/celeborn/CMakeLists.txt
+++ b/cpp/celeborn/CMakeLists.txt
@@ -17,3 +17,4 @@ add_subdirectory(memory)
add_subdirectory(utils)
add_subdirectory(conf)
add_subdirectory(protocol)
+add_subdirectory(network)
diff --git a/cpp/celeborn/CMakeLists.txt b/cpp/celeborn/network/CMakeLists.txt
similarity index 86%
copy from cpp/celeborn/CMakeLists.txt
copy to cpp/celeborn/network/CMakeLists.txt
index d22bca3f1..889d959f5 100644
--- a/cpp/celeborn/CMakeLists.txt
+++ b/cpp/celeborn/network/CMakeLists.txt
@@ -12,8 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-add_subdirectory(proto)
-add_subdirectory(memory)
-add_subdirectory(utils)
-add_subdirectory(conf)
-add_subdirectory(protocol)
+
+# Descend into the tests/ subdirectory only when CELEBORN_BUILD_TESTS
+# evaluates to true.
+if(CELEBORN_BUILD_TESTS)
+ add_subdirectory(tests)
+endif()
diff --git a/cpp/celeborn/network/FrameDecoder.h
b/cpp/celeborn/network/FrameDecoder.h
new file mode 100644
index 000000000..e6e371619
--- /dev/null
+++ b/cpp/celeborn/network/FrameDecoder.h
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <stdexcept>
+
+#include <folly/ExceptionWrapper.h>
+#include <folly/io/Cursor.h>
+#include <folly/io/IOBufQueue.h>
+#include <wangle/codec/ByteToMessageDecoder.h>
+
+namespace celeborn {
+namespace network {
+/**
+ * A complete Message encoding/decoding frame is:
+ * -----------------------------------------------------------------------
+ * | encodedLength | msgType | bodyLength | encodedContent | bodyContent |
+ * -----------------------------------------------------------------------
+ * The size of each part is:
+ * -----------------------------------------------------------------------
+ * | 4 | 1 | 4 | #encodedLength | #bodyLength |
+ * -----------------------------------------------------------------------
+ * So the #headerLength is 4 + 1 + 4,
+ * and the complete frameLength is:
+ * #frameLength = #headerLength + #encodedLength + #bodyLength.
+ */
+
+/**
+ * Splits the inbound byte stream into complete frames.
+ *
+ * Each call to decode() inspects the 9-byte header at the front of the
+ * queue, derives the total frame length from the two length fields, and
+ * hands out the whole frame (header included) once enough bytes have been
+ * buffered.
+ */
+class FrameDecoder : public wangle::ByteToByteDecoder {
+ public:
+  /**
+   * @param networkByteOrder true to read the two length fields as
+   *     big-endian integers, false to read them as little-endian.
+   */
+  FrameDecoder(bool networkByteOrder = true)
+      : networkByteOrder_(networkByteOrder) {}
+
+  /**
+   * Tries to cut one complete frame off the front of `buf`.
+   *
+   * @param ctx pipeline context; only used to report a malformed header.
+   *     May be nullptr, in which case a malformed header is dropped
+   *     without notification.
+   * @param buf queue of bytes received so far; a decoded frame is removed
+   *     from its front.
+   * @param result receives the frame (header + encodedContent +
+   *     bodyContent) when the function returns true.
+   * @return true if a frame was produced, false if more bytes are needed
+   *     or the header was rejected.
+   */
+  bool decode(
+      Context* ctx,
+      folly::IOBufQueue& buf,
+      std::unique_ptr<folly::IOBuf>& result,
+      size_t&) override {
+    if (buf.chainLength() < headerLength_) {
+      return false;
+    }
+
+    folly::io::Cursor c(buf.front());
+    int32_t encodedLength = 0;
+    int32_t bodyLength = 0;
+    if (networkByteOrder_) {
+      encodedLength = c.readBE<int32_t>();
+      c.skip(lenMsgType_);
+      bodyLength = c.readBE<int32_t>();
+    } else {
+      encodedLength = c.readLE<int32_t>();
+      c.skip(lenMsgType_);
+      bodyLength = c.readLE<int32_t>();
+    }
+
+    // A negative length can only come from a corrupt or hostile peer. If it
+    // were accepted, the computed frame length could be zero or shorter than
+    // the header, and split() would keep producing bogus frames. Drop the
+    // offending header and surface the error to the pipeline instead.
+    if (encodedLength < 0 || bodyLength < 0) {
+      buf.trimStart(headerLength_);
+      if (ctx != nullptr) {
+        ctx->fireReadException(
+            folly::make_exception_wrapper<std::runtime_error>(
+                "FrameDecoder: negative length field in frame header"));
+      }
+      return false;
+    }
+
+    // Sum in 64-bit unsigned arithmetic: two large int32 lengths would
+    // overflow a plain int addition.
+    const uint64_t frameLength = static_cast<uint64_t>(headerLength_) +
+        static_cast<uint64_t>(encodedLength) +
+        static_cast<uint64_t>(bodyLength);
+    if (buf.chainLength() < frameLength) {
+      return false;
+    }
+
+    result = buf.split(frameLength);
+    return true;
+  }
+
+ private:
+  // Selects readBE (true) or readLE (false) for the length fields.
+  bool networkByteOrder_;
+
+  // Sizes, in bytes, of the three fixed header fields and their sum.
+  static constexpr size_t lenEncodedLength_ = 4;
+  static constexpr size_t lenMsgType_ = 1;
+  static constexpr size_t lenBodyLength_ = 4;
+  static constexpr size_t headerLength_ =
+      lenEncodedLength_ + lenMsgType_ + lenBodyLength_;
+};
+} // namespace network
+} // namespace celeborn
diff --git a/cpp/celeborn/CMakeLists.txt
b/cpp/celeborn/network/tests/CMakeLists.txt
similarity index 61%
copy from cpp/celeborn/CMakeLists.txt
copy to cpp/celeborn/network/tests/CMakeLists.txt
index d22bca3f1..46a3fcf56 100644
--- a/cpp/celeborn/CMakeLists.txt
+++ b/cpp/celeborn/network/tests/CMakeLists.txt
@@ -12,8 +12,26 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-add_subdirectory(proto)
-add_subdirectory(memory)
-add_subdirectory(utils)
-add_subdirectory(conf)
-add_subdirectory(protocol)
+
+# Test executable for the network module, built from FrameDecoderTest.cpp.
+add_executable(
+ celeborn_network_test
+ FrameDecoderTest.cpp)
+
+# Register the executable with CTest under the same name.
+add_test(NAME celeborn_network_test COMMAND celeborn_network_test)
+
+# Link against the project libraries listed first, then the third-party
+# libraries referenced through variables (Wangle, Fizz, libsodium, Folly,
+# glog, gflags) and the GoogleTest/GoogleMock targets.
+target_link_libraries(
+ celeborn_network_test
+ PRIVATE
+ memory
+ proto
+ protocol
+ utils
+ ${WANGLE}
+ ${FIZZ}
+ ${LIBSODIUM_LIBRARY}
+ ${FOLLY_WITH_DEPENDENCIES}
+ ${GLOG}
+ ${GFLAGS_LIBRARIES}
+ GTest::gtest
+ GTest::gmock
+ GTest::gtest_main)
\ No newline at end of file
diff --git a/cpp/celeborn/network/tests/FrameDecoderTest.cpp
b/cpp/celeborn/network/tests/FrameDecoderTest.cpp
new file mode 100644
index 000000000..63c776f33
--- /dev/null
+++ b/cpp/celeborn/network/tests/FrameDecoderTest.cpp
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include "celeborn/network/FrameDecoder.h"
+
+using namespace celeborn::network;
+
+namespace {
+/**
+ * Writes the bytes of `t` into `data`, most significant byte first.
+ *
+ * @param t integral value to serialize; sizeof(T) bytes are written.
+ * @param data destination with room for at least sizeof(T) bytes.
+ * @return pointer one past the last byte written.
+ */
+template <typename T>
+uint8_t* writeBigEndian(T t, uint8_t* data) {
+  constexpr size_t size = sizeof(T);
+  static_assert(
+      size <= sizeof(uint64_t), "writeBigEndian supports at most 8 bytes");
+  // Shift an unsigned copy: left-shifting a negative signed value is
+  // undefined before C++20, and right-shifting one is implementation-defined.
+  // The conversion keeps the low sizeof(T) bytes of the two's-complement
+  // representation intact.
+  const auto bits = static_cast<uint64_t>(t);
+  for (size_t i = 0; i < size; ++i) {
+    data[i] = static_cast<uint8_t>(bits >> ((size - 1 - i) * 8));
+  }
+  return data + size;
+}
+
+/**
+ * Writes the bytes of `t` into `data`, least significant byte first.
+ *
+ * @param t integral value to serialize; sizeof(T) bytes are written.
+ * @param data destination with room for at least sizeof(T) bytes.
+ * @return pointer one past the last byte written.
+ */
+template <typename T>
+uint8_t* writeLittleEndian(T t, uint8_t* data) {
+  constexpr size_t size = sizeof(T);
+  static_assert(
+      size <= sizeof(uint64_t), "writeLittleEndian supports at most 8 bytes");
+  // Shift an unsigned copy so the result does not depend on how the compiler
+  // right-shifts negative signed values.
+  const auto bits = static_cast<uint64_t>(t);
+  for (size_t i = 0; i < size; ++i) {
+    data[i] = static_cast<uint8_t>(bits >> (i * 8));
+  }
+  return data + size;
+}
+} // namespace
+
+// Feeds one frame to a FrameDecoder piece by piece and checks that nothing is
+// emitted until the last piece arrives, then verifies the emitted frame field
+// by field. `isBigEndian` selects the byte order used both to build the length
+// fields and to configure the decoder.
+void testDecodeWithEndian(bool isBigEndian) {
+  const std::string encodedPart = "test-encodedContent";
+  const std::string bodyPart = "test-bodyContent";
+  const uint8_t typeByte = 10;
+  const int encodedSize = encodedPart.size();
+  const int bodySize = bodyPart.size();
+  const int expectedFrameSize =
+      sizeof(int) + sizeof(uint8_t) + sizeof(int) + encodedSize + bodySize;
+
+  // Wire images of the two length fields in the requested byte order.
+  int wireEncodedSize = 0;
+  int wireBodySize = 0;
+  auto* encodedSizeBytes = reinterpret_cast<uint8_t*>(&wireEncodedSize);
+  auto* bodySizeBytes = reinterpret_cast<uint8_t*>(&wireBodySize);
+  if (!isBigEndian) {
+    writeLittleEndian<int>(encodedSize, encodedSizeBytes);
+    writeLittleEndian<int>(bodySize, bodySizeBytes);
+  } else {
+    writeBigEndian<int>(encodedSize, encodedSizeBytes);
+    writeBigEndian<int>(bodySize, bodySizeBytes);
+  }
+
+  size_t unused;
+  FrameDecoder decoder(isBigEndian);
+  folly::IOBufQueue::Options queueOptions;
+  queueOptions.cacheChainLength = true;
+  folly::IOBufQueue pending(queueOptions);
+  std::unique_ptr<folly::IOBuf> frame;
+
+  // Appends one piece to the queue and reports whether a frame came out.
+  const auto feed = [&](const void* bytes, size_t count) {
+    pending.append(folly::IOBuf::wrapBuffer(bytes, count));
+    return decoder.decode(nullptr, pending, frame, unused);
+  };
+
+  // Every piece but the last leaves the frame incomplete.
+  EXPECT_FALSE(feed(&wireEncodedSize, sizeof(int)));
+  EXPECT_FALSE(feed(&typeByte, sizeof(uint8_t)));
+  EXPECT_FALSE(feed(&wireBodySize, sizeof(int)));
+  EXPECT_FALSE(feed(encodedPart.c_str(), encodedPart.size()));
+  EXPECT_TRUE(feed(bodyPart.c_str(), bodyPart.size()));
+  EXPECT_EQ(pending.chainLength(), 0);
+
+  // The emitted frame still carries the header, followed by both payloads.
+  folly::io::Cursor reader(frame.get());
+  const auto readLength = [&] {
+    return isBigEndian ? reader.readBE<int>() : reader.readLE<int>();
+  };
+  EXPECT_EQ(reader.totalLength(), expectedFrameSize);
+  EXPECT_EQ(readLength(), encodedSize);
+  EXPECT_EQ(reader.read<uint8_t>(), typeByte);
+  EXPECT_EQ(readLength(), bodySize);
+  EXPECT_EQ(reader.readFixedString(encodedSize), encodedPart);
+  EXPECT_EQ(reader.readFixedString(bodySize), bodyPart);
+}
+
+// Runs the shared decode scenario with big-endian length fields.
+TEST(FrameDecoderTest, decodeBigEndian) {
+ testDecodeWithEndian(true);
+}
+
+// Runs the shared decode scenario with little-endian length fields.
+TEST(FrameDecoderTest, decodeLittleEndian) {
+ testDecodeWithEndian(false);
+}