Copilot commented on code in PR #3583:
URL: https://github.com/apache/celeborn/pull/3583#discussion_r2835744417


##########
cpp/celeborn/client/reader/CelebornInputStream.cpp:
##########
@@ -189,9 +205,56 @@ void CelebornInputStream::moveToNextReader() {
 
 std::shared_ptr<PartitionReader> CelebornInputStream::createReaderWithRetry(
     const protocol::PartitionLocation& location) {
-  // TODO: support retrying when createReader failed. Maybe switch to peer
-  // location?
-  return createReader(location);
+  const protocol::PartitionLocation* currentLocation = &location;
+  std::exception_ptr lastException;
+
+  while (fetchChunkRetryCnt_ < fetchChunkMaxRetry_) {
+    try {
+      VLOG(1) << "Create reader for location " << currentLocation->host << ":"
+              << currentLocation->fetchPort;
+      if (isExcluded(*currentLocation)) {
+        CELEBORN_FAIL(
+            "Fetch data from excluded worker! {}",
+            currentLocation->hostAndFetchPort());
+      }
+      auto reader = createReader(*currentLocation);
+      return reader;
+    } catch (const std::exception& e) {
+      lastException = std::current_exception();
+      excludeFailedFetchLocation(currentLocation->hostAndFetchPort(), e);
+      fetchChunkRetryCnt_++;
+
+      if (currentLocation->hasPeer() && !readSkewPartitionWithoutMapRange_) {
+        if (fetchChunkRetryCnt_ % 2 == 0) {
+          std::this_thread::sleep_for(retryWait_);
+        }
+        LOG(WARNING) << "CreatePartitionReader failed " << fetchChunkRetryCnt_
+                     << "/" << fetchChunkMaxRetry_ << " times for location "
+                     << currentLocation->hostAndFetchPort()
+                     << ", change to peer. Error: " << e.what();
+        // TODO: When stream handlers are supported, send BUFFER_STREAM_END
+        // to close the active stream before switching to peer, matching the
+        // Java CelebornInputStream behavior. Currently, the C++ client does
+        // not have pre-opened stream handlers, so stream cleanup is handled
+        // by WorkerPartitionReader's destructor.
+        currentLocation = currentLocation->getPeer();
+      } else {
+        LOG(WARNING) << "CreatePartitionReader failed " << fetchChunkRetryCnt_
+                     << "/" << fetchChunkMaxRetry_ << " times for location "
+                     << currentLocation->hostAndFetchPort()
+                     << ", retry the same location. Error: " << e.what();
+        std::this_thread::sleep_for(retryWait_);
+      }
+    }
+  }
+
+  // Max retries exceeded, rethrow the last exception wrapped with context
+  throw utils::CelebornRuntimeError(
+      lastException,
+      "createPartitionReader failed after " +
+          std::to_string(fetchChunkRetryCnt_) + " retries for location " +
+          location.hostAndFetchPort(),
+      false);

Review Comment:
   The final wrapped exception message always reports the original 
`location.hostAndFetchPort()`, even if the last failure happened after 
switching to the peer. This can be misleading when debugging. Consider 
including the last attempted location (e.g., 
`currentLocation->hostAndFetchPort()`), or both the original and the last 
attempted location, in the error context.
   ```suggestion
     std::string errorMessage =
         "createPartitionReader failed after " +
         std::to_string(fetchChunkRetryCnt_) +
         " retries. Original location: " + location.hostAndFetchPort() +
         ", last attempted location: " + currentLocation->hostAndFetchPort();
     throw utils::CelebornRuntimeError(lastException, errorMessage, false);
   ```



##########
cpp/celeborn/client/ShuffleClient.h:
##########
@@ -70,6 +72,10 @@ class ShuffleClient {
       int endMapIndex,
       bool needCompression) = 0;
 
+  virtual void excludeFailedFetchLocation(
+      const std::string& hostAndFetchPort,
+      const std::exception& e) = 0;
+

Review Comment:
   The new ShuffleClient::excludeFailedFetchLocation API (and its 
implementation in ShuffleClientImpl) appears unused in the current codebase 
(CelebornInputStream maintains exclusion internally instead). If this isn’t 
intended for external callers, consider removing it to avoid expanding the 
public interface; otherwise, consider wiring CelebornInputStream to call 
through ShuffleClient so there is a single source of truth for exclusion 
behavior.
   ```suggestion
   
   ```



##########
cpp/celeborn/conf/CelebornConf.h:
##########
@@ -129,6 +129,25 @@ class CelebornConf : public BaseConf {
   static constexpr std::string_view kShuffleCompressionZstdCompressLevel{
       "celeborn.client.shuffle.compression.zstd.level"};
 
+  static constexpr std::string_view kClientFetchMaxRetriesForEachReplica{
+      "celeborn.client.fetch.maxRetriesForEachReplica"};
+
+  static constexpr std::string_view kNetworkIoRetryWait{
+      "celeborn.data.io.retryWait"};
+
+  static constexpr std::string_view kClientPushReplicateEnabled{
+      "celeborn.client.push.replicate.enabled"};
+
+  static constexpr std::string_view kClientFetchExcludeWorkerOnFailureEnabled{
+      "celeborn.client.fetch.excludeWorkerOnFailure.enabled"};
+
+  static constexpr std::string_view kClientFetchExcludedWorkerExpireTimeout{
+      "celeborn.client.fetch.excludedWorker.expireTimeout"};
+
+  static constexpr std::string_view
+      kClientAdaptiveOptimizeSkewedPartitionReadEnabled{
+          "celeborn.client.adaptive.optimizeSkewedPartitionRead.enabled"};

Review Comment:
   The PR description lists three new config properties, but this change also 
introduces additional fetch-related configs 
(`celeborn.client.fetch.excludeWorkerOnFailure.enabled`, 
`celeborn.client.fetch.excludedWorker.expireTimeout`, and 
`celeborn.client.adaptive.optimizeSkewedPartitionRead.enabled`). Please update 
the PR description (or drop these configs if they are out of scope) so that 
reviewers and users can understand the full change to the config surface.



##########
cpp/celeborn/client/ShuffleClient.h:
##########


Review Comment:
   ShuffleClient.h uses utils::ConcurrentHashMap in several type aliases/member 
declarations, but it does not include the header that defines it 
(celeborn/utils/CelebornUtils.h). It currently compiles only because 
CelebornInputStream.h happens to include CelebornUtils.h; add the direct 
include here to avoid fragile transitive dependencies.



##########
cpp/celeborn/client/reader/CelebornInputStream.cpp:
##########


Review Comment:
   CelebornInputStream.cpp uses std::this_thread::sleep_for(retryWait_) later 
in the file but does not include <thread> (and the header no longer provides 
it). This can fail to compile depending on transitive includes; add an explicit 
#include <thread> in this .cpp.



##########
cpp/celeborn/client/reader/CelebornInputStream.cpp:
##########
@@ -189,9 +205,56 @@ void CelebornInputStream::moveToNextReader() {
 
 std::shared_ptr<PartitionReader> CelebornInputStream::createReaderWithRetry(
     const protocol::PartitionLocation& location) {
-  // TODO: support retrying when createReader failed. Maybe switch to peer
-  // location?
-  return createReader(location);
+  const protocol::PartitionLocation* currentLocation = &location;
+  std::exception_ptr lastException;
+
+  while (fetchChunkRetryCnt_ < fetchChunkMaxRetry_) {
+    try {
+      VLOG(1) << "Create reader for location " << currentLocation->host << ":"
+              << currentLocation->fetchPort;
+      if (isExcluded(*currentLocation)) {
+        CELEBORN_FAIL(
+            "Fetch data from excluded worker! {}",
+            currentLocation->hostAndFetchPort());
+      }

Review Comment:
   When a location is already in the exclusion list, the code throws and then 
retries/sleeps (especially in the no-peer branch). Since 
isExcluded(*currentLocation) will keep returning true until the exclusion 
expires, these retries are guaranteed to fail and just add delay. Consider 
failing fast (no sleep/retry) or skipping to another available location/peer 
when the current location is excluded.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to