This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 1ed2abc6b [CELEBORN-2095][CIP-14] Support RegisterShuffle/Response in cppClient
1ed2abc6b is described below
commit 1ed2abc6bff6d2db5ceec1bf6dd1d78f9bec166a
Author: HolyLow <[email protected]>
AuthorDate: Wed Aug 13 10:29:34 2025 +0800
[CELEBORN-2095][CIP-14] Support RegisterShuffle/Response in cppClient
### What changes were proposed in this pull request?
Support RegisterShuffle/Response messages in CppClient.
### Why are the changes needed?
To support registering a shuffle and handling the corresponding RegisterShuffleResponse in CppClient.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Compilation and unit tests (new cases in ControlMessagesTest).
Closes #3410 from HolyLow/issue/celeborn-2095-support-registershuffle-response.
Authored-by: HolyLow <[email protected]>
Signed-off-by: mingji <[email protected]>
---
cpp/celeborn/protocol/ControlMessages.cpp | 115 +++++++++++++++------
cpp/celeborn/protocol/ControlMessages.h | 19 +++-
.../protocol/tests/ControlMessagesTest.cpp | 51 ++++++++-
3 files changed, 151 insertions(+), 34 deletions(-)
diff --git a/cpp/celeborn/protocol/ControlMessages.cpp b/cpp/celeborn/protocol/ControlMessages.cpp
index 36a7023eb..6354b46ea 100644
--- a/cpp/celeborn/protocol/ControlMessages.cpp
+++ b/cpp/celeborn/protocol/ControlMessages.cpp
@@ -20,6 +20,84 @@
namespace celeborn {
namespace protocol {
+
+namespace {
+std::vector<std::unique_ptr<const PartitionLocation>>
+fromPbPackedPartitionLocationsPair(
+ const PbPackedPartitionLocationsPair& pbPackedPartitionLocationsPair) {
+ std::vector<std::unique_ptr<const PartitionLocation>> finalLocations;
+ std::vector<std::unique_ptr<PartitionLocation>> partialLocations;
+ int inputLocationSize = pbPackedPartitionLocationsPair.inputlocationsize();
+ auto& pbPackedPartitionLocations = pbPackedPartitionLocationsPair.locations();
+ auto& pbIds = pbPackedPartitionLocations.ids();
+ for (int idx = 0; idx < pbIds.size(); idx++) {
+ partialLocations.push_back(
+ PartitionLocation::fromPackedPb(pbPackedPartitionLocations, idx));
+ }
+ for (int idx = 0; idx < inputLocationSize; idx++) {
+ auto replicaIdx = pbPackedPartitionLocationsPair.peerindexes(idx);
+ // Has peer.
+ if (replicaIdx != INT_MAX) {
+ CELEBORN_CHECK_GE(replicaIdx, inputLocationSize);
+ CELEBORN_CHECK_LT(replicaIdx, partialLocations.size());
+ auto location = std::move(partialLocations[idx]);
+ auto peerLocation = std::move(partialLocations[replicaIdx]);
+ // Make sure the location is primary and peer location is replica.
+ if (location->mode == PartitionLocation::Mode::REPLICA) {
+ std::swap(location, peerLocation);
+ }
+ CELEBORN_CHECK(location->mode == PartitionLocation::Mode::PRIMARY);
+ CELEBORN_CHECK(peerLocation->mode == PartitionLocation::Mode::REPLICA);
+ location->replicaPeer = std::move(peerLocation);
+ finalLocations.push_back(std::move(location));
+ }
+ // Has no peer.
+ else {
+ finalLocations.push_back(std::move(partialLocations[idx]));
+ }
+ }
+ return finalLocations;
+}
+
+} // namespace
+
+TransportMessage RegisterShuffle::toTransportMessage() const {
+ MessageType type = REGISTER_SHUFFLE;
+ PbRegisterShuffle pb;
+ pb.set_shuffleid(shuffleId);
+ pb.set_nummappers(numMappers);
+ pb.set_numpartitions(numPartitions);
+ std::string payload = pb.SerializeAsString();
+ return TransportMessage(type, std::move(payload));
+}
+
+std::unique_ptr<RegisterShuffleResponse>
+RegisterShuffleResponse::fromTransportMessage(
+ const TransportMessage& transportMessage) {
+ CELEBORN_CHECK(
+ transportMessage.type() == REGISTER_SHUFFLE_RESPONSE,
+ "transportMessageType mismatch");
+ auto payload = transportMessage.payload();
+ auto pbRegisterShuffleResponse = utils::parseProto<PbRegisterShuffleResponse>(
+ reinterpret_cast<const uint8_t*>(payload.c_str()), payload.size());
+ auto response = std::make_unique<RegisterShuffleResponse>();
+ response->status = toStatusCode(pbRegisterShuffleResponse->status());
+
+ // Legacy mode is deprecated.
+ auto& pbPartitionLocations = pbRegisterShuffleResponse->partitionlocations();
+ CELEBORN_CHECK_EQ(
+ pbPartitionLocations.size(),
+ 0,
+ "legacy PartitionLocation pb is deprecated");
+
+ // Packed mode: must use packedPartitionLocations.
+ const auto& pbPackedPartitionLocationsPair =
+ pbRegisterShuffleResponse->packedpartitionlocationspair();
+ response->partitionLocations = std::move(
+ fromPbPackedPartitionLocationsPair(pbPackedPartitionLocationsPair));
+ return std::move(response);
+}
+
TransportMessage MapperEnd::toTransportMessage() const {
MessageType type = MAPPER_END;
PbMapperEnd pb;
@@ -74,37 +152,12 @@ GetReducerFileGroupResponse::fromTransportMessage(
0,
"legecy PartitionLocation pb is deprecated");
// Packed mode: must use packedPartitionLocations.
- auto& pbPackedPartitionLocationsPair = kv.second.partitionlocationspair();
- int inputLocationSize = pbPackedPartitionLocationsPair.inputlocationsize();
- auto& pbPackedPartitionLocations =
- pbPackedPartitionLocationsPair.locations();
- std::vector<std::unique_ptr<PartitionLocation>> partialLocations;
- auto& pbIds = pbPackedPartitionLocations.ids();
- for (int idx = 0; idx < pbIds.size(); idx++) {
- partialLocations.push_back(
- PartitionLocation::fromPackedPb(pbPackedPartitionLocations, idx));
- }
- for (int idx = 0; idx < inputLocationSize; idx++) {
- auto replicaIdx = pbPackedPartitionLocationsPair.peerindexes(idx);
- // has peer
- if (replicaIdx != INT_MAX) {
- CELEBORN_CHECK_GE(replicaIdx, inputLocationSize);
- CELEBORN_CHECK_LT(replicaIdx, partialLocations.size());
- auto location = std::move(partialLocations[idx]);
- auto peerLocation = std::move(partialLocations[replicaIdx]);
- // make sure the location is primary and peer location is replica
- if (location->mode == PartitionLocation::Mode::REPLICA) {
- std::swap(location, peerLocation);
- }
- CELEBORN_CHECK(location->mode == PartitionLocation::Mode::PRIMARY);
- CELEBORN_CHECK(peerLocation->mode == PartitionLocation::Mode::REPLICA);
- location->replicaPeer = std::move(peerLocation);
- fileGroup.insert(std::move(location));
- }
- // has no peer
- else {
- fileGroup.insert(std::move(partialLocations[idx]));
- }
+ const auto& pbPackedPartitionLocationsPair =
+ kv.second.partitionlocationspair();
+ auto locations =
+ fromPbPackedPartitionLocationsPair(pbPackedPartitionLocationsPair);
+ for (auto& location : locations) {
+ fileGroup.insert(std::move(location));
}
}
auto attempts = pbGetReducerFileGroupResponse->attempts();
diff --git a/cpp/celeborn/protocol/ControlMessages.h b/cpp/celeborn/protocol/ControlMessages.h
index a7bddc41c..b3e360a0e 100644
--- a/cpp/celeborn/protocol/ControlMessages.h
+++ b/cpp/celeborn/protocol/ControlMessages.h
@@ -26,8 +26,25 @@
namespace celeborn {
namespace protocol {
+
+struct RegisterShuffle {
+ int shuffleId;
+ int numMappers;
+ int numPartitions;
+
+ TransportMessage toTransportMessage() const;
+};
+
+struct RegisterShuffleResponse {
+ StatusCode status;
+ std::vector<std::unique_ptr<const PartitionLocation>> partitionLocations;
+
+ static std::unique_ptr<RegisterShuffleResponse> fromTransportMessage(
+ const TransportMessage& transportMessage);
+};
+
struct MapperEnd {
- long shuffleId;
+ int shuffleId;
int mapId;
int attemptId;
int numMappers;
diff --git a/cpp/celeborn/protocol/tests/ControlMessagesTest.cpp b/cpp/celeborn/protocol/tests/ControlMessagesTest.cpp
index d1f7dce41..0e4688072 100644
--- a/cpp/celeborn/protocol/tests/ControlMessagesTest.cpp
+++ b/cpp/celeborn/protocol/tests/ControlMessagesTest.cpp
@@ -62,6 +62,55 @@ void verifyUnpackedPartitionLocation(
}
} // namespace
+TEST(ControlMessagesTest, registerShuffle) {
+ auto registerShuffle = std::make_unique<RegisterShuffle>();
+ registerShuffle->shuffleId = 1000;
+ registerShuffle->numMappers = 1001;
+ registerShuffle->numPartitions = 1002;
+
+ auto transportMessage = registerShuffle->toTransportMessage();
+ EXPECT_EQ(transportMessage.type(), REGISTER_SHUFFLE);
+ auto payload = transportMessage.payload();
+ auto pbRegisterShuffle = utils::parseProto<PbRegisterShuffle>(
+ reinterpret_cast<const uint8_t*>(payload.c_str()), payload.size());
+ EXPECT_EQ(pbRegisterShuffle->shuffleid(), registerShuffle->shuffleId);
+ EXPECT_EQ(pbRegisterShuffle->nummappers(), registerShuffle->numMappers);
+ EXPECT_EQ(pbRegisterShuffle->numpartitions(), registerShuffle->numPartitions);
+}
+
+TEST(ControlMessagesTest, registerShuffleResponse) {
+ const int statusCodeId = 1;
+ PbRegisterShuffleResponse pbRegisterShuffleResponse;
+ pbRegisterShuffleResponse.set_status(statusCodeId);
+ auto pbPackedPartitionLocationsPair =
+ pbRegisterShuffleResponse.mutable_packedpartitionlocationspair();
+ auto pbPackedPartitionLocations =
+ pbPackedPartitionLocationsPair->mutable_locations();
+ // Has one inputLocation, with offset 0.
+ pbPackedPartitionLocationsPair->set_inputlocationsize(1);
+ // The peerIndex 1 is replica.
+ pbPackedPartitionLocationsPair->add_peerindexes(1);
+ // Add the two partitionLocations, one is primary and the other is replica.
+ generatePackedPartitionLocationPb(
+ *pbPackedPartitionLocations, 0, PartitionLocation::Mode::PRIMARY);
+ generatePackedPartitionLocationPb(
+ *pbPackedPartitionLocations, 1, PartitionLocation::Mode::REPLICA);
+
+ TransportMessage transportMessage(
+ REGISTER_SHUFFLE_RESPONSE, pbRegisterShuffleResponse.SerializeAsString());
+ auto registerShuffleResponse =
+ RegisterShuffleResponse::fromTransportMessage(transportMessage);
+ EXPECT_EQ(registerShuffleResponse->status, statusCodeId);
+ const auto& partitionLocations = registerShuffleResponse->partitionLocations;
+ EXPECT_EQ(partitionLocations.size(), 1);
+ auto primaryPartitionLocation = partitionLocations.begin()->get();
+ verifyUnpackedPartitionLocation(primaryPartitionLocation);
+ EXPECT_EQ(primaryPartitionLocation->mode, PartitionLocation::Mode::PRIMARY);
+ auto replicaPartitionLocation = primaryPartitionLocation->replicaPeer.get();
+ verifyUnpackedPartitionLocation(replicaPartitionLocation);
+ EXPECT_EQ(replicaPartitionLocation->mode, PartitionLocation::Mode::REPLICA);
+}
+
TEST(ControlMessagesTest, mapperEnd) {
auto mapperEnd = std::make_unique<MapperEnd>();
mapperEnd->shuffleId = 1000;
@@ -93,8 +142,6 @@ TEST(ControlMessagesTest, mapperEndResponse) {
EXPECT_EQ(mapperEndResponse->status, 1);
}
-// TEST MapperEnd/Response
-
TEST(ControlMessagesTest, getReducerFileGroup) {
auto getReducerFileGroup = std::make_unique<GetReducerFileGroup>();
getReducerFileGroup->shuffleId = 1000;