This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 1ed2abc6b [CELEBORN-2095][CIP-14] Support RegisterShuffle/Response in cppClient
1ed2abc6b is described below

commit 1ed2abc6bff6d2db5ceec1bf6dd1d78f9bec166a
Author: HolyLow <[email protected]>
AuthorDate: Wed Aug 13 10:29:34 2025 +0800

    [CELEBORN-2095][CIP-14] Support RegisterShuffle/Response in cppClient
    
    ### What changes were proposed in this pull request?
    Support RegisterShuffle/Response messages in CppClient.
    
    ### Why are the changes needed?
    To support the procedure of registering a shuffle and accepting its response in CppClient.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Compilation and UTs.
    
    Closes #3410 from HolyLow/issue/celeborn-2095-support-registershuffle-response.
    
    Authored-by: HolyLow <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 cpp/celeborn/protocol/ControlMessages.cpp          | 115 +++++++++++++++------
 cpp/celeborn/protocol/ControlMessages.h            |  19 +++-
 .../protocol/tests/ControlMessagesTest.cpp         |  51 ++++++++-
 3 files changed, 151 insertions(+), 34 deletions(-)

diff --git a/cpp/celeborn/protocol/ControlMessages.cpp b/cpp/celeborn/protocol/ControlMessages.cpp
index 36a7023eb..6354b46ea 100644
--- a/cpp/celeborn/protocol/ControlMessages.cpp
+++ b/cpp/celeborn/protocol/ControlMessages.cpp
@@ -20,6 +20,84 @@
 
 namespace celeborn {
 namespace protocol {
+
+namespace {
+// Rebuilds the PartitionLocations carried by a PbPackedPartitionLocationsPair.
+//
+// All locations are decoded from `locations()`. For each of the first
+// `inputlocationsize()` entries, `peerindexes(i)` is either INT_MAX (no peer)
+// or the index of its peer, which must lie after the input entries. A paired
+// entry is returned as one PRIMARY location whose `replicaPeer` owns the
+// REPLICA. Malformed input fails a CELEBORN_CHECK instead of indexing out of
+// range.
+std::vector<std::unique_ptr<const PartitionLocation>>
+fromPbPackedPartitionLocationsPair(
+    const PbPackedPartitionLocationsPair& pbPackedPartitionLocationsPair) {
+  const auto& pbPackedPartitionLocations =
+      pbPackedPartitionLocationsPair.locations();
+  const int inputLocationSize =
+      pbPackedPartitionLocationsPair.inputlocationsize();
+  const int numLocations = pbPackedPartitionLocations.ids().size();
+  // Every input entry needs a decoded location and a peer index; without
+  // these checks a malformed message would read past the containers' ends.
+  CELEBORN_CHECK_GE(numLocations, inputLocationSize);
+  CELEBORN_CHECK_GE(
+      pbPackedPartitionLocationsPair.peerindexes_size(), inputLocationSize);
+
+  std::vector<std::unique_ptr<PartitionLocation>> partialLocations;
+  partialLocations.reserve(numLocations);
+  for (int idx = 0; idx < numLocations; idx++) {
+    partialLocations.push_back(
+        PartitionLocation::fromPackedPb(pbPackedPartitionLocations, idx));
+  }
+
+  std::vector<std::unique_ptr<const PartitionLocation>> finalLocations;
+  for (int idx = 0; idx < inputLocationSize; idx++) {
+    const auto replicaIdx = pbPackedPartitionLocationsPair.peerindexes(idx);
+    if (replicaIdx == INT_MAX) {
+      // Has no peer.
+      finalLocations.push_back(std::move(partialLocations[idx]));
+      continue;
+    }
+    // Has peer: peers are stored after all input locations.
+    CELEBORN_CHECK_GE(replicaIdx, inputLocationSize);
+    CELEBORN_CHECK_LT(replicaIdx, numLocations);
+    auto location = std::move(partialLocations[idx]);
+    auto peerLocation = std::move(partialLocations[replicaIdx]);
+    // A peer slot can be claimed only once; a second claim would find the
+    // moved-from (null) pointer left by the first.
+    CELEBORN_CHECK(
+        peerLocation != nullptr, "peer location is referenced more than once");
+    // Make sure the location is primary and peer location is replica.
+    if (location->mode == PartitionLocation::Mode::REPLICA) {
+      std::swap(location, peerLocation);
+    }
+    CELEBORN_CHECK(location->mode == PartitionLocation::Mode::PRIMARY);
+    CELEBORN_CHECK(peerLocation->mode == PartitionLocation::Mode::REPLICA);
+    location->replicaPeer = std::move(peerLocation);
+    finalLocations.push_back(std::move(location));
+  }
+  return finalLocations;
+}
+
+} // namespace
+
+// Serializes this request as a PbRegisterShuffle payload in a
+// REGISTER_SHUFFLE TransportMessage.
+TransportMessage RegisterShuffle::toTransportMessage() const {
+  PbRegisterShuffle pbRegisterShuffle;
+  pbRegisterShuffle.set_shuffleid(shuffleId);
+  pbRegisterShuffle.set_nummappers(numMappers);
+  pbRegisterShuffle.set_numpartitions(numPartitions);
+  return TransportMessage(
+      REGISTER_SHUFFLE, pbRegisterShuffle.SerializeAsString());
+}
+
+// Decodes a REGISTER_SHUFFLE_RESPONSE TransportMessage.
+//
+// Fails a CELEBORN_CHECK if the message type does not match or if the
+// deprecated (unpacked) partitionLocations list is populated; locations are
+// read only from packedPartitionLocationsPair.
+std::unique_ptr<RegisterShuffleResponse>
+RegisterShuffleResponse::fromTransportMessage(
+    const TransportMessage& transportMessage) {
+  CELEBORN_CHECK(
+      transportMessage.type() == REGISTER_SHUFFLE_RESPONSE,
+      "transportMessageType mismatch");
+  // Bind by const reference so the payload is not copied.
+  const auto& payload = transportMessage.payload();
+  auto pbRegisterShuffleResponse = utils::parseProto<PbRegisterShuffleResponse>(
+      reinterpret_cast<const uint8_t*>(payload.c_str()), payload.size());
+  auto response = std::make_unique<RegisterShuffleResponse>();
+  response->status = toStatusCode(pbRegisterShuffleResponse->status());
+
+  // Legacy mode is deprecated.
+  const auto& pbPartitionLocations =
+      pbRegisterShuffleResponse->partitionlocations();
+  CELEBORN_CHECK_EQ(
+      pbPartitionLocations.size(),
+      0,
+      "legacy PartitionLocation pb is deprecated");
+
+  // Packed mode: must use packedPartitionLocations.
+  response->partitionLocations = fromPbPackedPartitionLocationsPair(
+      pbRegisterShuffleResponse->packedpartitionlocationspair());
+  // Return the local directly; std::move here would only inhibit NRVO.
+  return response;
+}
+
 TransportMessage MapperEnd::toTransportMessage() const {
   MessageType type = MAPPER_END;
   PbMapperEnd pb;
@@ -74,37 +152,12 @@ GetReducerFileGroupResponse::fromTransportMessage(
         0,
         "legecy PartitionLocation pb is deprecated");
     // Packed mode: must use packedPartitionLocations.
-    auto& pbPackedPartitionLocationsPair = kv.second.partitionlocationspair();
-    int inputLocationSize = pbPackedPartitionLocationsPair.inputlocationsize();
-    auto& pbPackedPartitionLocations =
-        pbPackedPartitionLocationsPair.locations();
-    std::vector<std::unique_ptr<PartitionLocation>> partialLocations;
-    auto& pbIds = pbPackedPartitionLocations.ids();
-    for (int idx = 0; idx < pbIds.size(); idx++) {
-      partialLocations.push_back(
-          PartitionLocation::fromPackedPb(pbPackedPartitionLocations, idx));
-    }
-    for (int idx = 0; idx < inputLocationSize; idx++) {
-      auto replicaIdx = pbPackedPartitionLocationsPair.peerindexes(idx);
-      // has peer
-      if (replicaIdx != INT_MAX) {
-        CELEBORN_CHECK_GE(replicaIdx, inputLocationSize);
-        CELEBORN_CHECK_LT(replicaIdx, partialLocations.size());
-        auto location = std::move(partialLocations[idx]);
-        auto peerLocation = std::move(partialLocations[replicaIdx]);
-        // make sure the location is primary and peer location is replica
-        if (location->mode == PartitionLocation::Mode::REPLICA) {
-          std::swap(location, peerLocation);
-        }
-        CELEBORN_CHECK(location->mode == PartitionLocation::Mode::PRIMARY);
-        CELEBORN_CHECK(peerLocation->mode == PartitionLocation::Mode::REPLICA);
-        location->replicaPeer = std::move(peerLocation);
-        fileGroup.insert(std::move(location));
-      }
-      // has no peer
-      else {
-        fileGroup.insert(std::move(partialLocations[idx]));
-      }
+    const auto& pbPackedPartitionLocationsPair =
+        kv.second.partitionlocationspair();
+    auto locations =
+        fromPbPackedPartitionLocationsPair(pbPackedPartitionLocationsPair);
+    for (auto& location : locations) {
+      fileGroup.insert(std::move(location));
     }
   }
   auto attempts = pbGetReducerFileGroupResponse->attempts();
diff --git a/cpp/celeborn/protocol/ControlMessages.h b/cpp/celeborn/protocol/ControlMessages.h
index a7bddc41c..b3e360a0e 100644
--- a/cpp/celeborn/protocol/ControlMessages.h
+++ b/cpp/celeborn/protocol/ControlMessages.h
@@ -26,8 +26,25 @@
 
 namespace celeborn {
 namespace protocol {
+
+// Request that carries the shuffle id and its mapper/partition counts.
+// Members default to 0 so a default-constructed request never serializes
+// indeterminate values.
+struct RegisterShuffle {
+  int shuffleId{0};
+  int numMappers{0};
+  int numPartitions{0};
+
+  // Serializes this request into a REGISTER_SHUFFLE TransportMessage.
+  TransportMessage toTransportMessage() const;
+};
+
+// Decoded form of a REGISTER_SHUFFLE_RESPONSE message.
+struct RegisterShuffleResponse {
+  StatusCode status;
+  // One entry per input location in the packed pb; a PRIMARY entry with a
+  // peer owns its REPLICA through replicaPeer.
+  std::vector<std::unique_ptr<const PartitionLocation>> partitionLocations;
+
+  // Parses a REGISTER_SHUFFLE_RESPONSE message. Fails a CELEBORN_CHECK on a
+  // message-type mismatch or when the deprecated unpacked location list is
+  // non-empty.
+  static std::unique_ptr<RegisterShuffleResponse> fromTransportMessage(
+      const TransportMessage& transportMessage);
+};
+
 struct MapperEnd {
-  long shuffleId;
+  int shuffleId;
   int mapId;
   int attemptId;
   int numMappers;
diff --git a/cpp/celeborn/protocol/tests/ControlMessagesTest.cpp b/cpp/celeborn/protocol/tests/ControlMessagesTest.cpp
index d1f7dce41..0e4688072 100644
--- a/cpp/celeborn/protocol/tests/ControlMessagesTest.cpp
+++ b/cpp/celeborn/protocol/tests/ControlMessagesTest.cpp
@@ -62,6 +62,55 @@ void verifyUnpackedPartitionLocation(
 }
 } // namespace
 
+// Round-trips a RegisterShuffle through its TransportMessage and checks that
+// the message type and every field survive serialization.
+TEST(ControlMessagesTest, registerShuffle) {
+  RegisterShuffle registerShuffle;
+  registerShuffle.shuffleId = 1000;
+  registerShuffle.numMappers = 1001;
+  registerShuffle.numPartitions = 1002;
+
+  const auto transportMessage = registerShuffle.toTransportMessage();
+  EXPECT_EQ(transportMessage.type(), REGISTER_SHUFFLE);
+  const auto payload = transportMessage.payload();
+  const auto pbRegisterShuffle = utils::parseProto<PbRegisterShuffle>(
+      reinterpret_cast<const uint8_t*>(payload.c_str()), payload.size());
+  EXPECT_EQ(pbRegisterShuffle->shuffleid(), registerShuffle.shuffleId);
+  EXPECT_EQ(pbRegisterShuffle->nummappers(), registerShuffle.numMappers);
+  EXPECT_EQ(
+      pbRegisterShuffle->numpartitions(), registerShuffle.numPartitions);
+}
+
+// Builds a packed response with one PRIMARY input location paired with a
+// REPLICA peer, and checks they decode into a single PRIMARY location that
+// owns its replica.
+TEST(ControlMessagesTest, registerShuffleResponse) {
+  const int statusCodeId = 1;
+  PbRegisterShuffleResponse pbRegisterShuffleResponse;
+  pbRegisterShuffleResponse.set_status(statusCodeId);
+  auto pbPackedPartitionLocationsPair =
+      pbRegisterShuffleResponse.mutable_packedpartitionlocationspair();
+  auto pbPackedPartitionLocations =
+      pbPackedPartitionLocationsPair->mutable_locations();
+  // Has one inputLocation, with offset 0.
+  pbPackedPartitionLocationsPair->set_inputlocationsize(1);
+  // The peerIndex 1 is replica.
+  pbPackedPartitionLocationsPair->add_peerindexes(1);
+  // Add the two partitionLocations, one is primary and the other is replica.
+  generatePackedPartitionLocationPb(
+      *pbPackedPartitionLocations, 0, PartitionLocation::Mode::PRIMARY);
+  generatePackedPartitionLocationPb(
+      *pbPackedPartitionLocations, 1, PartitionLocation::Mode::REPLICA);
+
+  TransportMessage transportMessage(
+      REGISTER_SHUFFLE_RESPONSE, pbRegisterShuffleResponse.SerializeAsString());
+  auto registerShuffleResponse =
+      RegisterShuffleResponse::fromTransportMessage(transportMessage);
+  EXPECT_EQ(registerShuffleResponse->status, statusCodeId);
+  const auto& partitionLocations = registerShuffleResponse->partitionLocations;
+  // ASSERT rather than EXPECT: the checks below dereference the first element
+  // and its peer, which would be undefined behavior if they were missing.
+  ASSERT_EQ(partitionLocations.size(), 1u);
+  const auto* primaryPartitionLocation = partitionLocations.front().get();
+  verifyUnpackedPartitionLocation(primaryPartitionLocation);
+  EXPECT_EQ(primaryPartitionLocation->mode, PartitionLocation::Mode::PRIMARY);
+  const auto* replicaPartitionLocation =
+      primaryPartitionLocation->replicaPeer.get();
+  ASSERT_NE(replicaPartitionLocation, nullptr);
+  verifyUnpackedPartitionLocation(replicaPartitionLocation);
+  EXPECT_EQ(replicaPartitionLocation->mode, PartitionLocation::Mode::REPLICA);
+}
+
 TEST(ControlMessagesTest, mapperEnd) {
   auto mapperEnd = std::make_unique<MapperEnd>();
   mapperEnd->shuffleId = 1000;
@@ -93,8 +142,6 @@ TEST(ControlMessagesTest, mapperEndResponse) {
   EXPECT_EQ(mapperEndResponse->status, 1);
 }
 
-// TEST MapperEnd/Response
-
 TEST(ControlMessagesTest, getReducerFileGroup) {
   auto getReducerFileGroup = std::make_unique<GetReducerFileGroup>();
   getReducerFileGroup->shuffleId = 1000;

Reply via email to