This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 9bae3fbd5 [CELEBORN-1915][CIP-14] Add reader's ShuffleClient to cppClient
9bae3fbd5 is described below

commit 9bae3fbd5e889315e77d02eb669461fecee786de
Author: HolyLow <[email protected]>
AuthorDate: Tue Mar 25 17:54:34 2025 +0800

    [CELEBORN-1915][CIP-14] Add reader's ShuffleClient to cppClient
    
    ### What changes were proposed in this pull request?
    This PR adds reader end's ShuffleClient to cppClient.
    
    ### Why are the changes needed?
    ShuffleClient is the user-facing entry point of cppClient.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Verified by compilation only.
    
    Closes #3156 from HolyLow/issue/celeborn-1915-add-reader-shuffle-client-to-cppclient.
    
    Authored-by: HolyLow <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 cpp/celeborn/client/CMakeLists.txt    |   3 +-
 cpp/celeborn/client/ShuffleClient.cpp | 141 ++++++++++++++++++++++++++++++++++
 cpp/celeborn/client/ShuffleClient.h   |  86 +++++++++++++++++++++
 cpp/celeborn/utils/CelebornUtils.cpp  |   4 +
 cpp/celeborn/utils/CelebornUtils.h    |  12 +++
 5 files changed, 245 insertions(+), 1 deletion(-)

diff --git a/cpp/celeborn/client/CMakeLists.txt b/cpp/celeborn/client/CMakeLists.txt
index f9edfe949..2491b7d7f 100644
--- a/cpp/celeborn/client/CMakeLists.txt
+++ b/cpp/celeborn/client/CMakeLists.txt
@@ -15,7 +15,8 @@
 add_library(
         client
         reader/WorkerPartitionReader.cpp
-        reader/CelebornInputStream.cpp)
+        reader/CelebornInputStream.cpp
+        # Reader-side ShuffleClient: the user-facing entry point of the client.
+        ShuffleClient.cpp)
 
 target_include_directories(client PUBLIC ${CMAKE_BINARY_DIR})
 
