HolyLow commented on code in PR #3543: URL: https://github.com/apache/celeborn/pull/3543#discussion_r2562900993
########## cpp/celeborn/client/writer/PushDataCallback.cpp: ########## @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "celeborn/client/writer/PushDataCallback.h" +#include "celeborn/conf/CelebornConf.h" + +namespace celeborn { +namespace client { + +std::shared_ptr<PushDataCallback> PushDataCallback::create( + int shuffleId, + int mapId, + int attemptId, + int partitionId, + int numMappers, + int numPartitions, + const std::string& mapKey, + int batchId, + std::unique_ptr<memory::ReadOnlyByteBuffer> databody, + std::shared_ptr<PushState> pushState, + std::weak_ptr<ShuffleClientImpl> weakClient, + int remainingReviveTimes, + std::shared_ptr<const protocol::PartitionLocation> latestLocation) { + return std::shared_ptr<PushDataCallback>(new PushDataCallback( + shuffleId, + mapId, + attemptId, + partitionId, + numMappers, + numPartitions, + mapKey, + batchId, + std::move(databody), + pushState, + weakClient, + remainingReviveTimes, + latestLocation)); +} + +PushDataCallback::PushDataCallback( + int shuffleId, + int mapId, + int attemptId, + int partitionId, + int numMappers, + int numPartitions, + const std::string& mapKey, + int batchId, + std::unique_ptr<memory::ReadOnlyByteBuffer> 
databody, + std::shared_ptr<PushState> pushState, + std::weak_ptr<ShuffleClientImpl> weakClient, + int remainingReviveTimes, + std::shared_ptr<const protocol::PartitionLocation> latestLocation) + : shuffleId_(shuffleId), + mapId_(mapId), + attemptId_(attemptId), + partitionId_(partitionId), + numMappers_(numMappers), + numPartitions_(numPartitions), + mapKey_(mapKey), + batchId_(batchId), + databody_(std::move(databody)), + pushState_(pushState), + weakClient_(weakClient), + remainingReviveTimes_(remainingReviveTimes), + latestLocation_(latestLocation) {} + +void PushDataCallback::onSuccess( + std::unique_ptr<memory::ReadOnlyByteBuffer> response) { + auto sharedClient = weakClient_.lock(); + if (!sharedClient) { + LOG(WARNING) << "ShuffleClientImpl has expired when " Review Comment: An expired shuffleClient means that the ShuffleClientImpl object itself has already been destroyed and must no longer be accessed. C++ has no garbage collector like the JVM's, so we must carefully avoid dangling pointers (which crash the program) and reference cycles (which leak memory). We do this by using shared_ptr and weak_ptr carefully: the callback holds only a weak_ptr to the client and calls lock() before each use. This case typically arises when resources are released in abnormal situations, for example when the shuffle fails for some reason or is cancelled. The expiration check protects the function from dereferencing a dangling pointer. Simulating the expired-client case in a unit test would be easy, but it would not be meaningful. What do you think? I am open to further changes if you like. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
