Copilot commented on code in PR #3583:
URL: https://github.com/apache/celeborn/pull/3583#discussion_r2810957648
##########
cpp/celeborn/client/reader/CelebornInputStream.cpp:
##########
@@ -189,9 +205,58 @@ void CelebornInputStream::moveToNextReader() {
std::shared_ptr<PartitionReader> CelebornInputStream::createReaderWithRetry(
const protocol::PartitionLocation& location) {
- // TODO: support retrying when createReader failed. Maybe switch to peer
- // location?
- return createReader(location);
+ const protocol::PartitionLocation* currentLocation = &location;
+ std::exception_ptr lastException;
+
+ while (fetchChunkRetryCnt_ < fetchChunkMaxRetry_) {
+ try {
+ VLOG(1) << "Create reader for location " << currentLocation->host << ":"
+ << currentLocation->fetchPort;
+ if (isExcluded(*currentLocation)) {
+ throw std::runtime_error(
+ "Fetch data from excluded worker! " +
+ currentLocation->hostAndFetchPort());
+ }
+ auto reader = createReader(*currentLocation);
+ return reader;
+ } catch (const std::exception& e) {
+ lastException = std::current_exception();
+ excludeFailedFetchLocation(currentLocation->hostAndFetchPort(), e);
+ fetchChunkRetryCnt_++;
+
+ if (currentLocation->hasPeer() && !readSkewPartitionWithoutMapRange_) {
+ if (fetchChunkRetryCnt_ % 2 == 0) {
+ std::this_thread::sleep_for(
+ std::chrono::milliseconds(retryWait_.count()));
+ }
Review Comment:
`createReaderWithRetry` switches to `currentLocation->getPeer()`, but in
this codebase only PRIMARY locations own a `replicaPeer`; REPLICA locations
have no peer. After the first switch to the replica,
`currentLocation->hasPeer()` returns false, so the retry loop never switches
back to the primary. As a result, the intended primary<->replica alternation
(and the `fetchChunkRetryCnt_ % 2` sleep semantics) won't work. Consider
keeping explicit pointers to both primary and replica (e.g.,
`primary=&location` and `replica=location.getPeer()`) and toggling between
them on each failure instead of relying on `hasPeer()` on the current location.
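A minimal sketch of the toggling idea, not the actual Celeborn code. `Location`, `connectWithToggle`, and the `attempt` callback are hypothetical stand-ins for `protocol::PartitionLocation`, `createReaderWithRetry`, and `createReader`; the only assumption carried over from the codebase is that the primary alone holds a pointer to its replica.

```cpp
#include <exception>
#include <functional>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for protocol::PartitionLocation. Only the primary
// holds a pointer to its replica; the replica's peer pointer is null.
struct Location {
  std::string hostAndPort;
  const Location* replicaPeer;
};

// Calls `attempt` on primary and replica alternately until it succeeds or
// `maxRetry` failures have occurred. Returns the address that succeeded and
// rethrows the last error when retries are exhausted. Each attempted address
// is appended to `tried` when it is non-null.
std::string connectWithToggle(
    const Location& primary,
    int maxRetry,
    const std::function<void(const Location&)>& attempt,
    std::vector<std::string>* tried = nullptr) {
  // Both candidates are captured up front, so switching back to the primary
  // never depends on the replica having a peer.
  const Location* candidates[2] = {&primary, primary.replicaPeer};
  const bool hasReplica = candidates[1] != nullptr;
  int current = 0;
  std::exception_ptr lastError;

  for (int failures = 0; failures < maxRetry;) {
    const Location& loc = *candidates[current];
    if (tried != nullptr) {
      tried->push_back(loc.hostAndPort);
    }
    try {
      attempt(loc);
      return loc.hostAndPort;
    } catch (const std::exception&) {
      lastError = std::current_exception();
      ++failures;
      if (hasReplica) {
        current = 1 - current;  // toggle primary <-> replica
      }
    }
  }
  if (lastError) {
    std::rethrow_exception(lastError);
  }
  throw std::runtime_error("no attempts were made");
}
```

With this shape, an even failure count means both locations have been tried once more, which is the point at which the `% 2` sleep would apply.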
##########
cpp/celeborn/client/ShuffleClient.cpp:
##########
@@ -286,7 +290,19 @@ std::unique_ptr<CelebornInputStream>
ShuffleClientImpl::readPartition(
attemptNumber,
startMapIndex,
endMapIndex,
- needCompression);
+ needCompression,
+ fetchExcludedWorkers_);
+}
+
+void ShuffleClientImpl::excludeFailedFetchLocation(
+ const std::string& hostAndFetchPort,
+ const std::exception& e) {
+ if (pushReplicateEnabled_ && fetchExcludeWorkerOnFailureEnabled_) {
Review Comment:
`ShuffleClientImpl::excludeFailedFetchLocation` ignores the passed exception
(`e`) and excludes the worker on any failure when both flags are enabled.
This differs from the Java behavior (and from
`CelebornInputStream::excludeFailedFetchLocation`), which exclude only on
critical fetch causes. Use `utils::isCriticalCauseForFetch(e)` (or otherwise
align the filtering logic) to avoid excluding workers on
non-critical/read-corruption errors, and to avoid an unused-parameter warning
for `e`.
```suggestion
if (pushReplicateEnabled_ && fetchExcludeWorkerOnFailureEnabled_ &&
utils::isCriticalCauseForFetch(e)) {
```
##########
cpp/celeborn/client/reader/CelebornInputStream.h:
##########
@@ -17,14 +17,19 @@
#pragma once
+#include <thread>
+
Review Comment:
`<thread>` is included in this header but the header doesn’t use any
threading types; it’s only needed in the .cpp for
`std::this_thread::sleep_for`. Moving this include to `CelebornInputStream.cpp`
would reduce transitive compile-time overhead for all includers of this header.
```suggestion
```
##########
cpp/celeborn/client/reader/CelebornInputStream.cpp:
##########
@@ -189,9 +205,58 @@ void CelebornInputStream::moveToNextReader() {
std::shared_ptr<PartitionReader> CelebornInputStream::createReaderWithRetry(
const protocol::PartitionLocation& location) {
- // TODO: support retrying when createReader failed. Maybe switch to peer
- // location?
- return createReader(location);
+ const protocol::PartitionLocation* currentLocation = &location;
+ std::exception_ptr lastException;
+
Review Comment:
The new retry / peer-switching / exclusion behavior in
`createReaderWithRetry` is substantial but there are no unit tests covering it
in the C++ client test suite (there are tests for other reader components like
`WorkerPartitionReader`). Adding targeted tests (e.g., a fake
`PartitionReader`/factory that fails N times, verifies alternation between
primary/replica, and verifies exclusion timestamps) would help prevent
regressions and ensure parity with the Java implementation.
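One possible shape for such a test double, as a rough sketch only. `FailingReaderFactory`, `ExclusionMap`, and `createWithRetry` are hypothetical names invented for illustration; a real test would drive `CelebornInputStream` through whatever injection point the client exposes, and would use the project's test framework rather than bare checks.

```cpp
#include <chrono>
#include <map>
#include <stdexcept>
#include <string>
#include <vector>

using Clock = std::chrono::steady_clock;
// Hypothetical exclusion record: worker address -> time of last failure.
using ExclusionMap = std::map<std::string, Clock::time_point>;

// Hypothetical fake: fails the first `failures` calls, then succeeds.
class FailingReaderFactory {
 public:
  explicit FailingReaderFactory(int failures) : failuresLeft_(failures) {}

  // Records the requested address and throws while failures remain.
  // Returns the 1-based call number on success.
  int create(const std::string& hostAndPort) {
    requested_.push_back(hostAndPort);
    if (failuresLeft_ > 0) {
      --failuresLeft_;
      throw std::runtime_error("Connecting to " + hostAndPort + " failed");
    }
    return static_cast<int>(requested_.size());
  }

  const std::vector<std::string>& requested() const { return requested_; }

 private:
  int failuresLeft_;
  std::vector<std::string> requested_;
};

// Simplified stand-in for the logic under test: alternates between the two
// addresses and stamps each failed address into `excluded`.
int createWithRetry(
    FailingReaderFactory& factory,
    const std::string& primary,
    const std::string& replica,
    int maxRetry,
    ExclusionMap& excluded) {
  const std::string* targets[2] = {&primary, &replica};
  for (int failures = 0; failures < maxRetry; ++failures) {
    const std::string& target = *targets[failures % 2];
    try {
      return factory.create(target);
    } catch (const std::exception&) {
      excluded[target] = Clock::now();
    }
  }
  throw std::runtime_error("retries exhausted");
}
```

A test built on this would assert on `factory.requested()` for the alternation order and on the `ExclusionMap` entries for the timestamps.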
##########
cpp/celeborn/client/reader/CelebornInputStream.cpp:
##########
@@ -189,9 +205,58 @@ void CelebornInputStream::moveToNextReader() {
std::shared_ptr<PartitionReader> CelebornInputStream::createReaderWithRetry(
const protocol::PartitionLocation& location) {
- // TODO: support retrying when createReader failed. Maybe switch to peer
- // location?
- return createReader(location);
+ const protocol::PartitionLocation* currentLocation = &location;
+ std::exception_ptr lastException;
+
+ while (fetchChunkRetryCnt_ < fetchChunkMaxRetry_) {
+ try {
+ VLOG(1) << "Create reader for location " << currentLocation->host << ":"
+ << currentLocation->fetchPort;
+ if (isExcluded(*currentLocation)) {
+ throw std::runtime_error(
+ "Fetch data from excluded worker! " +
+ currentLocation->hostAndFetchPort());
+ }
+ auto reader = createReader(*currentLocation);
+ return reader;
+ } catch (const std::exception& e) {
+ lastException = std::current_exception();
+ excludeFailedFetchLocation(currentLocation->hostAndFetchPort(), e);
+ fetchChunkRetryCnt_++;
+
+ if (currentLocation->hasPeer() && !readSkewPartitionWithoutMapRange_) {
+ if (fetchChunkRetryCnt_ % 2 == 0) {
+ std::this_thread::sleep_for(
+ std::chrono::milliseconds(retryWait_.count()));
+ }
Review Comment:
This retry sleep can be simplified and made less error-prone by sleeping
directly on `retryWait_` (it’s already a `std::chrono::milliseconds`) instead
of re-wrapping `retryWait_.count()` into another `milliseconds` duration.
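For illustration, a small sketch of sleeping on the duration directly. `sleepForRetry` is a hypothetical helper, and its `retryWait` parameter stands in for the `retryWait_` member, assumed here to be a `std::chrono::milliseconds`.

```cpp
#include <chrono>
#include <thread>

// Sleeps for `retryWait` and returns the measured elapsed time.
std::chrono::milliseconds sleepForRetry(std::chrono::milliseconds retryWait) {
  const auto start = std::chrono::steady_clock::now();
  // sleep_for accepts any std::chrono::duration, so no .count() round trip
  // through a raw integer is needed.
  std::this_thread::sleep_for(retryWait);
  return std::chrono::duration_cast<std::chrono::milliseconds>(
      std::chrono::steady_clock::now() - start);
}
```

Passing the duration through unchanged also keeps the unit attached to the value, so a later change of `retryWait_` to another duration type would not silently change the sleep length.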
##########
cpp/celeborn/utils/CelebornUtils.cpp:
##########
@@ -19,6 +19,22 @@
namespace celeborn {
namespace utils {
+
+bool isCriticalCauseForFetch(const std::exception& e) {
+ if (dynamic_cast<const std::system_error*>(&e)) {
+ return true;
+ }
+
+ std::string msg = e.what();
+ if (msg.find("Connecting to") != std::string::npos ||
+ msg.find("Failed to") != std::string::npos ||
Review Comment:
`isCriticalCauseForFetch` claims to match Java/Scala behavior, but the
string checks use `find` (substring match) rather than the Java `startsWith`
semantics for "Connecting to" / "Failed to". Using a prefix check here would
better match upstream behavior and avoid classifying unrelated messages that
merely contain these phrases as "critical" (which can lead to unnecessary
worker exclusion).
```suggestion
if (msg.rfind("Connecting to", 0) == 0 ||
msg.rfind("Failed to", 0) == 0 ||
```
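If the prefix check is needed in more than one place, a small helper may read more clearly than inline comparisons. The sketch below is illustrative only; `startsWith` is a hypothetical name, not an existing function in `celeborn::utils`.

```cpp
#include <string>

// Returns true when `text` begins with `prefix`, mirroring the semantics of
// Java's String.startsWith. Only the first prefix.size() characters of
// `text` are compared, so the rest of the message is never scanned.
bool startsWith(const std::string& text, const std::string& prefix) {
  return text.compare(0, prefix.size(), prefix) == 0;
}
```

For example, `startsWith(msg, "Failed to")` is true for "Failed to open stream" but false for "Error: Failed to open stream", whereas a substring search would match both.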