Copilot commented on code in PR #3583:
URL: https://github.com/apache/celeborn/pull/3583#discussion_r2731656758
##########
cpp/celeborn/client/reader/CelebornInputStream.cpp:
##########
@@ -189,9 +196,45 @@ void CelebornInputStream::moveToNextReader() {
std::shared_ptr<PartitionReader> CelebornInputStream::createReaderWithRetry(
const protocol::PartitionLocation& location) {
- // TODO: support retrying when createReader failed. Maybe switch to peer
- // location?
- return createReader(location);
+ const protocol::PartitionLocation* currentLocation = &location;
+ std::exception_ptr lastException;
+
+ while (fetchChunkRetryCnt_ < fetchChunkMaxRetry_) {
+ try {
+ VLOG(1) << "Create reader for location " << currentLocation->host << ":"
+ << currentLocation->fetchPort;
+ auto reader = createReader(*currentLocation);
+ fetchChunkRetryCnt_ = 0;
+ return reader;
+ } catch (const std::exception& e) {
+ lastException = std::current_exception();
+ fetchChunkRetryCnt_++;
+
+ if (currentLocation->hasPeer()) {
+ if (fetchChunkRetryCnt_ % 2 == 0) {
+ std::this_thread::sleep_for(
+ std::chrono::milliseconds(retryWait_.count()));
+ }
+ LOG(WARNING) << "CreatePartitionReader failed " << fetchChunkRetryCnt_
+ << "/" << fetchChunkMaxRetry_ << " times for location "
+ << currentLocation->hostAndFetchPort()
+ << ", change to peer. Error: " << e.what();
+ currentLocation = currentLocation->getPeer();
+ } else {
+ LOG(WARNING) << "CreatePartitionReader failed " << fetchChunkRetryCnt_
+ << "/" << fetchChunkMaxRetry_ << " times for location "
+ << currentLocation->hostAndFetchPort()
+ << ", retry the same location. Error: " << e.what();
+ std::this_thread::sleep_for(
+ std::chrono::milliseconds(retryWait_.count()));
+ }
+ }
+ }
+
+ CELEBORN_FAIL(
+ "createPartitionReader failed after " +
+ std::to_string(fetchChunkRetryCnt_) + " retries for location " +
+ location.hostAndFetchPort());
Review Comment:
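The captured lastException is assigned but never used; it should be rethrown
instead of calling CELEBORN_FAIL. The Java implementation throws
CelebornIOException with the lastException as the cause. In C++, use
std::rethrow_exception(lastException) to preserve the original exception
information, which is critical for debugging. Only do so when lastException is
non-null: if the loop body never runs (for example, fetchChunkMaxRetry_ is 0),
calling std::rethrow_exception on a null exception_ptr is undefined behavior,
so keep CELEBORN_FAIL as the fallback for that case. If you want to add
context, you could wrap it in a CelebornRuntimeError, similar to the pattern
seen in CelebornException.cpp, lines 35-36.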
##########
cpp/celeborn/client/reader/CelebornInputStream.cpp:
##########
@@ -189,9 +196,45 @@ void CelebornInputStream::moveToNextReader() {
std::shared_ptr<PartitionReader> CelebornInputStream::createReaderWithRetry(
const protocol::PartitionLocation& location) {
- // TODO: support retrying when createReader failed. Maybe switch to peer
- // location?
- return createReader(location);
+ const protocol::PartitionLocation* currentLocation = &location;
+ std::exception_ptr lastException;
+
+ while (fetchChunkRetryCnt_ < fetchChunkMaxRetry_) {
+ try {
+ VLOG(1) << "Create reader for location " << currentLocation->host << ":"
+ << currentLocation->fetchPort;
+ auto reader = createReader(*currentLocation);
+ fetchChunkRetryCnt_ = 0;
Review Comment:
Resetting fetchChunkRetryCnt_ to 0 on successful reader creation (line 207)
is redundant, since it is already reset at line 187 in moveToNextReader() before
this function is called. While not harmful, removing this reset would make the
code cleaner and align better with the Java implementation, which doesn't reset
the counter after successful reader creation.
```suggestion
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]