This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 9bae3fbd5 [CELEBORN-1915][CIP-14] Add reader's ShuffleClient to
cppClient
9bae3fbd5 is described below
commit 9bae3fbd5e889315e77d02eb669461fecee786de
Author: HolyLow <[email protected]>
AuthorDate: Tue Mar 25 17:54:34 2025 +0800
[CELEBORN-1915][CIP-14] Add reader's ShuffleClient to cppClient
### What changes were proposed in this pull request?
This PR adds the reader-side ShuffleClient to cppClient.
### Why are the changes needed?
ShuffleClient is the user-facing entry point of cppClient.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Compilation.
Closes #3156 from
HolyLow/issue/celeborn-1915-add-reader-shuffle-client-to-cppclient.
Authored-by: HolyLow <[email protected]>
Signed-off-by: mingji <[email protected]>
---
cpp/celeborn/client/CMakeLists.txt | 3 +-
cpp/celeborn/client/ShuffleClient.cpp | 141 ++++++++++++++++++++++++++++++++++
cpp/celeborn/client/ShuffleClient.h | 86 +++++++++++++++++++++
cpp/celeborn/utils/CelebornUtils.cpp | 4 +
cpp/celeborn/utils/CelebornUtils.h | 12 +++
5 files changed, 245 insertions(+), 1 deletion(-)
diff --git a/cpp/celeborn/client/CMakeLists.txt b/cpp/celeborn/client/CMakeLists.txt
index f9edfe949..2491b7d7f 100644
--- a/cpp/celeborn/client/CMakeLists.txt
+++ b/cpp/celeborn/client/CMakeLists.txt
@@ -15,7 +15,8 @@
+# Sources of the `client` library: the partition readers under reader/ and
+# the reader-side ShuffleClient.
add_library(
client
reader/WorkerPartitionReader.cpp
- reader/CelebornInputStream.cpp)
+ reader/CelebornInputStream.cpp
+ ShuffleClient.cpp)
target_include_directories(client PUBLIC ${CMAKE_BINARY_DIR})
diff --git a/cpp/celeborn/client/ShuffleClient.cpp b/cpp/celeborn/client/ShuffleClient.cpp
new file mode 100644
index 000000000..35e593d1b
--- /dev/null
+++ b/cpp/celeborn/client/ShuffleClient.cpp
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "celeborn/client/ShuffleClient.h"
+
+#include "celeborn/utils/CelebornUtils.h"
+
+namespace celeborn {
+namespace client {
+ShuffleClientImpl::ShuffleClientImpl(
+ const std::string& appUniqueId,
+ const std::shared_ptr<const conf::CelebornConf>& conf,
+ const std::shared_ptr<network::TransportClientFactory>& clientFactory)
+ : appUniqueId_(appUniqueId), conf_(conf), clientFactory_(clientFactory) {}
+
+// Connects to the LifecycleManager at host:port and installs an endpoint
+// reference to it, replacing any previously configured reference.
+void ShuffleClientImpl::setupLifecycleManagerRef(std::string& host, int port) {
+  // Build the client and endpoint reference without holding mutex_; only the
+  // publication of the shared reference has to be serialized.
+  auto transportClient = clientFactory_->createClient(host, port);
+  // NOTE(review): "dummy"/0 look like placeholder values for the remaining
+  // endpoint-address fields — confirm against NettyRpcEndpointRef.
+  auto endpointRef = std::make_shared<network::NettyRpcEndpointRef>(
+      "LifecycleManagerEndpoint", "dummy", 0, host, port, transportClient);
+  std::lock_guard<std::mutex> guard(mutex_);
+  lifecycleManagerRef_ = std::move(endpointRef);
+}
+
+// Installs an already-built LifecycleManager endpoint reference, replacing
+// any previously configured one.
+void ShuffleClientImpl::setupLifecycleManagerRef(
+    std::shared_ptr<network::NettyRpcEndpointRef>& lifecycleManagerRef) {
+  const std::lock_guard<std::mutex> guard(mutex_);
+  lifecycleManagerRef_ = lifecycleManagerRef;
+}
+
+// Opens a CelebornInputStream over every location recorded for
+// (shuffleId, partitionId). The reducer file group is fetched from the
+// LifecycleManager on first use. A partition with no recorded locations
+// yields a stream over an empty location list.
+std::unique_ptr<CelebornInputStream> ShuffleClientImpl::readPartition(
+    int shuffleId,
+    int partitionId,
+    int attemptNumber,
+    int startMapIndex,
+    int endMapIndex) {
+  // NOTE(review): this reference points into reducerFileGroupInfos_ and is
+  // read without holding mutex_; a concurrent cleanupShuffle(shuffleId)
+  // would leave it dangling.
+  const auto& reducerFileGroupInfo = getReducerFileGroupInfo(shuffleId);
+  std::string shuffleKey = utils::makeShuffleKey(appUniqueId_, shuffleId);
+
+  std::vector<std::shared_ptr<const protocol::PartitionLocation>> locations;
+  // A single lookup covers both the empty-map and the absent-partition case.
+  const auto& fileGroups = reducerFileGroupInfo.fileGroups;
+  auto it = fileGroups.find(partitionId);
+  if (it != fileGroups.end()) {
+    locations = utils::toVector(it->second);
+  }
+
+  return std::make_unique<CelebornInputStream>(
+      shuffleKey,
+      conf_,
+      clientFactory_,
+      std::move(locations),
+      reducerFileGroupInfo.attempts,
+      attemptNumber,
+      startMapIndex,
+      endMapIndex);
+}
+
+// Asks the LifecycleManager for the reducer file group of `shuffleId` and
+// caches the response. SUCCESS and SHUFFLE_NOT_REGISTERED (an empty shuffle)
+// responses are cached; an entry that is already cached is kept. Any other
+// status is reported through CELEBORN_FAIL.
+void ShuffleClientImpl::updateReducerFileGroup(int shuffleId) {
+  // Snapshot the endpoint reference under the lock, because
+  // setupLifecycleManagerRef() writes it under mutex_. The RPC itself is then
+  // issued without holding the lock.
+  std::shared_ptr<network::NettyRpcEndpointRef> lifecycleManagerRef;
+  {
+    std::lock_guard<std::mutex> lock(mutex_);
+    lifecycleManagerRef = lifecycleManagerRef_;
+  }
+  CELEBORN_CHECK(
+      lifecycleManagerRef, "lifecycleManagerRef_ is not initialized");
+  // Send the query request to lifecycleManager.
+  auto reducerFileGroupInfo = lifecycleManagerRef->askSync(
+      protocol::GetReducerFileGroup{shuffleId},
+      conf_->clientRpcGetReducerFileGroupRpcAskTimeout());
+
+  // Caches the response unless another caller already stored one for this
+  // shuffleId. emplace() does a single lookup and leaves an existing entry
+  // untouched.
+  auto cacheResponse = [&]() {
+    std::lock_guard<std::mutex> lock(mutex_);
+    const bool inserted =
+        reducerFileGroupInfos_
+            .emplace(shuffleId, std::move(reducerFileGroupInfo))
+            .second;
+    if (!inserted) {
+      VLOG(1) << "reducerFileGroupInfo for shuffleId " << shuffleId
+              << " already exists, ignored";
+    }
+  };
+
+  switch (reducerFileGroupInfo->status) {
+    case protocol::SUCCESS: {
+      VLOG(1) << "success to get reducerFileGroupInfo, shuffleId " << shuffleId;
+      cacheResponse();
+      return;
+    }
+    case protocol::SHUFFLE_NOT_REGISTERED: {
+      // We cannot treat this as a failure. It indicates this is an empty
+      // shuffle.
+      LOG(WARNING) << "shuffleId " << shuffleId
+                   << " is not registered when get reducerFileGroupInfo";
+      cacheResponse();
+      return;
+    }
+    case protocol::STAGE_END_TIME_OUT:
+    case protocol::SHUFFLE_DATA_LOST: {
+      LOG(ERROR) << "shuffleId " << shuffleId
+                 << " failed getReducerFileGroupInfo with code "
+                 << reducerFileGroupInfo->status;
+      CELEBORN_FAIL("failed protocol::GetReducerFileGroupResponse code");
+    }
+    default: {
+      // Log the unexpected status so the failure is diagnosable.
+      LOG(ERROR) << "shuffleId " << shuffleId
+                 << " got unexpected getReducerFileGroupInfo code "
+                 << reducerFileGroupInfo->status;
+      CELEBORN_FAIL("undefined protocol::GetReducerFileGroupResponse code");
+    }
+  }
+}
+
+// Drops the cached reducer file group of `shuffleId`, if one exists.
+// Always reports success.
+bool ShuffleClientImpl::cleanupShuffle(int shuffleId) {
+  {
+    const std::lock_guard<std::mutex> guard(mutex_);
+    reducerFileGroupInfos_.erase(shuffleId);
+  }
+  return true;
+}
+
+// Returns the cached GetReducerFileGroupResponse for `shuffleId`. On a cache
+// miss it first fetches it via updateReducerFileGroup().
+// NOTE(review): the returned reference points into reducerFileGroupInfos_ and
+// outlives the lock; it must not be raced against cleanupShuffle().
+protocol::GetReducerFileGroupResponse&
+ShuffleClientImpl::getReducerFileGroupInfo(int shuffleId) {
+  {
+    std::lock_guard<std::mutex> lock(mutex_);
+    auto iter = reducerFileGroupInfos_.find(shuffleId);
+    if (iter != reducerFileGroupInfos_.end()) {
+      return *iter->second;
+    }
+  }
+
+  // Two threads that miss concurrently may both issue the RPC. The later
+  // response is discarded by updateReducerFileGroup().
+  updateReducerFileGroup(shuffleId);
+
+  std::lock_guard<std::mutex> lock(mutex_);
+  // Use find() instead of operator[]. If the entry is missing (e.g. erased
+  // concurrently), operator[] would insert a null unique_ptr that is then
+  // dereferenced.
+  auto iter = reducerFileGroupInfos_.find(shuffleId);
+  CELEBORN_CHECK(
+      iter != reducerFileGroupInfos_.end() && iter->second,
+      "reducerFileGroupInfo is missing after update");
+  return *iter->second;
+}
+} // namespace client
+} // namespace celeborn
diff --git a/cpp/celeborn/client/ShuffleClient.h b/cpp/celeborn/client/ShuffleClient.h
new file mode 100644
index 000000000..d66d1649d
--- /dev/null
+++ b/cpp/celeborn/client/ShuffleClient.h
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "celeborn/client/reader/CelebornInputStream.h"
+#include "celeborn/network/NettyRpcEndpointRef.h"
+
+namespace celeborn {
+namespace client {
+/// Reader-side client interface of the cpp Celeborn client.
+class ShuffleClient {
+ public:
+  /// Virtual so that destroying an implementation through a ShuffleClient
+  /// pointer (e.g. std::unique_ptr<ShuffleClient>) is well-defined.
+  virtual ~ShuffleClient() = default;
+
+  /// Points the client at the LifecycleManager listening on host:port.
+  virtual void setupLifecycleManagerRef(std::string& host, int port) = 0;
+
+  /// Points the client at an existing LifecycleManager endpoint reference.
+  virtual void setupLifecycleManagerRef(
+      std::shared_ptr<network::NettyRpcEndpointRef>& lifecycleManagerRef) = 0;
+
+  /// Fetches the reducer file group information of `shuffleId` from the
+  /// LifecycleManager.
+  virtual void updateReducerFileGroup(int shuffleId) = 0;
+
+  /// Opens a stream over the data of (shuffleId, partitionId) for the given
+  /// attempt and map-index range. The exact range semantics are defined by
+  /// CelebornInputStream.
+  virtual std::unique_ptr<CelebornInputStream> readPartition(
+      int shuffleId,
+      int partitionId,
+      int attemptNumber,
+      int startMapIndex,
+      int endMapIndex) = 0;
+
+  /// Releases client-side state held for `shuffleId` and returns a success
+  /// flag.
+  virtual bool cleanupShuffle(int shuffleId) = 0;
+
+  /// Shuts the client down.
+  virtual void shutdown() = 0;
+};
+
+/// Default reader-side ShuffleClient. It lazily fetches and caches one
+/// GetReducerFileGroupResponse per shuffleId from the LifecycleManager.
+class ShuffleClientImpl : public ShuffleClient {
+ public:
+  ShuffleClientImpl(
+      const std::string& appUniqueId,
+      const std::shared_ptr<const conf::CelebornConf>& conf,
+      const std::shared_ptr<network::TransportClientFactory>& clientFactory);
+
+  void setupLifecycleManagerRef(std::string& host, int port) override;
+
+  void setupLifecycleManagerRef(
+      std::shared_ptr<network::NettyRpcEndpointRef>& lifecycleManagerRef)
+      override;
+
+  std::unique_ptr<CelebornInputStream> readPartition(
+      int shuffleId,
+      int partitionId,
+      int attemptNumber,
+      int startMapIndex,
+      int endMapIndex) override;
+
+  void updateReducerFileGroup(int shuffleId) override;
+
+  bool cleanupShuffle(int shuffleId) override;
+
+  void shutdown() override {}
+
+ private:
+  // Returns the cached response for `shuffleId`, fetching it first if absent.
+  // NOTE(review): the returned reference points into reducerFileGroupInfos_
+  // and is used after mutex_ is released; a concurrent cleanupShuffle() for
+  // the same shuffleId would leave it dangling.
+  protocol::GetReducerFileGroupResponse& getReducerFileGroupInfo(int shuffleId);
+
+  const std::string appUniqueId_;
+  std::shared_ptr<const conf::CelebornConf> conf_;
+  std::shared_ptr<network::NettyRpcEndpointRef> lifecycleManagerRef_;
+  std::shared_ptr<network::TransportClientFactory> clientFactory_;
+  // Protects lifecycleManagerRef_ and reducerFileGroupInfos_; hold it when
+  // reading or writing either.
+  std::mutex mutex_;
+  // Cached responses keyed by shuffleId (int, matching every API above).
+  std::unordered_map<
+      int,
+      std::unique_ptr<protocol::GetReducerFileGroupResponse>>
+      reducerFileGroupInfos_;
+};
+} // namespace client
+} // namespace celeborn
diff --git a/cpp/celeborn/utils/CelebornUtils.cpp b/cpp/celeborn/utils/CelebornUtils.cpp
index 862dad7cc..24340cb79 100644
--- a/cpp/celeborn/utils/CelebornUtils.cpp
+++ b/cpp/celeborn/utils/CelebornUtils.cpp
@@ -19,6 +19,10 @@
namespace celeborn {
namespace utils {
+// Joins appId and shuffleId with a "-" separator, e.g. ("app", 3) -> "app-3".
+std::string makeShuffleKey(const std::string& appId, const int shuffleId) {
+  std::string shuffleKey(appId);
+  shuffleKey.append("-").append(std::to_string(shuffleId));
+  return shuffleKey;
+}
+
void writeUTF(memory::WriteOnlyByteBuffer& buffer, const std::string& msg) {
buffer.write<short>(msg.size());
buffer.writeFromString(msg);
diff --git a/cpp/celeborn/utils/CelebornUtils.h b/cpp/celeborn/utils/CelebornUtils.h
index 83e899533..1fab9352d 100644
--- a/cpp/celeborn/utils/CelebornUtils.h
+++ b/cpp/celeborn/utils/CelebornUtils.h
@@ -35,6 +35,18 @@ namespace utils {
#define CELEBORN_SHUTDOWN_LOG(severity) \
LOG(severity) << CELEBORN_SHUTDOWN_LOG_PREFIX
+/// Copies the elements of `in` into a std::vector, preserving the set's
+/// iteration order.
+template <typename T>
+std::vector<T> toVector(const std::set<T>& in) {
+  // Range construction allocates once. Returning the prvalue directly keeps
+  // copy elision, which `return std::move(local)` would have blocked.
+  return std::vector<T>(in.begin(), in.end());
+}
+
+/// Returns the shuffle key "<appId>-<shuffleId>", e.g. ("app", 3) -> "app-3".
+std::string makeShuffleKey(const std::string& appId, int shuffleId);
+
+// Serializes msg into buffer. NOTE(review): the visible part of the definition
+// writes msg.size() as a short followed by the string bytes; sizes above
+// SHRT_MAX would be narrowed — confirm callers bound the length.
void writeUTF(memory::WriteOnlyByteBuffer& buffer, const std::string& msg);
void writeRpcAddress(