This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new eb950c82e [CELEBORN-1827][CIP-14] Add messageDecoder to cppClient
eb950c82e is described below

commit eb950c82e5d7840c131a5a886db114c606a821c0
Author: HolyLow <[email protected]>
AuthorDate: Fri Jan 10 16:42:31 2025 +0800

    [CELEBORN-1827][CIP-14] Add messageDecoder to cppClient
    
    ### What changes were proposed in this pull request?
    This PR adds MessageDecoder to cppClient.
    
    ### Why are the changes needed?
    MessageDecoder is the underlying decoding stack for the wangle network
    message-processing framework.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Compilation and UTs.
    
    Closes #3060 from 
HolyLow/issue/celeborn-1827-add-message-decoder-to-cpp-client.
    
    Authored-by: HolyLow <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 cpp/celeborn/CMakeLists.txt                     |   1 +
 cpp/celeborn/{ => network}/CMakeLists.txt       |   9 +-
 cpp/celeborn/network/FrameDecoder.h             |  84 ++++++++++++++++
 cpp/celeborn/{ => network/tests}/CMakeLists.txt |  28 +++++-
 cpp/celeborn/network/tests/FrameDecoderTest.cpp | 123 ++++++++++++++++++++++++
 5 files changed, 235 insertions(+), 10 deletions(-)

diff --git a/cpp/celeborn/CMakeLists.txt b/cpp/celeborn/CMakeLists.txt
index d22bca3f1..c5fe93782 100644
--- a/cpp/celeborn/CMakeLists.txt
+++ b/cpp/celeborn/CMakeLists.txt
@@ -17,3 +17,4 @@ add_subdirectory(memory)
 add_subdirectory(utils)
 add_subdirectory(conf)
 add_subdirectory(protocol)
