Hi, I am not yet very familiar with Celeborn, so I will restrict my notes on the proposal to Apache Spark:
a) For Option 1, there is SPARK-25299, which was started a few years back. Unfortunately, the work there has stalled, but if there is interest in pushing it forward, I can help shepherd the contributions! Full disclosure: the earlier proposal may be fairly outdated, and it will take a bit of investment to restart that work.

b) The ability to reuse a previous mapper's output (and so minimize cost) depends on the stage's DeterministicLevel. The output of a DETERMINATE mapper stage can be reused; the output of other stages cannot, and there is a lot of nuance in how DAGScheduler handles this. Much of it has to do with data correctness (see SPARK-23243 and the PRs linked there for a more in-depth analysis), and the handling has kept evolving in the years since. DAGScheduler directly updates MapOutputTracker in a few cases, including this one.

c) As a follow-up to (b) above: even though MapOutputTracker is part of SparkEnv, and so 'accessible', I would be careful about modifying its state directly outside of DAGScheduler.

d) The proposed computation of the "celeborn shuffleId" would not work, since spark.stage.maxConsecutiveAttempts limits consecutive failures of a single stage within a job. The same shuffle id can be computed by different stages across jobs (this is very common with Spark SQL AQE, for example). A simple example is at [1].

Other than Option 1, the rest look like tradeoffs to varying degrees. I am not familiar enough with Celeborn to give good suggestions yet, though.

All the best in trying to solve this issue - looking forward to updates!
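To make the DETERMINATE vs INDETERMINATE distinction from (b) concrete, here is a minimal sketch (assuming Spark 3.x semantics; the RDD names are illustrative only). A plain key-based aggregation is DETERMINATE, so its map output can be reused after a fetch failure, while a round-robin repartition() of unordered shuffle output is INDETERMINATE, so on a retry the whole stage (and its downstream consumers) must be recomputed:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").getOrCreate()
val sc = spark.sparkContext

// DETERMINATE: routing is a pure function of the key, so every attempt of a
// map task writes the same data to the same reducer. Shuffle output from a
// prior attempt of this stage can safely be reused.
val determinate = sc.parallelize(0 until 100, 4)
  .map(v => (v, v))
  .reduceByKey(_ + _)

// INDETERMINATE: groupByKey output has no guaranteed intra-partition order,
// and repartition() distributes records round-robin based on that order, so
// a retried map task may send records to different reducers than before.
// Reusing the earlier attempt's shuffle output here would be incorrect.
val indeterminate = sc.parallelize(0 until 100, 4)
  .map(v => (v % 10, v))
  .groupByKey()
  .repartition(8)

determinate.count()
indeterminate.count()
```

This is the correctness concern behind SPARK-23243: DAGScheduler cannot simply reattach old map output for a non-DETERMINATE stage, which is why a shuffle service also has to be careful about which shuffle data it keeps serving after a stage rerun.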
Regards,
Mridul

[1] Run with './bin/spark-shell --master "local-cluster[4, 3, 1024]"' or on YARN/K8s:

import org.apache.spark.TaskContext

val rdd1 = sc.parallelize(0 until 10000, 20).map(v => (v, v)).groupByKey()
val rdd2 = rdd1.mapPartitions { iter =>
  val tc = TaskContext.get()
  if (0 == tc.partitionId() && tc.stageAttemptNumber() < 1) {
    System.exit(0)
  }
  iter
}
rdd2.count()
rdd2.map(v => (v._1, v._2)).groupByKey().count()

For both of the jobs, the same shuffle id is used for the first shuffle.

On Fri, Sep 22, 2023 at 10:48 AM Erik fang <fme...@gmail.com> wrote:
> Hi folks,
>
> I have a proposal to implement Spark stage resubmission to handle shuffle
> fetch failure in Celeborn
>
> https://docs.google.com/document/d/1dkG6fww3g99VAb1wkphNlUES_MpngVPNg8601chmVp8
>
> please have a look and let me know what you think
>
> Regards,
> Erik