Thanks, Mridul.
Your comments are very informative and helpful, highly appreciated!

For a, I agree that restarting SPARK-25299 requires a big effort, and I don't
think we can get it merged into Spark in a short time.

For b & c, I've previously checked the DeterministicLevel / barrier RDD
related code in DAGScheduler, and I will take a closer look at SPARK-23243.
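
To make sure I understand the reuse rule, here is roughly the check I have in
mind on the Celeborn side (an untested sketch based on my reading of the Spark
3.2 code; outputDeterministicLevel is private[spark], so this would have to
live under an org.apache.spark package, please correct me if I misread it):

import org.apache.spark.rdd.{DeterministicLevel, RDD}

// Rough sketch: only treat surviving map output of a shuffle as reusable when
// the map stage output is DETERMINATE, mirroring the rule you described.
// (If I read it correctly, DAGScheduler itself checks Stage.isIndeterminate,
// i.e. outputDeterministicLevel == INDETERMINATE, when deciding how far to
// roll back.)
def canReuseMapOutput(mapStageRdd: RDD[_]): Boolean =
  mapStageRdd.outputDeterministicLevel == DeterministicLevel.DETERMINATE
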
Regarding MapOutputTracker, I had the same concern about thread safety;
however, I found that shuffleStatuses is a ConcurrentHashMap and that
ShuffleStatus's public methods are thread-safe
<https://github.com/apache/spark/blob/v3.2.0/core/src/main/scala/org/apache/spark/MapOutputTracker.scala#L53>
so I suppose it should be safe to modify it outside of DAGScheduler?
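
For example, something along these lines on the driver side (again an untested
sketch; MapOutputTrackerMaster is private[spark], so it would also have to sit
under an org.apache.spark package, and I'm assuming unregisterAllMapOutput is
still available as in the Spark 3.2 code I was reading):

import org.apache.spark.{MapOutputTrackerMaster, SparkEnv}

// Rough sketch: drop all registered map outputs of a shuffle so the whole map
// stage is recomputed when the reduce stage is resubmitted.
def invalidateMapOutputs(shuffleId: Int): Unit = {
  SparkEnv.get.mapOutputTracker match {
    case master: MapOutputTrackerMaster =>
      // shuffleStatuses is a ConcurrentHashMap and ShuffleStatus's methods
      // are synchronized, so this should be callable outside of DAGScheduler
      master.unregisterAllMapOutput(shuffleId)
    case _ =>
      // not on the driver; nothing to do here
  }
}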

For d, I think I get your point.
Let me run some tests and get back to you soon.

Regards,
Erik

On Sat, Sep 23, 2023 at 6:10 PM Mridul Muralidharan <[email protected]>
wrote:

> Hi,
>
>   I am not yet very familiar with Celeborn, so I will restrict my notes on
> the proposal to the context of Apache Spark:
>
> a) For Option 1, there is SPARK-25299 - which was started a few years back.
> Unfortunately, the work there has stalled, but if there is interest in
> pushing that forward, I can help shepherd the contributions!
> Full disclosure: the earlier proposal might be fairly outdated, and it will
> take a bit of investment to restart that work.
>
> b) On the ability to reuse a previous mapper output/minimize cost - that
> depends on a stage's DeterministicLevel.
> Only DETERMINATE mapper stage output can be reused, not the other levels,
> and there is a lot of nuance around how DAGScheduler handles it.
> A lot of it has to do with data correctness (see SPARK-23243 and the PRs
> linked there for a more in-depth analysis), and this has kept evolving in
> the years since.
> DAGScheduler directly updates MapOutputTracker in a few cases, including
> this one.
>
> c) As a follow-up to (b) above, even though MapOutputTracker is part of
> SparkEnv, and so 'accessible', I would be careful about modifying its state
> directly outside of DAGScheduler.
>
> d) The computation for "celeborn shuffleId" would not work, since
> spark.stage.maxConsecutiveAttempts covers consecutive failures of a single
> stage within a job.
> The same shuffle id can be computed by different stages across jobs (for
> example, this is very common with Spark SQL AQE, btw).
> A simple example here [1]
>
>
> Other than Option 1, the rest look like tradeoffs to varying degrees.
> I am not familiar enough with Celeborn to give good suggestions yet, though.
>
>
> All the best in trying to solve this issue - looking forward to updates!
>
> Regards,
> Mridul
>
> [1]
> Run with ./bin/spark-shell --master "local-cluster[4, 3, 1024]", or on
> yarn/k8s
>
> import org.apache.spark.TaskContext
>
> val rdd1 = sc.parallelize(0 until 10000, 20).map(v => (v, v)).groupByKey()
> val rdd2 = rdd1.mapPartitions { iter =>
>   val tc = TaskContext.get()
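>   // exit the executor on the first attempt of partition 0 to force a retry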
>   if (0 == tc.partitionId() && tc.stageAttemptNumber() < 1) {
>     System.exit(0)
>   }
>   iter
> }
>
> rdd2.count()
> rdd2.map(v => (v._1, v._2)).groupByKey().count()
>
> For both jobs, the same shuffle id is used for the first shuffle.
>
>
>
> On Fri, Sep 22, 2023 at 10:48 AM Erik fang <[email protected]> wrote:
>
> > Hi folks,
> >
> > I have a proposal to implement Spark stage resubmission to handle shuffle
> > fetch failure in Celeborn
> >
> >
> >
> > https://docs.google.com/document/d/1dkG6fww3g99VAb1wkphNlUES_MpngVPNg8601chmVp8
> >
> > please have a look and let me know what you think
> >
> > Regards,
> > Erik
> >
>
