Thanks Mridul! Your comments provide a lot of information and are very helpful, highly appreciated!
For a, I agree that restarting SPARK-25299 requires a big effort, and I don't
think we can get it merged into Spark in a short time.

For b & c, I've previously checked the DeterministicLevel / Barrier RDD
related code in DAGScheduler, and will take a close look at SPARK-23243.
Regarding MapOutputTracker, I had the same thread-safety concern; however, I
found that shuffleStatuses is a ConcurrentHashMap and ShuffleStatus's public
methods are thread safe
<https://github.com/apache/spark/blob/v3.2.0/core/src/main/scala/org/apache/spark/MapOutputTracker.scala#L53>,
so I suppose it should be safe to modify it outside of DAGScheduler?

For d, I think I get your point. Let me run some tests and get back to you soon.

Regards,
Erik

On Sat, Sep 23, 2023 at 6:10 PM Mridul Muralidharan <[email protected]> wrote:

> Hi,
>
> I am not yet very familiar with Celeborn, so will restrict my notes on
> the proposal in context to Apache Spark:
>
> a) For Option 1, there is SPARK-25299 - which was started a few years back.
> Unfortunately, the work there has stalled; but if there is interest in
> pushing that forward, I can help shepherd the contributions!
> Full disclosure: the earlier proposal might be fairly outdated, and it will
> involve a bit of investment to restart that work.
>
> b) On the ability to reuse a previous mapper's output / minimize cost - that
> depends on a stage's DeterministicLevel.
> DETERMINATE mapper stage output can be reused, and not others - and there
> is a lot of nuance around how DAGScheduler handles it.
> A lot of it has to do with data correctness (see SPARK-23243 and the PRs
> linked there for a more in-depth analysis) - and this has kept evolving in
> the years since.
> DAGScheduler directly updates MapOutputTracker in a few cases - which
> includes this one.
>
> c) As a follow-up to (b) above, even though MapOutputTracker is part of
> SparkEnv, and so 'accessible', I would be careful about modifying its state
> directly outside of DAGScheduler.
>
> d) The computation for the "celeborn shuffleId" would not work - since
> spark.stage.maxConsecutiveAttempts only counts consecutive failures of a
> single stage within a job.
> The same shuffle id can be computed by different stages across jobs (for
> example, this is very common with Spark SQL AQE).
> A simple example is at [1].
>
>
> Other than Option 1, the rest look like trade-offs to varying degrees.
> I am not familiar enough with Celeborn to give good suggestions yet, though.
>
>
> All the best in trying to solve this issue - looking forward to updates!
>
> Regards,
> Mridul
>
> [1]
> Run with './bin/spark-shell --master 'local-cluster[4, 3, 1024]'' or on
> yarn/k8s:
>
> import org.apache.spark.TaskContext
>
> val rdd1 = sc.parallelize(0 until 10000, 20).map(v => (v, v)).groupByKey()
> val rdd2 = rdd1.mapPartitions { iter =>
>   val tc = TaskContext.get()
>   if (0 == tc.partitionId() && tc.stageAttemptNumber() < 1) {
>     System.exit(0)
>   }
>   iter
> }
>
> rdd2.count()
> rdd2.map(v => (v._1, v._2)).groupByKey().count()
>
> For both jobs, the same shuffle id is used for the first shuffle.
>
>
>
> On Fri, Sep 22, 2023 at 10:48 AM Erik fang <[email protected]> wrote:
>
> > Hi folks,
> >
> > I have a proposal to implement Spark stage resubmission to handle shuffle
> > fetch failure in Celeborn:
> >
> > https://docs.google.com/document/d/1dkG6fww3g99VAb1wkphNlUES_MpngVPNg8601chmVp8
> >
> > Please have a look and let me know what you think.
> >
> > Regards,
> > Erik
> >
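P.S. To make the MapOutputTracker point above a bit more concrete, below is a
rough sketch of the kind of call I have in mind. It assumes the code sits in an
org.apache.spark.* package (MapOutputTracker and MapOutputTrackerMaster are
private[spark]), and the method names come from my reading of
MapOutputTracker.scala in v3.2.0 linked above, so please treat it as pseudocode
rather than a final design:

package org.apache.spark.shuffle.celeborn

import org.apache.spark.{MapOutputTrackerMaster, SparkEnv}
import org.apache.spark.storage.BlockManagerId

object MapOutputTrackerSketch {
  // Invalidate a single mapper's output for a shuffle so that Spark will
  // recompute it, invoked from a Celeborn-side thread instead of DAGScheduler.
  def unregisterMapOutput(shuffleId: Int, mapIndex: Int, bmAddress: BlockManagerId): Unit = {
    SparkEnv.get.mapOutputTracker match {
      case tracker: MapOutputTrackerMaster if tracker.containsShuffle(shuffleId) =>
        // shuffleStatuses is a ConcurrentHashMap and ShuffleStatus guards its own
        // state, so this call should be safe from a non-DAGScheduler thread.
        tracker.unregisterMapOutput(shuffleId, mapIndex, bmAddress)
      case _ =>
        // Not on the driver, or the shuffle id is unknown - nothing to do.
    }
  }
}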