diff --git a/cpp/celeborn/client/ShuffleClient.cpp b/cpp/celeborn/client/ShuffleClient.cpp
new file mode 100644
index 000000000..35e593d1b
--- /dev/null
+++ b/cpp/celeborn/client/ShuffleClient.cpp
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "celeborn/client/ShuffleClient.h"
+
+#include "celeborn/utils/CelebornUtils.h"
+
+namespace celeborn {
+namespace client {
+ShuffleClientImpl::ShuffleClientImpl(
+    const std::string& appUniqueId,
+    const std::shared_ptr<const conf::CelebornConf>& conf,
+    const std::shared_ptr<network::TransportClientFactory>& clientFactory)
+    : appUniqueId_(appUniqueId), conf_(conf), clientFactory_(clientFactory) {}
+
+// Connects to the LifecycleManager at host:port and installs the resulting
+// endpoint reference for later RPCs (e.g. GetReducerFileGroup).
+void ShuffleClientImpl::setupLifecycleManagerRef(std::string& host, int port) {
+  // The connection and the endpoint object are created before mutex_ is
+  // taken, so the lock only covers the pointer swap.
+  auto managerClient = clientFactory_->createClient(host, port);
+  // NOTE(review): "dummy"/0 look like placeholder arguments of
+  // NettyRpcEndpointRef; confirm their meaning there.
+  auto endpointRef = std::make_shared<network::NettyRpcEndpointRef>(
+      "LifecycleManagerEndpoint", "dummy", 0, host, port, managerClient);
+  std::lock_guard<std::mutex> guard(mutex_);
+  lifecycleManagerRef_ = std::move(endpointRef);
+}
+
+// Installs a caller-provided LifecycleManager endpoint reference instead of
+// creating a new connection.
+void ShuffleClientImpl::setupLifecycleManagerRef(
+    std::shared_ptr<network::NettyRpcEndpointRef>& lifecycleManagerRef) {
+  const std::lock_guard<std::mutex> guard{mutex_};
+  lifecycleManagerRef_ = lifecycleManagerRef;
+}
+
+// Opens an input stream over one reduce partition of a shuffle.
+//   shuffleId / partitionId        - which partition to read.
+//   attemptNumber                  - forwarded to CelebornInputStream.
+//   startMapIndex / endMapIndex    - map-index bounds forwarded to
+//                                    CelebornInputStream.
+// The reducer file groups of the shuffle are fetched from the
+// LifecycleManager on first use and cached (see getReducerFileGroupInfo).
+std::unique_ptr<CelebornInputStream> ShuffleClientImpl::readPartition(
+    int shuffleId,
+    int partitionId,
+    int attemptNumber,
+    int startMapIndex,
+    int endMapIndex) {
+  const auto& reducerFileGroupInfo = getReducerFileGroupInfo(shuffleId);
+  std::string shuffleKey = utils::makeShuffleKey(appUniqueId_, shuffleId);
+
+  // A partition missing from fileGroups yields an empty location list rather
+  // than an error. A single find() replaces the former empty()/count()/find()
+  // sequence.
+  std::vector<std::shared_ptr<const protocol::PartitionLocation>> locations;
+  const auto& fileGroups = reducerFileGroupInfo.fileGroups;
+  auto groupIter = fileGroups.find(partitionId);
+  if (groupIter != fileGroups.end()) {
+    locations = utils::toVector(groupIter->second);
+  }
+  return std::make_unique<CelebornInputStream>(
+      shuffleKey,
+      conf_,
+      clientFactory_,
+      std::move(locations),
+      reducerFileGroupInfo.attempts,
+      attemptNumber,
+      startMapIndex,
+      endMapIndex);
+}
+
+// Queries the LifecycleManager for the reducer file groups of shuffleId and
+// caches the response. SUCCESS and SHUFFLE_NOT_REGISTERED responses are
+// cached (the first cached response for a shuffle wins); STAGE_END_TIME_OUT,
+// SHUFFLE_DATA_LOST and unknown codes fail via CELEBORN_FAIL.
+void ShuffleClientImpl::updateReducerFileGroup(int shuffleId) {
+  // Snapshot the endpoint under mutex_: setupLifecycleManagerRef() writes
+  // lifecycleManagerRef_ under the same lock, so an unlocked read would be a
+  // data race. The RPC itself then runs without holding the lock.
+  std::shared_ptr<network::NettyRpcEndpointRef> lifecycleManagerRef;
+  {
+    std::lock_guard<std::mutex> lock(mutex_);
+    lifecycleManagerRef = lifecycleManagerRef_;
+  }
+  CELEBORN_CHECK(
+      lifecycleManagerRef, "lifecycleManagerRef_ is not initialized");
+  // Send the query request to lifecycleManager.
+  auto reducerFileGroupInfo = lifecycleManagerRef->askSync(
+      protocol::GetReducerFileGroup{shuffleId},
+      conf_->clientRpcGetReducerFileGroupRpcAskTimeout());
+
+  // Stores the response unless another caller already cached one for this
+  // shuffle, in which case this response is discarded.
+  auto cacheResponse = [&]() {
+    std::lock_guard<std::mutex> lock(mutex_);
+    if (reducerFileGroupInfos_.count(shuffleId) > 0) {
+      VLOG(1) << "reducerFileGroupInfo for shuffleId " << shuffleId
+              << " already exists, ignored";
+      return;
+    }
+    reducerFileGroupInfos_[shuffleId] = std::move(reducerFileGroupInfo);
+  };
+
+  switch (reducerFileGroupInfo->status) {
+    case protocol::SUCCESS: {
+      VLOG(1) << "success to get reducerFileGroupInfo, shuffleId "
+              << shuffleId;
+      cacheResponse();
+      return;
+    }
+    case protocol::SHUFFLE_NOT_REGISTERED: {
+      // We cannot treat this as a failure. It indicates this is an empty
+      // shuffle.
+      LOG(WARNING) << "shuffleId " << shuffleId
+                   << " is not registered when get reducerFileGroupInfo";
+      cacheResponse();
+      return;
+    }
+    case protocol::STAGE_END_TIME_OUT:
+    case protocol::SHUFFLE_DATA_LOST: {
+      LOG(ERROR) << "shuffleId " << shuffleId
+                 << " failed getReducerFileGroupInfo with code "
+                 << reducerFileGroupInfo->status;
+      CELEBORN_FAIL("failed protocol::GetReducerFileGroupResponse code");
+    }
+    default: {
+      CELEBORN_FAIL("undefined protocol::GetReducerFileGroupResponse code");
+    }
+  }
+}
+
+// Forgets the cached reducer file groups of shuffleId, so the next read of
+// that shuffle queries the LifecycleManager again. Always reports success,
+// even when nothing was cached.
+bool ShuffleClientImpl::cleanupShuffle(int shuffleId) {
+  {
+    const std::lock_guard<std::mutex> guard{mutex_};
+    reducerFileGroupInfos_.erase(shuffleId);
+  }
+  return true;
+}
+
+// Returns the cached GetReducerFileGroup response of shuffleId. If it is not
+// cached yet, updateReducerFileGroup() is called first; that function fails
+// via CELEBORN_FAIL on error status codes.
+protocol::GetReducerFileGroupResponse&
+ShuffleClientImpl::getReducerFileGroupInfo(int shuffleId) {
+  // Fast path: the response is already cached.
+  {
+    std::lock_guard<std::mutex> lock(mutex_);
+    auto iter = reducerFileGroupInfos_.find(shuffleId);
+    if (iter != reducerFileGroupInfos_.end()) {
+      return *iter->second;
+    }
+  }
+
+  updateReducerFileGroup(shuffleId);
+
+  std::lock_guard<std::mutex> lock(mutex_);
+  // Use find() rather than operator[]: if the entry was erased in between
+  // (e.g. by a concurrent cleanupShuffle), operator[] would insert a null
+  // unique_ptr and dereferencing it would be undefined behavior.
+  auto iter = reducerFileGroupInfos_.find(shuffleId);
+  CELEBORN_CHECK(
+      iter != reducerFileGroupInfos_.end() && iter->second,
+      "reducerFileGroupInfo is missing after updateReducerFileGroup");
+  // NOTE(review): the returned reference outlives the lock; a concurrent
+  // cleanupShuffle(shuffleId) can still invalidate it.
+  return *iter->second;
+}
+} // namespace client
+} // namespace celeborn
diff --git a/cpp/celeborn/client/ShuffleClient.h b/cpp/celeborn/client/ShuffleClient.h
new file mode 100644
index 000000000..d66d1649d
--- /dev/null
+++ b/cpp/celeborn/client/ShuffleClient.h
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "celeborn/client/reader/CelebornInputStream.h"
+#include "celeborn/network/NettyRpcEndpointRef.h"
+
+namespace celeborn {
+namespace client {
+// Reader-side interface of the C++ Celeborn client: it locates shuffle data
+// through the LifecycleManager and hands out CelebornInputStreams to read it.
+class ShuffleClient {
+ public:
+  // Implementations are owned and deleted through ShuffleClient pointers
+  // (e.g. std::unique_ptr<ShuffleClient>), which requires a virtual
+  // destructor to avoid undefined behavior.
+  virtual ~ShuffleClient() = default;
+
+  // Points this client at the LifecycleManager reachable at host:port.
+  virtual void setupLifecycleManagerRef(std::string& host, int port) = 0;
+
+  // Uses an already-created LifecycleManager endpoint reference.
+  virtual void setupLifecycleManagerRef(
+      std::shared_ptr<network::NettyRpcEndpointRef>& lifecycleManagerRef) = 0;
+
+  // Fetches the reducer file groups of shuffleId from the LifecycleManager.
+  virtual void updateReducerFileGroup(int shuffleId) = 0;
+
+  // Returns a stream over partitionId of shuffleId. The attempt number and
+  // map-index bounds are interpreted by CelebornInputStream.
+  virtual std::unique_ptr<CelebornInputStream> readPartition(
+      int shuffleId,
+      int partitionId,
+      int attemptNumber,
+      int startMapIndex,
+      int endMapIndex) = 0;
+
+  // Releases client-side state kept for shuffleId.
+  virtual bool cleanupShuffle(int shuffleId) = 0;
+
+  // Releases resources held by the client.
+  virtual void shutdown() = 0;
+};
+
+// Default ShuffleClient. It caches one GetReducerFileGroupResponse per shuffle
+// id, so repeated reads of a shuffle reuse the first cached response until
+// cleanupShuffle() drops it.
+class ShuffleClientImpl : public ShuffleClient {
+ public:
+  ShuffleClientImpl(
+      const std::string& appUniqueId,
+      const std::shared_ptr<const conf::CelebornConf>& conf,
+      const std::shared_ptr<network::TransportClientFactory>& clientFactory);
+
+  void setupLifecycleManagerRef(std::string& host, int port) override;
+
+  void setupLifecycleManagerRef(
+      std::shared_ptr<network::NettyRpcEndpointRef>& lifecycleManagerRef)
+      override;
+
+  std::unique_ptr<CelebornInputStream> readPartition(
+      int shuffleId,
+      int partitionId,
+      int attemptNumber,
+      int startMapIndex,
+      int endMapIndex) override;
+
+  void updateReducerFileGroup(int shuffleId) override;
+
+  bool cleanupShuffle(int shuffleId) override;
+
+  // No-op.
+  void shutdown() override {}
+
+ private:
+  // Returns the cached response for shuffleId, querying the LifecycleManager
+  // first when it is not cached. The reference points into
+  // reducerFileGroupInfos_ and stays valid only until that entry is erased.
+  protocol::GetReducerFileGroupResponse& getReducerFileGroupInfo(
+      int shuffleId);
+
+  // Prefix of every shuffle key built by this client.
+  const std::string appUniqueId_;
+  const std::shared_ptr<const conf::CelebornConf> conf_;
+  // Installed by setupLifecycleManagerRef(), which writes it under mutex_.
+  std::shared_ptr<network::NettyRpcEndpointRef> lifecycleManagerRef_;
+  const std::shared_ptr<network::TransportClientFactory> clientFactory_;
+  // Guards reducerFileGroupInfos_ and writes to lifecycleManagerRef_.
+  std::mutex mutex_;
+  // Cached GetReducerFileGroup responses keyed by shuffle id. The key type
+  // matches the int shuffleId taken by every public method.
+  std::unordered_map<
+      int,
+      std::unique_ptr<protocol::GetReducerFileGroupResponse>>
+      reducerFileGroupInfos_;
+};
+} // namespace client
+} // namespace celeborn
diff --git a/cpp/celeborn/utils/CelebornUtils.cpp b/cpp/celeborn/utils/CelebornUtils.cpp
index 862dad7cc..24340cb79 100644
--- a/cpp/celeborn/utils/CelebornUtils.cpp
+++ b/cpp/celeborn/utils/CelebornUtils.cpp
@@ -19,6 +19,10 @@
 
 namespace celeborn {
 namespace utils {
+// Builds the "<appId>-<shuffleId>" key that names a shuffle of an application.
+std::string makeShuffleKey(const std::string& appId, const int shuffleId) {
+  std::string key{appId};
+  key.push_back('-');
+  key.append(std::to_string(shuffleId));
+  return key;
+}
+
 void writeUTF(memory::WriteOnlyByteBuffer& buffer, const std::string& msg) {
   buffer.write<short>(msg.size());
   buffer.writeFromString(msg);
diff --git a/cpp/celeborn/utils/CelebornUtils.h b/cpp/celeborn/utils/CelebornUtils.h
index 83e899533..1fab9352d 100644
--- a/cpp/celeborn/utils/CelebornUtils.h
+++ b/cpp/celeborn/utils/CelebornUtils.h
@@ -35,6 +35,18 @@ namespace utils {
 #define CELEBORN_SHUTDOWN_LOG(severity) \
   LOG(severity) << CELEBORN_SHUTDOWN_LOG_PREFIX
 
+// Copies the elements of a std::set into a std::vector, preserving the set's
+// iteration (sorted) order. Sets with a custom comparator or allocator are
+// accepted as well; T is still the first template parameter, so explicit
+// calls such as toVector<T>(s) keep working.
+template <typename T, typename Compare, typename Alloc>
+std::vector<T> toVector(const std::set<T, Compare, Alloc>& in) {
+  // Range construction sizes the vector once. Returning a prvalue (instead of
+  // std::move(local), which inhibits NRVO) lets the copy be elided.
+  return std::vector<T>(in.begin(), in.end());
+}
+
+// Returns the key naming a shuffle of an application, formatted as
+// "<appId>-<shuffleId>".
+std::string makeShuffleKey(const std::string& appId, int shuffleId);
+
 void writeUTF(memory::WriteOnlyByteBuffer& buffer, const std::string& msg);
 
 void writeRpcAddress(

Reply via email to