This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 8b096ea87 [CELEBORN-1814][CIP-14] Add transportMessage to cppClient
8b096ea87 is described below
commit 8b096ea8798baaad016cd6dbb7ec59caab7240de
Author: HolyLow <[email protected]>
AuthorDate: Fri Jan 3 16:18:50 2025 +0800
[CELEBORN-1814][CIP-14] Add transportMessage to cppClient
### What changes were proposed in this pull request?
Add transportMessage to cppClient.
### Why are the changes needed?
TransportMessage is the building block of controlMessages.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Compilation and UTs.
Closes #3042 from
HolyLow/issue/celeborn-1814-add-transport-message-to-cppClient.
Authored-by: HolyLow <[email protected]>
Signed-off-by: mingji <[email protected]>
---
cpp/celeborn/protocol/CMakeLists.txt | 6 ++-
cpp/celeborn/protocol/TransportMessage.cpp | 49 ++++++++++++++++++
cpp/celeborn/protocol/TransportMessage.h | 50 ++++++++++++++++++
cpp/celeborn/protocol/tests/CMakeLists.txt | 5 +-
.../protocol/tests/TransportMessageTest.cpp | 59 ++++++++++++++++++++++
5 files changed, 167 insertions(+), 2 deletions(-)
diff --git a/cpp/celeborn/protocol/CMakeLists.txt
b/cpp/celeborn/protocol/CMakeLists.txt
index e92778e93..6a6ef88a7 100644
--- a/cpp/celeborn/protocol/CMakeLists.txt
+++ b/cpp/celeborn/protocol/CMakeLists.txt
@@ -12,7 +12,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-add_library(protocol PartitionLocation.cpp)
+add_library(
+ protocol
+ STATIC
+ PartitionLocation.cpp
+ TransportMessage.cpp)
target_include_directories(protocol PUBLIC ${CMAKE_BINARY_DIR})
diff --git a/cpp/celeborn/protocol/TransportMessage.cpp
b/cpp/celeborn/protocol/TransportMessage.cpp
new file mode 100644
index 000000000..351dac5b0
--- /dev/null
+++ b/cpp/celeborn/protocol/TransportMessage.cpp
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "celeborn/protocol/TransportMessage.h"
+
+#include <limits>
+
+#include "celeborn/utils/Exceptions.h"
+
+namespace celeborn {
+namespace protocol {
+// Builds a message from a type tag and payload bytes. The payload is moved
+// into the message, and the numeric value of the type is cached because it is
+// what toReadOnlyByteBuffer() writes as the first header field.
+TransportMessage::TransportMessage(MessageType type, std::string&& payload)
+    : type_(type), messageTypeValue_(type), payload_(std::move(payload)) {}
+
+// Parses a message from a buffer laid out as: a 32-bit message type, a 32-bit
+// payload length, then the payload bytes.
+// Fails a CELEBORN_CHECK when the buffer is null, when the declared length is
+// negative or differs from the number of bytes remaining after the header, or
+// when the type value is not accepted by MessageType_IsValid.
+TransportMessage::TransportMessage(std::unique_ptr<ReadOnlyByteBuffer> buf) {
+  // Reject a null buffer up front instead of dereferencing it.
+  CELEBORN_CHECK(buf != nullptr);
+  const int32_t messageTypeValue = buf->read<int32_t>();
+  const int32_t payloadLen = buf->read<int32_t>();
+  // A negative length can never describe a payload; check it explicitly
+  // rather than relying on the size comparison below to catch it.
+  CELEBORN_CHECK(payloadLen >= 0);
+  // The payload must account for every byte left in the buffer.
+  CELEBORN_CHECK_EQ(buf->remainingSize(), payloadLen);
+  CELEBORN_CHECK(MessageType_IsValid(messageTypeValue));
+  type_ = static_cast<MessageType>(messageTypeValue);
+  messageTypeValue_ = messageTypeValue;
+  payload_ = buf->readToString(payloadLen);
+}
+
+// Serializes this message into a newly created read-only buffer laid out as:
+//   [int32 message type][int32 payload length][payload bytes]
+// Fails a CELEBORN_CHECK if the payload is too long for the 32-bit length
+// field, or if the number of bytes written differs from the expected size.
+std::unique_ptr<ReadOnlyByteBuffer> TransportMessage::toReadOnlyByteBuffer()
+    const {
+  // Two int32 header fields (type and length) precede the payload.
+  constexpr size_t kHeaderSize = 2 * sizeof(int32_t);
+  const size_t payloadSize = payload_.size();
+  // The length field is a signed 32-bit integer; refuse to truncate silently.
+  CELEBORN_CHECK(
+      payloadSize <= static_cast<size_t>(std::numeric_limits<int32_t>::max()));
+  const size_t bufSize = kHeaderSize + payloadSize;
+  auto buffer = ByteBuffer::createWriteOnly(bufSize);
+  buffer->write<int32_t>(messageTypeValue_);
+  buffer->write<int32_t>(static_cast<int32_t>(payloadSize));
+  buffer->writeFromString(payload_);
+  // Sanity check: header plus payload must fill exactly the planned size.
+  CELEBORN_CHECK_EQ(buffer->size(), bufSize);
+  return ByteBuffer::toReadOnly(std::move(buffer));
+}
+} // namespace protocol
+} // namespace celeborn
diff --git a/cpp/celeborn/protocol/TransportMessage.h
b/cpp/celeborn/protocol/TransportMessage.h
new file mode 100644
index 000000000..22a162eea
--- /dev/null
+++ b/cpp/celeborn/protocol/TransportMessage.h
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/io/Cursor.h>
+#include <string>
+
+#include "celeborn/memory/ByteBuffer.h"
+#include "celeborn/proto/TransportMessagesCpp.pb.h"
+
+namespace celeborn {
+namespace protocol {
+// A message made of a MessageType tag and a payload string. It can be built
+// from its parts or parsed from a ReadOnlyByteBuffer, and written back out
+// with toReadOnlyByteBuffer().
+class TransportMessage {
+ public:
+  // Creates a message of the given type; the payload is moved in.
+  TransportMessage(MessageType type, std::string&& payload);
+
+  // Creates a message by reading the type, the payload length and the payload
+  // from buf, in that order.
+  TransportMessage(std::unique_ptr<ReadOnlyByteBuffer> buf);
+
+  // Returns a newly created buffer holding the type, the payload length and
+  // the payload, in that order.
+  std::unique_ptr<ReadOnlyByteBuffer> toReadOnlyByteBuffer() const;
+
+  // Type tag of this message.
+  MessageType type() const { return type_; }
+
+  // Copy of the payload.
+  std::string payload() const { return payload_; }
+
+ private:
+  MessageType type_;
+  // Numeric form of type_; this is the value written to the buffer header.
+  int messageTypeValue_;
+  std::string payload_;
+};
+} // namespace protocol
+} // namespace celeborn
diff --git a/cpp/celeborn/protocol/tests/CMakeLists.txt
b/cpp/celeborn/protocol/tests/CMakeLists.txt
index 8f7b5e2f2..f5a00db2a 100644
--- a/cpp/celeborn/protocol/tests/CMakeLists.txt
+++ b/cpp/celeborn/protocol/tests/CMakeLists.txt
@@ -13,7 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-add_executable(celeborn_protocol_test PartitionLocationTest.cpp)
+add_executable(
+ celeborn_protocol_test
+ PartitionLocationTest.cpp
+ TransportMessageTest.cpp)
add_test(NAME celeborn_protocol_test COMMAND celeborn_protocol_test)
diff --git a/cpp/celeborn/protocol/tests/TransportMessageTest.cpp
b/cpp/celeborn/protocol/tests/TransportMessageTest.cpp
new file mode 100644
index 000000000..c0f64c686
--- /dev/null
+++ b/cpp/celeborn/protocol/tests/TransportMessageTest.cpp
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include "celeborn/proto/TransportMessagesCpp.pb.h"
+#include "celeborn/protocol/TransportMessage.h"
+
+using namespace celeborn;
+using namespace celeborn::protocol;
+
+// A message built from (type, payload) must expose both through its accessors
+// and serialize to: int32 type, int32 payload length, payload bytes.
+TEST(TransportMessageTest, constructFromPayload) {
+  const std::string payload = "payload";
+  const auto messageType = static_cast<MessageType>(10);
+  // The constructor consumes its payload argument, so hand it a copy.
+  std::string payloadToMove = payload;
+  const TransportMessage transportMessage(
+      messageType, std::move(payloadToMove));
+
+  EXPECT_EQ(transportMessage.type(), messageType);
+  EXPECT_EQ(transportMessage.payload(), payload);
+
+  auto transportMessageBuffer = transportMessage.toReadOnlyByteBuffer();
+  EXPECT_EQ(
+      transportMessageBuffer->remainingSize(),
+      sizeof(int32_t) * 2 + payload.size());
+  EXPECT_EQ(
+      transportMessageBuffer->read<int32_t>(),
+      static_cast<int32_t>(messageType));
+  // Cast so both sides of the comparison are signed.
+  EXPECT_EQ(
+      transportMessageBuffer->read<int32_t>(),
+      static_cast<int32_t>(payload.size()));
+  EXPECT_EQ(transportMessageBuffer->readToString(), payload);
+}
+
+// Round trip: a message parsed from a serialized buffer must report the
+// original type and payload, and must serialize back to the same layout.
+TEST(TransportMessageTest, constructFromReadOnlyBuffer) {
+  const std::string payload = "payload";
+  // The parsing constructor checks MessageType_IsValid, so this value has to
+  // be one the MessageType enum accepts.
+  const auto messageType = static_cast<MessageType>(10);
+  // The constructor consumes its payload argument, so hand it a copy.
+  std::string payloadToMove = payload;
+  const TransportMessage contentMessage(messageType, std::move(payloadToMove));
+  auto contentBuffer = contentMessage.toReadOnlyByteBuffer();
+
+  const TransportMessage transportMessage(std::move(contentBuffer));
+  EXPECT_EQ(transportMessage.type(), messageType);
+  EXPECT_EQ(transportMessage.payload(), payload);
+
+  auto transportMessageBuffer = transportMessage.toReadOnlyByteBuffer();
+  EXPECT_EQ(
+      transportMessageBuffer->remainingSize(),
+      sizeof(int32_t) * 2 + payload.size());
+  EXPECT_EQ(
+      transportMessageBuffer->read<int32_t>(),
+      static_cast<int32_t>(messageType));
+  // Cast so both sides of the comparison are signed.
+  EXPECT_EQ(
+      transportMessageBuffer->read<int32_t>(),
+      static_cast<int32_t>(payload.size()));
+  EXPECT_EQ(transportMessageBuffer->readToString(), payload);
+}