+add_subdirectory(network)
diff --git a/cpp/celeborn/CMakeLists.txt b/cpp/celeborn/network/CMakeLists.txt
similarity index 86%
copy from cpp/celeborn/CMakeLists.txt
copy to cpp/celeborn/network/CMakeLists.txt
index d22bca3f1..889d959f5 100644
--- a/cpp/celeborn/CMakeLists.txt
+++ b/cpp/celeborn/network/CMakeLists.txt
@@ -12,8 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-add_subdirectory(proto)
-add_subdirectory(memory)
-add_subdirectory(utils)
-add_subdirectory(conf)
-add_subdirectory(protocol)
+
+if(CELEBORN_BUILD_TESTS)
+    add_subdirectory(tests)
+endif()
diff --git a/cpp/celeborn/network/FrameDecoder.h 
b/cpp/celeborn/network/FrameDecoder.h
new file mode 100644
index 000000000..e6e371619
--- /dev/null
+++ b/cpp/celeborn/network/FrameDecoder.h
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/io/Cursor.h>
+#include <wangle/codec/ByteToMessageDecoder.h>
+
+namespace celeborn {
+namespace network {
+/**
+ * A complete Message encoding/decoding frame is:
+ * -----------------------------------------------------------------------
+ * | encodedLength | msgType | bodyLength | encodedContent | bodyContent |
+ * -----------------------------------------------------------------------
+ * The size of each part is:
+ * -----------------------------------------------------------------------
+ * | 4             | 1       | 4          | #encodedLength | #bodyLength |
+ * -----------------------------------------------------------------------
+ * So the #headerLength is 4 + 1 + 4,
+ * and the complete frameLength is:
+ *   #frameLength = #headerLength + #encodedLength + #bodyLength.
+ */
+
+class FrameDecoder : public wangle::ByteToByteDecoder {
+ public:
+  FrameDecoder(bool networkByteOrder = true)
+      : networkByteOrder_(networkByteOrder) {}
+
+  bool decode(
+      Context* ctx,
+      folly::IOBufQueue& buf,
+      std::unique_ptr<folly::IOBuf>& result,
+      size_t&) override {
+    if (buf.chainLength() < headerLength_) {
+      return false;
+    }
+
+    folly::io::Cursor c(buf.front());
+    int encodedLength, bodyLength;
+    if (networkByteOrder_) {
+      encodedLength = c.readBE<int32_t>();
+      c.skip(1);
+      bodyLength = c.readBE<int32_t>();
+    } else {
+      encodedLength = c.readLE<int32_t>();
+      c.skip(1);
+      bodyLength = c.readLE<int32_t>();
+    }
+
+    uint64_t frameLength = headerLength_ + encodedLength + bodyLength;
+    if (buf.chainLength() < frameLength) {
+      return false;
+    }
+
+    result = buf.split(frameLength);
+    return true;
+  }
+
+ private:
+  bool networkByteOrder_;
+
+  static constexpr int lenEncodedLength_ = 4;
+  static constexpr int lenMsgType_ = 1;
+  static constexpr int lenBodyLength_ = 4;
+  static constexpr int headerLength_ =
+      lenEncodedLength_ + lenMsgType_ + lenBodyLength_;
+};
+} // namespace network
+} // namespace celeborn
diff --git a/cpp/celeborn/CMakeLists.txt 
b/cpp/celeborn/network/tests/CMakeLists.txt
similarity index 61%
copy from cpp/celeborn/CMakeLists.txt
copy to cpp/celeborn/network/tests/CMakeLists.txt
index d22bca3f1..46a3fcf56 100644
--- a/cpp/celeborn/CMakeLists.txt
+++ b/cpp/celeborn/network/tests/CMakeLists.txt
@@ -12,8 +12,26 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-add_subdirectory(proto)
-add_subdirectory(memory)
-add_subdirectory(utils)
-add_subdirectory(conf)
-add_subdirectory(protocol)
+
+add_executable(
+        celeborn_network_test
+        FrameDecoderTest.cpp)
+
+add_test(NAME celeborn_network_test COMMAND celeborn_network_test)
+
+target_link_libraries(
+        celeborn_network_test
+        PRIVATE
+        memory
+        proto
+        protocol
+        utils
+        ${WANGLE}
+        ${FIZZ}
+        ${LIBSODIUM_LIBRARY}
+        ${FOLLY_WITH_DEPENDENCIES}
+        ${GLOG}
+        ${GFLAGS_LIBRARIES}
+        GTest::gtest
+        GTest::gmock
+        GTest::gtest_main)
\ No newline at end of file
diff --git a/cpp/celeborn/network/tests/FrameDecoderTest.cpp 
b/cpp/celeborn/network/tests/FrameDecoderTest.cpp
new file mode 100644
index 000000000..63c776f33
--- /dev/null
+++ b/cpp/celeborn/network/tests/FrameDecoderTest.cpp
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include "celeborn/network/FrameDecoder.h"
+
+using namespace celeborn::network;
+
+namespace {
+template <typename T>
+uint8_t* writeBigEndian(T t, uint8_t* data) {
+  const size_t size = sizeof(T);
+  const T mask = static_cast<T>(0xFF) << ((size - 1) * 8);
+  for (int i = 0; i < size; i++) {
+    T maskT = t & mask;
+    uint8_t byte = static_cast<uint8_t>(maskT >> ((size - 1) * 8));
+    data[i] = byte;
+    // We only use leftShift here to avoid arithmetic shifting.
+    t <<= 8;
+  }
+  return data + size;
+}
+
+template <typename T>
+uint8_t* writeLittleEndian(T t, uint8_t* data) {
+  const size_t size = sizeof(T);
+  const T mask = static_cast<T>(0xFF);
+  for (int i = 0; i < size; i++) {
+    T maskT = t & mask;
+    uint8_t byte = static_cast<uint8_t>(maskT);
+    data[i] = byte;
+    t >>= 8;
+  }
+  return data + size;
+}
+} // namespace
+
+void testDecodeWithEndian(bool isBigEndian) {
+  const std::string encodedContent = "test-encodedContent";
+  const std::string bodyContent = "test-bodyContent";
+  const int encodedLength = encodedContent.size();
+  const uint8_t msgType = 10;
+  const int bodyLength = bodyContent.size();
+  const int frameLength =
+      sizeof(int) + sizeof(uint8_t) + sizeof(int) + encodedLength + bodyLength;
+  int endianEncodedLength = 0;
+  int endianBodyLength = 0;
+  if (isBigEndian) {
+    writeBigEndian<int>(
+        encodedLength, reinterpret_cast<uint8_t*>(&endianEncodedLength));
+    writeBigEndian<int>(
+        bodyLength, reinterpret_cast<uint8_t*>(&endianBodyLength));
+  } else {
+    writeLittleEndian<int>(
+        encodedLength, reinterpret_cast<uint8_t*>(&endianEncodedLength));
+    writeLittleEndian<int>(
+        bodyLength, reinterpret_cast<uint8_t*>(&endianBodyLength));
+  }
+
+  size_t dummy;
+  FrameDecoder decoder(isBigEndian);
+  folly::IOBufQueue::Options option;
+  option.cacheChainLength = true;
+  folly::IOBufQueue queue(option);
+  std::unique_ptr<folly::IOBuf> result;
+
+  queue.append(folly::IOBuf::wrapBuffer(&endianEncodedLength, sizeof(int)));
+  EXPECT_FALSE(decoder.decode(nullptr, queue, result, dummy));
+
+  queue.append(folly::IOBuf::wrapBuffer(&msgType, sizeof(uint8_t)));
+  EXPECT_FALSE(decoder.decode(nullptr, queue, result, dummy));
+
+  queue.append(folly::IOBuf::wrapBuffer(&endianBodyLength, sizeof(int)));
+  EXPECT_FALSE(decoder.decode(nullptr, queue, result, dummy));
+
+  queue.append(
+      folly::IOBuf::wrapBuffer(encodedContent.c_str(), encodedContent.size()));
+  EXPECT_FALSE(decoder.decode(nullptr, queue, result, dummy));
+
+  queue.append(
+      folly::IOBuf::wrapBuffer(bodyContent.c_str(), bodyContent.size()));
+  EXPECT_TRUE(decoder.decode(nullptr, queue, result, dummy));
+  EXPECT_EQ(queue.chainLength(), 0);
+
+  auto cursor = std::make_unique<folly::io::Cursor>(result.get());
+  EXPECT_EQ(cursor->totalLength(), frameLength);
+  if (isBigEndian) {
+    EXPECT_EQ(cursor->readBE<int>(), encodedLength);
+  } else {
+    EXPECT_EQ(cursor->readLE<int>(), encodedLength);
+  }
+  EXPECT_EQ(cursor->read<uint8_t>(), msgType);
+  if (isBigEndian) {
+    EXPECT_EQ(cursor->readBE<int>(), bodyLength);
+  } else {
+    EXPECT_EQ(cursor->readLE<int>(), bodyLength);
+  }
+  EXPECT_EQ(cursor->readFixedString(encodedLength), encodedContent);
+  EXPECT_EQ(cursor->readFixedString(bodyLength), bodyContent);
+}
+
+TEST(FrameDecoderTest, decodeBigEndian) {
+  testDecodeWithEndian(true);
+}
+
+TEST(FrameDecoderTest, decodeLittleEndian) {
+  testDecodeWithEndian(false);
+}

Reply via email to