This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 8b096ea87 [CELEBORN-1814][CIP-14] Add transportMessage to cppClient
8b096ea87 is described below

commit 8b096ea8798baaad016cd6dbb7ec59caab7240de
Author: HolyLow <[email protected]>
AuthorDate: Fri Jan 3 16:18:50 2025 +0800

    [CELEBORN-1814][CIP-14] Add transportMessage to cppClient
    
    ### What changes were proposed in this pull request?
    Add transportMessage to cppClient.
    
    ### Why are the changes needed?
    TransportMessage is the building block of controlMessages.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Compilation and UTs.
    
    Closes #3042 from 
HolyLow/issue/celeborn-1814-add-transport-message-to-cppClient.
    
    Authored-by: HolyLow <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 cpp/celeborn/protocol/CMakeLists.txt               |  6 ++-
 cpp/celeborn/protocol/TransportMessage.cpp         | 49 ++++++++++++++++++
 cpp/celeborn/protocol/TransportMessage.h           | 50 ++++++++++++++++++
 cpp/celeborn/protocol/tests/CMakeLists.txt         |  5 +-
 .../protocol/tests/TransportMessageTest.cpp        | 59 ++++++++++++++++++++++
 5 files changed, 167 insertions(+), 2 deletions(-)

diff --git a/cpp/celeborn/protocol/CMakeLists.txt 
b/cpp/celeborn/protocol/CMakeLists.txt
index e92778e93..6a6ef88a7 100644
--- a/cpp/celeborn/protocol/CMakeLists.txt
+++ b/cpp/celeborn/protocol/CMakeLists.txt
@@ -12,7 +12,11 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-add_library(protocol PartitionLocation.cpp)
+add_library(
+        protocol
+        STATIC
+        PartitionLocation.cpp
+        TransportMessage.cpp)
 
 target_include_directories(protocol PUBLIC ${CMAKE_BINARY_DIR})
 
diff --git a/cpp/celeborn/protocol/TransportMessage.cpp 
b/cpp/celeborn/protocol/TransportMessage.cpp
new file mode 100644
index 000000000..351dac5b0
--- /dev/null
+++ b/cpp/celeborn/protocol/TransportMessage.cpp
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "celeborn/protocol/TransportMessage.h"
+#include "celeborn/utils/Exceptions.h"
+
+namespace celeborn {
+namespace protocol {
+TransportMessage::TransportMessage(MessageType type, std::string&& payload)
+    : type_(type), payload_(std::move(payload)) {
+  messageTypeValue_ = type;
+}
+
+TransportMessage::TransportMessage(std::unique_ptr<ReadOnlyByteBuffer> buf) {
+  int messageTypeValue = buf->read<int32_t>();
+  int payloadLen = buf->read<int32_t>();
+  CELEBORN_CHECK_EQ(buf->remainingSize(), payloadLen);
+  CELEBORN_CHECK(MessageType_IsValid(messageTypeValue));
+  type_ = static_cast<MessageType>(messageTypeValue);
+  messageTypeValue_ = type_;
+  payload_ = buf->readToString(payloadLen);
+}
+
+std::unique_ptr<ReadOnlyByteBuffer> TransportMessage::toReadOnlyByteBuffer()
+    const {
+  int bufSize = payload_.size() + 4 + 4;
+  auto buffer = ByteBuffer::createWriteOnly(bufSize);
+  buffer->write<int>(messageTypeValue_);
+  buffer->write<int>(payload_.size());
+  buffer->writeFromString(payload_);
+  CELEBORN_CHECK_EQ(buffer->size(), bufSize);
+  return ByteBuffer::toReadOnly(std::move(buffer));
+}
+} // namespace protocol
+} // namespace celeborn
diff --git a/cpp/celeborn/protocol/TransportMessage.h 
b/cpp/celeborn/protocol/TransportMessage.h
new file mode 100644
index 000000000..22a162eea
--- /dev/null
+++ b/cpp/celeborn/protocol/TransportMessage.h
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/io/Cursor.h>
+#include <string>
+
+#include "celeborn/memory/ByteBuffer.h"
+#include "celeborn/proto/TransportMessagesCpp.pb.h"
+
+namespace celeborn {
+namespace protocol {
+class TransportMessage {
+ public:
+  TransportMessage(MessageType type, std::string&& payload);
+
+  TransportMessage(std::unique_ptr<ReadOnlyByteBuffer> buf);
+
+  std::unique_ptr<ReadOnlyByteBuffer> toReadOnlyByteBuffer() const;
+
+  MessageType type() const {
+    return type_;
+  }
+
+  std::string payload() const {
+    return payload_;
+  }
+
+ private:
+  MessageType type_;
+  int messageTypeValue_;
+  std::string payload_;
+};
+} // namespace protocol
+} // namespace celeborn
diff --git a/cpp/celeborn/protocol/tests/CMakeLists.txt 
b/cpp/celeborn/protocol/tests/CMakeLists.txt
index 8f7b5e2f2..f5a00db2a 100644
--- a/cpp/celeborn/protocol/tests/CMakeLists.txt
+++ b/cpp/celeborn/protocol/tests/CMakeLists.txt
@@ -13,7 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-add_executable(celeborn_protocol_test PartitionLocationTest.cpp)
+add_executable(
+        celeborn_protocol_test
+        PartitionLocationTest.cpp
+        TransportMessageTest.cpp)
 
 add_test(NAME celeborn_protocol_test COMMAND celeborn_protocol_test)
 
