mridulm commented on code in PR #3263:
URL: https://github.com/apache/celeborn/pull/3263#discussion_r2093402716


##########
client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala:
##########
@@ -111,16 +111,35 @@ class CelebornShuffleReader[K, C](
     val localHostAddress = Utils.localHostName(conf)
     val shuffleKey = Utils.makeShuffleKey(handle.appUniqueId, shuffleId)
     var fileGroups: ReduceFileGroups = null
-    try {
-      // startPartition is irrelevant
-      fileGroups = shuffleClient.updateFileGroup(shuffleId, startPartition)
-    } catch {
-      case ce @ (_: CelebornIOException | _: PartitionUnRetryAbleException) =>
-        // if a task is interrupted, should not report fetch failure
-        // if a task update file group timeout, should not report fetch failure
-        checkAndReportFetchFailureForUpdateFileGroupFailure(shuffleId, ce)
-      case e: Throwable => throw e
-    }
+    var isShuffleStageEnd: Boolean = false
+    var updateFileGroupsRetryTimes = 0
+    do {
+      isShuffleStageEnd =
+        try {
+          shuffleClient.isShuffleStageEnd(shuffleId)
+        } catch {
+          case e: IOException =>
+            logError(s"Failed to check shuffle stage end for $shuffleId, 
assume not ended", e)
+            false
+        }
+      try {
+        // startPartition is irrelevant
+        fileGroups = shuffleClient.updateFileGroup(shuffleId, startPartition)

Review Comment:
   If `LifecycleManager` successfully indicates that the stage has not ended (that is,
the RPC call returns false rather than throwing an exception), do we still need to
call update here, or should we wait?



##########
client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala:
##########
@@ -111,16 +111,35 @@ class CelebornShuffleReader[K, C](
     val localHostAddress = Utils.localHostName(conf)
     val shuffleKey = Utils.makeShuffleKey(handle.appUniqueId, shuffleId)
     var fileGroups: ReduceFileGroups = null
-    try {
-      // startPartition is irrelevant
-      fileGroups = shuffleClient.updateFileGroup(shuffleId, startPartition)
-    } catch {
-      case ce @ (_: CelebornIOException | _: PartitionUnRetryAbleException) =>
-        // if a task is interrupted, should not report fetch failure
-        // if a task update file group timeout, should not report fetch failure
-        checkAndReportFetchFailureForUpdateFileGroupFailure(shuffleId, ce)
-      case e: Throwable => throw e
-    }
+    var isShuffleStageEnd: Boolean = false
+    var updateFileGroupsRetryTimes = 0
+    do {
+      isShuffleStageEnd =
+        try {
+          shuffleClient.isShuffleStageEnd(shuffleId)
+        } catch {
+          case e: IOException =>
+            logError(s"Failed to check shuffle stage end for $shuffleId, 
assume not ended", e)

Review Comment:
   Should this be logged at `INFO` instead?



##########
client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala:
##########
@@ -184,6 +184,15 @@ abstract class CommitHandler(
       shuffleId: Int,
       serdeVersion: SerdeVersion): Unit
 
+  /**
+   * Only Reduce partition mode supports to get stage end.

Review Comment:
   nit:
   ```suggestion
      * Only Reduce partition mode supports get stage end.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to