CodingCat commented on code in PR #3569:
URL: https://github.com/apache/celeborn/pull/3569#discussion_r2646593547


##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -1269,6 +1271,39 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
     }
     var ret = true
     shuffleIds.synchronized {
+      val stageIdentifier = s"$stageId.$stageAttemptId"
+      val shuffleIdentifier = s"$appShuffleId.$stageId.$stageAttemptId"
+      if (stagesReceivedInvalidatingUpstream.getOrElse(stageIdentifier, new mutable.HashSet[Int]())
+        .contains(appShuffleId)) {
+        println(s"${Thread.currentThread().getName} " +
+          s"ignoring missing shuffle id report from stage $stageId.$stageAttemptId as" +
+          s" it is already reported  by other reader and handled")
+      } else {
+        println(s"${Thread.currentThread().getName} handle missing shuffle id for appShuffleId" +
+          s" $appShuffleId stage" +
+          s" $stageId.$stageAttemptId")
+        appShuffleTrackerCallback match {
+          case Some(callback) =>
+            try {
+              callback.accept(appShuffleId)
+            } catch {
+              case t: Throwable =>
+                logError(t.toString)
+                ret = false
+            }
+            shuffleIds.put(shuffleIdentifier, (UNKNOWN_MISSING_CELEBORN_SHUFFLE_ID, false))
+          case None =>
+            throw new UnsupportedOperationException(
+              "unexpected! appShuffleTrackerCallback is not registered")
+        }
+        invalidateShuffleWrittenByStage(stageId)
+        stagesReceivedInvalidatingUpstream += stageIdentifier ->
+          (stagesReceivedInvalidatingUpstream.getOrElse(
+            stageIdentifier, new mutable.HashSet[Int]()) ++ Set(appShuffleId))
+        val pbReportMissingShuffleIdResponse =
+          PbReportMissingShuffleIdResponse.newBuilder().setSuccess(ret).build()
+        context.reply(pbReportMissingShuffleIdResponse)
+      /*
       val latestUpstreamShuffleId = shuffleIds.maxBy(_._2._1)

Review Comment:
   The original dedup logic may suffer from the following race condition:
   
   Stage A depends on shuffle 1. Because of "too early deletion", a missing shuffle id report is sent and handled for shuffle 1. At that point a new shuffle id is generated, so `latestUpstreamShuffleId._2._1` is no longer `UNKNOWN_MISSING_CELEBORN_SHUFFLE_ID`. When another report for the same missing shuffle arrives, the check no longer recognizes it as already handled, so the missing report is handled a second time, which messes everything up.
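A minimal, hypothetical Scala sketch of the race described above, under a simplified model: the "latest upstream shuffle id" is reduced to a single `Int`, `UnknownMissing` is a placeholder sentinel standing in for `UNKNOWN_MISSING_CELEBORN_SHUFFLE_ID` (its real value in Celeborn is not assumed), and `MissingShuffleDedupSketch`, `StageAttemptDedup`, and `tryClaim` are illustrative names, not Celeborn APIs. It contrasts the latest-id check with dedup keyed by `stageId.stageAttemptId`, which is how the `stagesReceivedInvalidatingUpstream` map in the diff is keyed.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the dedup problem; not Celeborn's code.
object MissingShuffleDedupSketch {
  // Placeholder for UNKNOWN_MISSING_CELEBORN_SHUFFLE_ID; the real value is not assumed.
  val UnknownMissing: Int = -1

  // Original-style check: a report is treated as new unless the latest
  // registered Celeborn shuffle id is still the "unknown missing" marker.
  def naiveShouldHandle(latestCelebornShuffleId: Int): Boolean =
    latestCelebornShuffleId != UnknownMissing

  // Dedup keyed by "stageId.stageAttemptId", like stagesReceivedInvalidatingUpstream.
  final class StageAttemptDedup {
    private val handled = mutable.HashMap.empty[String, mutable.HashSet[Int]]

    // Returns true only for the first report of appShuffleId in this stage attempt.
    def tryClaim(stageId: Int, stageAttemptId: Int, appShuffleId: Int): Boolean =
      synchronized {
        val key = s"$stageId.$stageAttemptId"
        handled.getOrElseUpdate(key, mutable.HashSet.empty[Int]).add(appShuffleId)
      }
  }

  // Replays the review-comment scenario against the naive check.
  def replayNaive(): Int = {
    var handledCount = 0
    var latest = 1 // hypothetical Celeborn shuffle id backing app shuffle 1
    // Reader 1 of stage A reports app shuffle 1 missing: handled.
    if (naiveShouldHandle(latest)) { handledCount += 1; latest = UnknownMissing }
    // A new Celeborn shuffle id is generated before reader 2's report arrives.
    latest = 2
    // Reader 2 reports the same missing shuffle: handled again (the bug).
    if (naiveShouldHandle(latest)) { handledCount += 1; latest = UnknownMissing }
    handledCount
  }

  // Same scenario with per-stage-attempt dedup.
  def replayPerStageAttempt(): Int = {
    val dedup = new StageAttemptDedup
    var handledCount = 0
    // Readers 1 and 2 of stage A (hypothetical stageId 3, attempt 0) both report
    // app shuffle 1; the new Celeborn shuffle id in between does not change the key,
    // so only the first report is handled.
    if (dedup.tryClaim(3, 0, 1)) handledCount += 1
    if (dedup.tryClaim(3, 0, 1)) handledCount += 1
    // A retried attempt of stage A is a different key and is handled once more.
    if (dedup.tryClaim(3, 1, 1)) handledCount += 1
    handledCount
  }
}
```

In this sketch `replayNaive()` handles the duplicate report twice for one stage attempt, while `replayPerStageAttempt()` handles it once per attempt. `getOrElseUpdate` combined with `HashSet.add` (which returns `false` when the id is already present) folds the check and the record into one step under the lock; the diff instead reads with `getOrElse` and writes back later, which is safe there because both happen inside `shuffleIds.synchronized`.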


