turboFei commented on code in PR #3263:
URL: https://github.com/apache/celeborn/pull/3263#discussion_r2093775335
##########
client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala:
##########
@@ -111,16 +111,35 @@ class CelebornShuffleReader[K, C](
val localHostAddress = Utils.localHostName(conf)
val shuffleKey = Utils.makeShuffleKey(handle.appUniqueId, shuffleId)
var fileGroups: ReduceFileGroups = null
- try {
- // startPartition is irrelevant
- fileGroups = shuffleClient.updateFileGroup(shuffleId, startPartition)
- } catch {
- case ce @ (_: CelebornIOException | _: PartitionUnRetryAbleException) =>
- // if a task is interrupted, should not report fetch failure
- // if a task update file group timeout, should not report fetch failure
- checkAndReportFetchFailureForUpdateFileGroupFailure(shuffleId, ce)
- case e: Throwable => throw e
- }
+ var isShuffleStageEnd: Boolean = false
+ var updateFileGroupsRetryTimes = 0
+ do {
+ isShuffleStageEnd =
+ try {
+ shuffleClient.isShuffleStageEnd(shuffleId)
+ } catch {
+ case e: IOException =>
+ logError(s"Failed to check shuffle stage end for $shuffleId,
assume not ended", e)
+ false
+ }
+ try {
+ // startPartition is irrelevant
+ fileGroups = shuffleClient.updateFileGroup(shuffleId, startPartition)
+ } catch {
+ case ce: CelebornIOException
+ if ce.getCause.isInstanceOf[TimeoutException] &&
!isShuffleStageEnd =>
+ updateFileGroupsRetryTimes += 1
+ logError(
+ s"UpdateFileGroup for $shuffleKey timeout due to shuffle stage not
ended," +
+ s" retry again, retry times $updateFileGroupsRetryTimes",
+ ce)
+ case ce @ (_: CelebornIOException | _: PartitionUnRetryAbleException)
=>
+ // if a task is interrupted, should not report fetch failure
+ // if a task update file group timeout, should not report fetch
failure
+ checkAndReportFetchFailureForUpdateFileGroupFailure(shuffleId, ce)
+ case e: Throwable => throw e
Review Comment:
This case seems removable: it only rethrows the exception. Any exception not matched by the earlier handlers already propagates out of the `try` unchanged.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]