SteNicholas commented on code in PR #3553:
URL: https://github.com/apache/celeborn/pull/3553#discussion_r2596890169


##########
cpp/celeborn/client/ShuffleClient.cpp:
##########
@@ -143,6 +338,343 @@ bool ShuffleClientImpl::cleanupShuffle(int shuffleId) {
   return true;
 }
 
+std::shared_ptr<PushState> ShuffleClientImpl::getPushState(
+    const std::string& mapKey) {
+  return pushStates_.computeIfAbsent(
+      mapKey, [&]() { return std::make_shared<PushState>(*conf_); });
+}
+
+void ShuffleClientImpl::initReviveManagerLocked() {
+  if (!reviveManager_) {
+    std::string uniqueName = appUniqueId_;
+    uniqueName += std::to_string(utils::currentTimeNanos());
+    reviveManager_ =
+        ReviveManager::create(uniqueName, *conf_, weak_from_this());
+  }
+}
+
+void ShuffleClientImpl::registerShuffle(
+    int shuffleId,
+    int numMappers,
+    int numPartitions) {
+  auto shuffleMutex = shuffleMutexes_.computeIfAbsent(
+      shuffleId, []() { return std::make_shared<std::mutex>(); });
+  // RegisterShuffle might be issued concurrently, we only allow one issue
+  // for each shuffleId.
+  std::lock_guard<std::mutex> lock(*shuffleMutex);
+  if (partitionLocationMaps_.containsKey(shuffleId)) {
+    return;
+  }
+  CELEBORN_CHECK(
+      lifecycleManagerRef_, "lifecycleManagerRef_ is not initialized");
+  const int maxRetries = conf_->clientRegisterShuffleMaxRetries();
+  int numRetries = 1;
+  for (; numRetries <= maxRetries; numRetries++) {
+    try {
+      // Send the query request to lifecycleManager.
+      auto registerShuffleResponse = lifecycleManagerRef_->askSync<
+          protocol::RegisterShuffle,
+          protocol::RegisterShuffleResponse>(
+          protocol::RegisterShuffle{shuffleId, numMappers, numPartitions},
+          conf_->clientRpcRegisterShuffleRpcAskTimeout());
+
+      switch (registerShuffleResponse->status) {
+        case protocol::StatusCode::SUCCESS: {
+          VLOG(1) << "success to registerShuffle, shuffleId " << shuffleId
+                  << " numMappers " << numMappers << " numPartitions "
+                  << numPartitions;
+          auto partitionLocationMap = 
std::make_shared<utils::ConcurrentHashMap<
+              int,
+              std::shared_ptr<const protocol::PartitionLocation>>>();
+          auto& partitionLocations =
+              registerShuffleResponse->partitionLocations;
+          for (auto i = 0; i < partitionLocations.size(); i++) {
+            auto id = partitionLocations[i]->id;
+            partitionLocationMap->set(id, std::move(partitionLocations[i]));
+          }
+          partitionLocationMaps_.set(
+              shuffleId, std::move(partitionLocationMap));
+          return;
+        }
+        default: {
+          LOG(ERROR)
+              << "LifecycleManager request slots return protocol::StatusCode "
+              << registerShuffleResponse->status << " , shuffleId " << 
shuffleId
+              << " NumMappers " << numMappers << " numPartitions "

Review Comment:
   ```suggestion
                 << " numMappers " << numMappers << " numPartitions "
   ```



##########
cpp/celeborn/client/ShuffleClient.cpp:
##########
@@ -21,19 +21,46 @@
 
 namespace celeborn {
 namespace client {
+
+ShuffleClientEndpoint::ShuffleClientEndpoint(
+    const std::shared_ptr<const conf::CelebornConf>& conf)
+    : conf_(conf),
+      pushDataRetryPool_(std::make_shared<folly::IOThreadPoolExecutor>(
+          conf_->clientPushRetryThreads(),
+          std::make_shared<folly::NamedThreadFactory>(
+              "celeborn-retry-pushdata"))),

Review Comment:
   What naming pattern do the threads in this pool follow (i.e., what will the thread names look like)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to