gauravkm commented on code in PR #3261:
URL: https://github.com/apache/celeborn/pull/3261#discussion_r2125628098
##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -647,6 +650,35 @@ public Tuple2<Integer, Boolean> getShuffleId(
});
}
+ @Override
+ public void reducerPartitionEnd(
+ int shuffleId,
+ int partitionId,
+ int startMapIndex,
+ int endMapIndex,
+ int crc32,
+ long bytesWritten)
+ throws IOException {
+ PbReducerPartitionEnd pbReducerPartitionEnd =
+ PbReducerPartitionEnd.newBuilder()
+ .setShuffleId(shuffleId)
+ .setPartitionId(partitionId)
+ .setStartMapIndex(startMapIndex)
+ .setEndMapIndex(endMapIndex)
+ .setCrc32(crc32)
+ .setBytesWritten(bytesWritten)
+ .build();
+
+ PbReducerPartitionEndResponse pbReducerPartitionEndResponse =
+ lifecycleManagerRef.askSync(
+ pbReducerPartitionEnd,
+ conf.clientRpcRegisterShuffleAskTimeout(),
+ ClassTag$.MODULE$.apply(PbReducerPartitionEndResponse.class));
+ if (pbReducerPartitionEndResponse.getStatus() != StatusCode.SUCCESS.getValue()) {
Review Comment:
Actually, the comment in code says
```
/*
 * [[ExceptionMaker.makeException]], for spark applications with celeborn.client.spark.fetch.throwsFetchFailure enabled will result in creating
 * a FetchFailedException; and that will mark the TaskContext as failed with shuffle fetch issues - see SPARK-19276 for more.
 * Given this, Celeborn can wrap the FetchFailedException with our CelebornIOException
 */
```
Should I just plug into this mechanism, or explicitly rely on the `stage.rerun.enabled` config? I don't see many uses of that config apart from LifecycleManager.
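For context, here is a minimal standalone sketch of the failure path I'm asking about. Everything in it is a stand-in, not the real Celeborn API: `CelebornIOException` is a local placeholder class, and the status codes are simplified. It just illustrates the shape of the truncated `if` check above, where a non-SUCCESS status surfaces as an IOException that a higher layer (e.g. via `ExceptionMaker` when `celeborn.client.spark.fetch.throwsFetchFailure` is enabled) could translate into a FetchFailedException:

```java
import java.io.IOException;

public class ReducerPartitionEndSketch {
    static final int SUCCESS = 0; // simplified stand-in for StatusCode.SUCCESS.getValue()

    // Placeholder for org.apache.celeborn.common.exception.CelebornIOException
    static class CelebornIOException extends IOException {
        CelebornIOException(String msg) { super(msg); }
    }

    // Mirrors the status check in the diff: any non-SUCCESS response throws,
    // leaving it to the caller to decide whether to convert this into a
    // FetchFailedException (fetch-failure path) or fail the task directly.
    static void checkStatus(int status, int shuffleId, int partitionId) throws IOException {
        if (status != SUCCESS) {
            throw new CelebornIOException(
                "reducerPartitionEnd failed for shuffle " + shuffleId
                    + ", partition " + partitionId + ", status " + status);
        }
    }

    public static void main(String[] args) throws IOException {
        checkStatus(SUCCESS, 1, 0); // success: no exception

        boolean threw = false;
        try {
            checkStatus(1, 1, 0); // any non-zero status is treated as failure
        } catch (CelebornIOException e) {
            threw = true;
        }
        if (!threw) throw new AssertionError("expected CelebornIOException");
        System.out.println("ok");
    }
}
```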
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]