diff --git a/cpp/celeborn/protocol/tests/TransportMessageTest.cpp 
b/cpp/celeborn/protocol/tests/TransportMessageTest.cpp
new file mode 100644
index 000000000..c0f64c686
--- /dev/null
+++ b/cpp/celeborn/protocol/tests/TransportMessageTest.cpp
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include "celeborn/proto/TransportMessagesCpp.pb.h"
+#include "celeborn/protocol/TransportMessage.h"
+
+using namespace celeborn;
+using namespace celeborn::protocol;
+
+TEST(TransportMessageTest, constructFromPayload) {
+  const std::string payload = "payload";
+  auto payloadToMove = payload;
+  auto messageType = 10;
+  auto transportMessage = std::make_unique<TransportMessage>(
+      static_cast<MessageType>(messageType), std::move(payloadToMove));
+
+  auto transportMessageBuffer = transportMessage->toReadOnlyByteBuffer();
+  EXPECT_EQ(
+      transportMessageBuffer->remainingSize(),
+      sizeof(int) * 2 + payload.size());
+  EXPECT_EQ(transportMessageBuffer->read<int>(), messageType);
+  EXPECT_EQ(transportMessageBuffer->read<int>(), payload.size());
+  EXPECT_EQ(transportMessageBuffer->readToString(), payload);
+}
+
+TEST(TransportMessageTest, constructFromReadOnlyBuffer) {
+  const std::string payload = "payload";
+  auto payloadToMove = payload;
+  auto messageType = 10;
+  auto contentMessage = std::make_unique<TransportMessage>(
+      static_cast<MessageType>(messageType), std::move(payloadToMove));
+  auto contentBuffer = contentMessage->toReadOnlyByteBuffer();
+
+  auto transportMessage =
+      std::make_unique<TransportMessage>(std::move(contentBuffer));
+  auto transportMessageBuffer = transportMessage->toReadOnlyByteBuffer();
+  EXPECT_EQ(
+      transportMessageBuffer->remainingSize(),
+      sizeof(int) * 2 + payload.size());
+  EXPECT_EQ(transportMessageBuffer->read<int>(), messageType);
+  EXPECT_EQ(transportMessageBuffer->read<int>(), payload.size());
+  EXPECT_EQ(transportMessageBuffer->readToString(), payload);
+}

Reply via email to