Copilot commented on code in PR #3583:
URL: https://github.com/apache/celeborn/pull/3583#discussion_r2810957648


##########
cpp/celeborn/client/reader/CelebornInputStream.cpp:
##########
@@ -189,9 +205,58 @@ void CelebornInputStream::moveToNextReader() {
 
 std::shared_ptr<PartitionReader> CelebornInputStream::createReaderWithRetry(
     const protocol::PartitionLocation& location) {
-  // TODO: support retrying when createReader failed. Maybe switch to peer
-  // location?
-  return createReader(location);
+  const protocol::PartitionLocation* currentLocation = &location;
+  std::exception_ptr lastException;
+
+  while (fetchChunkRetryCnt_ < fetchChunkMaxRetry_) {
+    try {
+      VLOG(1) << "Create reader for location " << currentLocation->host << ":"
+              << currentLocation->fetchPort;
+      if (isExcluded(*currentLocation)) {
+        throw std::runtime_error(
+            "Fetch data from excluded worker! " +
+            currentLocation->hostAndFetchPort());
+      }
+      auto reader = createReader(*currentLocation);
+      return reader;
+    } catch (const std::exception& e) {
+      lastException = std::current_exception();
+      excludeFailedFetchLocation(currentLocation->hostAndFetchPort(), e);
+      fetchChunkRetryCnt_++;
+
+      if (currentLocation->hasPeer() && !readSkewPartitionWithoutMapRange_) {
+        if (fetchChunkRetryCnt_ % 2 == 0) {
+          std::this_thread::sleep_for(
+              std::chrono::milliseconds(retryWait_.count()));
+        }

Review Comment:
   `createReaderWithRetry` switches to `currentLocation->getPeer()`, but in 
this codebase only PRIMARY locations own `replicaPeer` (REPLICA locations don't 
have a peer). After the first switch to the replica, 
`currentLocation->hasPeer()` will be false and the retry loop will never switch 
back to the primary, so the intended primary<->replica alternation (and the 
`retryCnt % 2` sleep semantics) won’t work. Consider keeping explicit pointers 
to both primary and replica (e.g., `primary=&location` and 
`replica=location.getPeer()`) and toggling between them on each failure instead 
of relying on `hasPeer()` on the current location.



##########
cpp/celeborn/client/ShuffleClient.cpp:
##########
@@ -286,7 +290,19 @@ std::unique_ptr<CelebornInputStream> 
ShuffleClientImpl::readPartition(
       attemptNumber,
       startMapIndex,
       endMapIndex,
-      needCompression);
+      needCompression,
+      fetchExcludedWorkers_);
+}
+
+void ShuffleClientImpl::excludeFailedFetchLocation(
+    const std::string& hostAndFetchPort,
+    const std::exception& e) {
+  if (pushReplicateEnabled_ && fetchExcludeWorkerOnFailureEnabled_) {

Review Comment:
   `ShuffleClientImpl::excludeFailedFetchLocation` ignores the passed exception 
(`e`) and will exclude the worker for any failure when the flags are enabled. 
This differs from the Java behavior (and from 
`CelebornInputStream::excludeFailedFetchLocation`) which only excludes on 
critical fetch causes. Use `utils::isCriticalCauseForFetch(e)` (or otherwise 
align the filtering logic) to avoid excluding workers on 
non-critical/read-corruption errors, and to avoid an unused-parameter warning 
for `e`.
   ```suggestion
     if (pushReplicateEnabled_ && fetchExcludeWorkerOnFailureEnabled_ &&
         utils::isCriticalCauseForFetch(e)) {
   ```



##########
cpp/celeborn/client/reader/CelebornInputStream.h:
##########
@@ -17,14 +17,19 @@
 
 #pragma once
 
+#include <thread>
+

Review Comment:
   `<thread>` is included in this header but the header doesn’t use any 
threading types; it’s only needed in the .cpp for 
`std::this_thread::sleep_for`. Moving this include to `CelebornInputStream.cpp` 
would reduce transitive compile-time overhead for all includers of this header.
   ```suggestion
   
   ```



##########
cpp/celeborn/client/reader/CelebornInputStream.cpp:
##########
@@ -189,9 +205,58 @@ void CelebornInputStream::moveToNextReader() {
 
 std::shared_ptr<PartitionReader> CelebornInputStream::createReaderWithRetry(
     const protocol::PartitionLocation& location) {
-  // TODO: support retrying when createReader failed. Maybe switch to peer
-  // location?
-  return createReader(location);
+  const protocol::PartitionLocation* currentLocation = &location;
+  std::exception_ptr lastException;
+

Review Comment:
   The new retry / peer-switching / exclusion behavior in 
`createReaderWithRetry` is substantial but there are no unit tests covering it 
in the C++ client test suite (there are tests for other reader components like 
`WorkerPartitionReader`). Adding targeted tests (e.g., a fake 
`PartitionReader`/factory that fails N times, verifies alternation between 
primary/replica, and verifies exclusion timestamps) would help prevent 
regressions and ensure parity with the Java implementation.



##########
cpp/celeborn/client/reader/CelebornInputStream.cpp:
##########
@@ -189,9 +205,58 @@ void CelebornInputStream::moveToNextReader() {
 
 std::shared_ptr<PartitionReader> CelebornInputStream::createReaderWithRetry(
     const protocol::PartitionLocation& location) {
-  // TODO: support retrying when createReader failed. Maybe switch to peer
-  // location?
-  return createReader(location);
+  const protocol::PartitionLocation* currentLocation = &location;
+  std::exception_ptr lastException;
+
+  while (fetchChunkRetryCnt_ < fetchChunkMaxRetry_) {
+    try {
+      VLOG(1) << "Create reader for location " << currentLocation->host << ":"
+              << currentLocation->fetchPort;
+      if (isExcluded(*currentLocation)) {
+        throw std::runtime_error(
+            "Fetch data from excluded worker! " +
+            currentLocation->hostAndFetchPort());
+      }
+      auto reader = createReader(*currentLocation);
+      return reader;
+    } catch (const std::exception& e) {
+      lastException = std::current_exception();
+      excludeFailedFetchLocation(currentLocation->hostAndFetchPort(), e);
+      fetchChunkRetryCnt_++;
+
+      if (currentLocation->hasPeer() && !readSkewPartitionWithoutMapRange_) {
+        if (fetchChunkRetryCnt_ % 2 == 0) {
+          std::this_thread::sleep_for(
+              std::chrono::milliseconds(retryWait_.count()));
+        }

Review Comment:
   This retry sleep can be simplified and made less error-prone by sleeping 
directly on `retryWait_` (it’s already a `std::chrono::milliseconds`) instead 
of re-wrapping `retryWait_.count()` into another `milliseconds` duration.



##########
cpp/celeborn/utils/CelebornUtils.cpp:
##########
@@ -19,6 +19,22 @@
 
 namespace celeborn {
 namespace utils {
+
+bool isCriticalCauseForFetch(const std::exception& e) {
+  if (dynamic_cast<const std::system_error*>(&e)) {
+    return true;
+  }
+
+  std::string msg = e.what();
+  if (msg.find("Connecting to") != std::string::npos ||
+      msg.find("Failed to") != std::string::npos ||

Review Comment:
   `isCriticalCauseForFetch` claims to match Java/Scala behavior, but the 
string checks use `find` (substring match) rather than the Java `startsWith` 
semantics for "Connecting to" / "Failed to". Using a prefix check here would 
better match upstream behavior and avoid classifying unrelated messages that 
merely contain these phrases as "critical" (which can lead to unnecessary 
worker exclusion).
   ```suggestion
     if (msg.find("Connecting to") == 0 ||
         msg.find("Failed to") == 0 ||
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to