FMX commented on code in PR #3059:
URL: https://github.com/apache/celeborn/pull/3059#discussion_r1919492557


##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala:
##########
@@ -730,6 +742,58 @@ private[deploy] class Controller(
     }
   }
 
+  def checkCommitTimeout(shuffleCommitTime: ConcurrentHashMap[
+    String,
+    ConcurrentHashMap[Long, (Long, RpcCallContext)]]): Unit = {
+
+    val currentTime = System.currentTimeMillis()
+    val commitTimeIterator = shuffleCommitTime.entrySet().iterator()
+    while (commitTimeIterator.hasNext) {
+      val timeMapEntry = commitTimeIterator.next()
+      val shuffleKey = timeMapEntry.getKey
+      val epochWaitTimeMap = timeMapEntry.getValue
+      val epochIterator = epochWaitTimeMap.entrySet().iterator()
+
+      while (epochIterator.hasNext && 
shuffleCommitInfos.containsKey(shuffleKey)) {
+        val epochWaitTimeEntry = epochIterator.next()
+        val epoch = epochWaitTimeEntry.getKey
+        val (commitStartWaitTime, context) = epochWaitTimeEntry.getValue
+        try {
+          val commitInfo = shuffleCommitInfos.get(shuffleKey).get(epoch)
+          commitInfo.synchronized {
+            if (commitInfo.status == CommitInfo.COMMIT_FINISHED) {
+              context.reply(commitInfo.response)
+              epochIterator.remove()
+            } else {
+              if (currentTime - commitStartWaitTime >= shuffleCommitTimeout) {
+                val replyResponse = CommitFilesResponse(
+                  StatusCode.COMMIT_FILE_EXCEPTION,
+                  List.empty.asJava,
+                  List.empty.asJava,
+                  commitInfo.response.failedPrimaryIds,
+                  commitInfo.response.failedReplicaIds)
+                shuffleCommitInfos.get(shuffleKey).put(
+                  epoch,
+                  new CommitInfo(replyResponse, CommitInfo.COMMIT_FINISHED))

Review Comment:
   You should not create a new CommitInfo here, because the surrounding code 
synchronizes on the existing commitInfo object. I think you can instead update 
the fields of the existing commit info object in place.
   However, this won't cause trouble here because the epoch will not be 
replicated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to