afterincomparableyum commented on code in PR #3553:
URL: https://github.com/apache/celeborn/pull/3553#discussion_r2577290045
##########
cpp/celeborn/client/ShuffleClient.cpp:
##########
@@ -143,6 +311,335 @@ bool ShuffleClientImpl::cleanupShuffle(int shuffleId) {
return true;
}
+std::shared_ptr<PushState> ShuffleClientImpl::getPushState(
+ const std::string& mapKey) {
+ return pushStates_.computeIfAbsent(
+ mapKey, [&]() { return std::make_shared<PushState>(*conf_); });
+}
+
+void ShuffleClientImpl::initReviveManagerLocked() {
+ if (!reviveManager_) {
+ std::string uniqueName = appUniqueId_;
+ uniqueName += std::to_string(utils::currentTimeNanos());
+ reviveManager_ =
+ ReviveManager::create(uniqueName, *conf_, weak_from_this());
+ }
+}
+
+void ShuffleClientImpl::registerShuffle(
+ int shuffleId,
+ int numMappers,
+ int numPartitions) {
+ auto shuffleMutex = shuffleMutexes_.computeIfAbsent(
+ shuffleId, []() { return std::make_shared<std::mutex>(); });
+ // RegisterShuffle might be issued concurrently, we only allow one issue
+ // for each shuffleId.
+ std::lock_guard<std::mutex> lock(*shuffleMutex);
+ if (partitionLocationMaps_.containsKey(shuffleId)) {
+ return;
+ }
+ CELEBORN_CHECK(
+ lifecycleManagerRef_, "lifecycleManagerRef_ is not initialized");
+ const int maxRetries = conf_->clientRegisterShuffleMaxRetries();
+ int numRetries = 1;
+ for (; numRetries <= maxRetries; numRetries++) {
+ try {
+ // Send the query request to lifecycleManager.
+ auto registerShuffleResponse = lifecycleManagerRef_->askSync<
+ protocol::RegisterShuffle,
+ protocol::RegisterShuffleResponse>(
+ protocol::RegisterShuffle{shuffleId, numMappers, numPartitions},
+ conf_->clientRpcRegisterShuffleRpcAskTimeout());
+
+ switch (registerShuffleResponse->status) {
+ case protocol::StatusCode::SUCCESS: {
+ VLOG(1) << "success to registerShuffle, shuffleId " << shuffleId
+ << " numMappers " << numMappers << " numPartitions "
+ << numPartitions;
+          auto partitionLocationMap = std::make_shared<utils::ConcurrentHashMap<
+ int,
+ std::shared_ptr<const protocol::PartitionLocation>>>();
+ auto& partitionLocations =
+ registerShuffleResponse->partitionLocations;
+ for (int i = 0; i < partitionLocations.size(); i++) {
+ auto id = partitionLocations[i]->id;
+ partitionLocationMap->set(id, std::move(partitionLocations[i]));
+ }
+ partitionLocationMaps_.set(
+ shuffleId, std::move(partitionLocationMap));
+ return;
+ }
+ default: {
+ LOG(ERROR)
+ << "LifecycleManager request slots return protocol::StatusCode "
+              << registerShuffleResponse->status << " , shuffleId " << shuffleId
+ << " NumMappers " << numMappers << " numPartitions "
+ << numPartitions << " , retry again, remain retry times "
+ << maxRetries - numRetries;
+ }
+ }
+ } catch (std::exception& e) {
+ CELEBORN_FAIL(fmt::format(
+ "registerShuffle encounters error after {} tries, "
+ "shuffleId {} numMappers {} numPartitions {}, errorMsg: {}",
+ numRetries,
+ shuffleId,
+ numMappers,
+ numPartitions,
+ e.what()));
+ break;
+ }
+ std::this_thread::sleep_for(conf_->clientRegisterShuffleRetryWait());
+ }
+ partitionLocationMaps_.set(shuffleId, nullptr);
+ CELEBORN_FAIL(fmt::format(
+ "registerShuffle failed after {} tries, "
+ "shuffleId {} numMappers {} numPartitions {}",
+ maxRetries,
+ shuffleId,
+ numMappers,
+ numPartitions));
+}
+
+void ShuffleClientImpl::submitRetryPushData(
+ int shuffleId,
+ std::unique_ptr<memory::ReadOnlyByteBuffer> body,
+ int batchId,
+ std::shared_ptr<PushDataCallback> pushDataCallback,
+ std::shared_ptr<PushState> pushState,
+ PtrReviveRequest request,
+ int remainReviveTimes,
+ long dueTimeMs) {
+ long reviveWaitTimeMs = dueTimeMs - utils::currentTimeMillis();
+ long accumulatedTimeMs = 0;
+ const long deltaMs = 50;
+ while (request->reviveStatus.load() ==
+ protocol::StatusCode::REVIVE_INITIALIZED &&
+ accumulatedTimeMs <= reviveWaitTimeMs) {
+ std::this_thread::sleep_for(utils::MS(deltaMs));
+ accumulatedTimeMs += deltaMs;
+ }
+ if (mapperEnded(shuffleId, request->mapId)) {
+ VLOG(1) << "Revive for push data success, but the mapper already ended "
+ "for shuffle "
+ << shuffleId << " map " << request->mapId << " attempt "
+ << request->attemptId << " partition " << request->partitionId
+ << " batch " << batchId << " location hostAndPushPort "
+ << request->loc->hostAndPushPort() << ".";
Review Comment:
From what I can see in the code, `revive()` can be called with `nullptr` for
the `oldLocation` parameter (it is explicitly called that way on line 106).
This means that `request->loc` can be null when `submitRetryPushData()` is
invoked, so dereferencing it without a null check will crash with a
segmentation fault.
I would recommend changing the code to this:
```
if (mapperEnded(shuffleId, request->mapId)) {
VLOG(1) << "Revive for push data success, but the mapper already ended "
"for shuffle "
<< shuffleId << " map " << request->mapId << " attempt "
<< request->attemptId << " partition " << request->partitionId
<< " batch " << batchId;
if (request->loc) {
      VLOG(1) << " location hostAndPushPort " << request->loc->hostAndPushPort() << ".";
pushState->removeBatch(batchId, request->loc->hostAndPushPort());
} else {
VLOG(1) << " (no location available).";
}
return;
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]