CodingCat commented on code in PR #3569:
URL: https://github.com/apache/celeborn/pull/3569#discussion_r2646593547
##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -1269,6 +1271,39 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
}
var ret = true
shuffleIds.synchronized {
+ val stageIdentifier = s"$stageId.$stageAttemptId"
+ val shuffleIdentifier = s"$appShuffleId.$stageId.$stageAttemptId"
+ if (stagesReceivedInvalidatingUpstream.getOrElse(stageIdentifier, new mutable.HashSet[Int]())
+ .contains(appShuffleId)) {
+ println(s"${Thread.currentThread().getName} " +
+ s"ignoring missing shuffle id report from stage $stageId.$stageAttemptId as" +
+ s" it is already reported by other reader and handled")
+ } else {
+ println(s"${Thread.currentThread().getName} handle missing shuffle id for appShuffleId" +
+ s" $appShuffleId stage" +
+ s" $stageId.$stageAttemptId")
+ appShuffleTrackerCallback match {
+ case Some(callback) =>
+ try {
+ callback.accept(appShuffleId)
+ } catch {
+ case t: Throwable =>
+ logError(t.toString)
+ ret = false
+ }
+ shuffleIds.put(shuffleIdentifier, (UNKNOWN_MISSING_CELEBORN_SHUFFLE_ID, false))
+ case None =>
+ throw new UnsupportedOperationException(
+ "unexpected! appShuffleTrackerCallback is not registered")
+ }
+ invalidateShuffleWrittenByStage(stageId)
+ stagesReceivedInvalidatingUpstream += stageIdentifier ->
+ (stagesReceivedInvalidatingUpstream.getOrElse(
+ stageIdentifier, new mutable.HashSet[Int]()) ++ Set(appShuffleId))
+ val pbReportMissingShuffleIdResponse =
+ PbReportMissingShuffleIdResponse.newBuilder().setSuccess(ret).build()
+ context.reply(pbReportMissingShuffleIdResponse)
+ /*
val latestUpstreamShuffleId = shuffleIds.maxBy(_._2._1)
Review Comment:
The original dedup logic may suffer from a race condition, as follows:
stage A depends on shuffle 1. Due to "too early deletion", a missing-shuffle
report is sent and handled for shuffle 1. At this point a new shuffle id is
generated, so latestUpstreamShuffleId._2._1 is no longer
UNKNOWN_MISSING_CELEBORN_SHUFFLE_ID. When another reader of stage A then
reports the same missing shuffle, the old check no longer treats it as
already handled, so the report is handled a second time, which messes up
everything.
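The race above can be reproduced with a small, self-contained model. This is a sketch under stated assumptions, not Celeborn code: `MissingReportDedupSketch`, `LatestIdDedup`, `PerStageAttemptDedup`, and `UnknownMarker` are hypothetical names, the old check is simplified to tracking a single "latest upstream id" field (instead of `shuffleIds.maxBy(_._2._1)`), and the real handling (callback, invalidation) is reduced to a counter.

```scala
import scala.collection.mutable

// Hypothetical model of the race described in the review comment; not Celeborn code.
object MissingReportDedupSketch {
  // Hypothetical stand-in for UNKNOWN_MISSING_CELEBORN_SHUFFLE_ID.
  val UnknownMarker: Int = -1

  // Old-style check, simplified: a report counts as "already handled" only while
  // the latest recorded upstream id is still the marker. The real code derived
  // "latest" via shuffleIds.maxBy(_._2._1); here it is a single field.
  final class LatestIdDedup {
    private var latestUpstreamId: Int = 0 // starts as an ordinary (non-marker) id
    var handledCount: Int = 0

    def onMissingReport(appShuffleId: Int): Unit = synchronized {
      if (latestUpstreamId != UnknownMarker) {
        handledCount += 1 // stands in for the real handling (callback, invalidation)
        latestUpstreamId = UnknownMarker
      }
    }

    // The upstream stage is rerun and registers a fresh shuffle id.
    def onNewShuffleRegistered(newId: Int): Unit = synchronized {
      latestUpstreamId = newId
    }
  }

  // Diff-style check: remember which appShuffleIds each stage attempt
  // (keyed as "stageId.stageAttemptId") has already reported.
  final class PerStageAttemptDedup {
    private val reportedByStageAttempt =
      mutable.HashMap.empty[String, mutable.HashSet[Int]]
    var handledCount: Int = 0

    // Returns true if this report triggered handling, false if it was a duplicate.
    def onMissingReport(appShuffleId: Int, stageId: Int, stageAttemptId: Int): Boolean =
      synchronized {
        val reported = reportedByStageAttempt.getOrElseUpdate(
          s"$stageId.$stageAttemptId", mutable.HashSet.empty[Int])
        // mutable.HashSet.add returns false when the id is already present.
        if (reported.add(appShuffleId)) {
          handledCount += 1
          true
        } else {
          false
        }
      }
  }

  def main(args: Array[String]): Unit = {
    // Two readers of stage 5, attempt 0, both fail to find shuffle 1.
    val old = new LatestIdDedup
    old.onMissingReport(appShuffleId = 1) // reader 1: handled
    old.onNewShuffleRegistered(newId = 2) // upstream regenerated under a new id
    old.onMissingReport(appShuffleId = 1) // reader 2: handled again
    println(s"old check handled count: ${old.handledCount}") // prints "old check handled count: 2"

    val dedup = new PerStageAttemptDedup
    dedup.onMissingReport(appShuffleId = 1, stageId = 5, stageAttemptId = 0) // handled
    dedup.onMissingReport(appShuffleId = 1, stageId = 5, stageAttemptId = 0) // ignored
    println(s"per-stage-attempt handled count: ${dedup.handledCount}") // prints "per-stage-attempt handled count: 1"
  }
}
```

Because the key in the diff includes `stageAttemptId`, a genuinely new attempt of the same stage is still handled; only repeated reports from the same attempt are dropped, regardless of which shuffle id is currently the latest.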
--
This is an automated message from the Apache Git Service.