Thanks Erik for bringing up this question. I'm also curious about the answer; any feedback is appreciated.
Thanks,
Keyong Zhou

Erik fang <fme...@gmail.com> wrote on Thu, Oct 19, 2023 at 22:16:

> Mridul,
>
> sure, I totally agree SPARK-25299 is a much better solution, as long as we
> can get it from the spark community
> (btw, private[spark] on RDD.outputDeterministicLevel is no big deal;
> celeborn already has spark-integration code with [spark] scope)
>
> I also have a question about INDETERMINATE stage recompute, and may need
> your help.
> The rule for INDETERMINATE ShuffleMapStage rerun is reasonable; however, I
> can't find the corresponding logic for INDETERMINATE ResultStage rerun in
> DAGScheduler.
> If an INDETERMINATE ShuffleMapStage gets entirely recomputed, the
> corresponding ResultStage should be entirely recomputed as well, per my
> understanding.
>
> I found https://issues.apache.org/jira/browse/SPARK-25342 to roll back a
> ResultStage, but it was not merged.
> Do you know of any context or related ticket for INDETERMINATE ResultStage
> rerun?
>
> Thanks in advance!
>
> Regards,
> Erik
>
> On Tue, Oct 17, 2023 at 4:23 AM Mridul Muralidharan <mri...@gmail.com>
> wrote:
>
> > On Mon, Oct 16, 2023 at 11:31 AM Erik fang <fme...@gmail.com> wrote:
> >
> >> Hi Mridul,
> >>
> >> For a),
> >> DagScheduler uses Stage.isIndeterminate() and RDD.isBarrier()
> >> <https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1975>
> >> to decide whether the whole stage needs to be recomputed.
> >> I think we can pass the same information to Celeborn in
> >> ShuffleManager.registerShuffle()
> >> <https://github.com/apache/spark/blob/721ea9bbb2ff77b6d2f575fdca0aeda84990cc3b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala#L39>,
> >> since the ShuffleDependency contains the RDD object.
> >> It seems Stage.isIndeterminate() is not accessible from
> >> ShuffleDependency, but luckily rdd is used internally:
> >>
> >> def isIndeterminate: Boolean = {
> >>   rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
> >> }
> >>
> >> Relying on the internal implementation is not good, but doable.
> >> I don't expect the Spark RDD/Stage implementation to change frequently,
> >> and we can discuss an RDD isIndeterminate API with the Spark community
> >> if they change it in the future.
> >
> > Only RDD.getOutputDeterministicLevel is publicly exposed;
> > RDD.outputDeterministicLevel is not - it is private[spark].
> > While I don't expect changes to this, it is inherently unstable to
> > depend on it.
> >
> > Btw, please see the discussion with Sungwoo Park: if Celeborn is
> > maintaining a reducer-oriented view, you will need to recompute all the
> > mappers anyway - what you might save is the subset of reducer partitions
> > which can be skipped if the stage is DETERMINATE.
> >
> >> for c),
> >> I also considered a similar solution in Celeborn.
> >> Celeborn (LifecycleManager) can get the full picture of remaining
> >> shuffle data from the previous stage attempt and reuse it in stage
> >> recompute, and the whole process will be transparent to
> >> Spark/DagScheduler.
> >
> > Celeborn does not have visibility into this - and this is potentially
> > subject to invasive changes in Apache Spark as it evolves.
> > For example, I recently merged a couple of changes which would make this
> > different in master compared to previous versions.
> > Until the remote shuffle service SPIP is implemented and these are
> > abstracted out & made pluggable, it will continue to be quite volatile.
> >
> > Note that the behavior for 3.5 and older is known - since those Spark
> > versions have been released - it is the behavior in master and future
> > versions of Spark which is subject to change.
> > So delivering on SPARK-25299 would future-proof all remote shuffle
> > implementations.
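[Editor's note: the isIndeterminate check quoted above can be sketched as a self-contained toy. The types and the registerShuffle-style hook below are illustrative stand-ins, not Spark's actual classes, since the real RDD.outputDeterministicLevel is private[spark].]

```scala
// Self-contained sketch (toy types, not Spark's actual classes) of recording
// the determinism level of a shuffle's map-side RDD at registration time,
// so the shuffle service knows up front whether a retry forces a full rerun.
object DeterminismAtRegistration {
  sealed trait DeterministicLevel
  case object DETERMINATE extends DeterministicLevel
  case object UNORDERED extends DeterministicLevel
  case object INDETERMINATE extends DeterministicLevel

  // Stand-in for the information a ShuffleDependency's RDD carries.
  final case class RddInfo(outputDeterministicLevel: DeterministicLevel)

  // Mirrors the quoted Stage.isIndeterminate logic: only INDETERMINATE
  // output invalidates previously produced shuffle data on retry.
  def isIndeterminate(rdd: RddInfo): Boolean =
    rdd.outputDeterministicLevel == INDETERMINATE

  // What a hypothetical registerShuffle-style hook could record per shuffle.
  def register(shuffleId: Int, rdd: RddInfo): (Int, Boolean) =
    (shuffleId, isIndeterminate(rdd))
}
```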
> >
> > Regards,
> > Mridul
> >
> >> From my perspective, leveraging partial stage recompute and the
> >> remaining shuffle data needs a lot of work in Celeborn.
> >> I prefer to implement a simple whole-stage recompute first, with the
> >> interface defined with a recomputeAll = true flag, and explore partial
> >> stage recompute in a separate ticket as a future optimization.
> >> What do you think?
> >>
> >> Regards,
> >> Erik
> >>
> >> On Sat, Oct 14, 2023 at 4:50 PM Mridul Muralidharan <mri...@gmail.com>
> >> wrote:
> >>
> >>> On Sat, Oct 14, 2023 at 3:49 AM Mridul Muralidharan <mri...@gmail.com>
> >>> wrote:
> >>>
> >>>> A reducer-oriented view of shuffle, especially without replication,
> >>>> could indeed be susceptible to the issue you described (a single
> >>>> fetch failure would require all mappers to be recomputed) - note, not
> >>>> necessarily all reducers need to be recomputed though.
> >>>>
> >>>> Note that I have not looked much into Celeborn specifically on this
> >>>> aspect yet, so my comments are *fairly* centric to Spark internals :-)
> >>>>
> >>>> Regards,
> >>>> Mridul
> >>>>
> >>>> On Sat, Oct 14, 2023 at 3:36 AM Sungwoo Park <glap...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Hello,
> >>>>>
> >>>>> (Sorry for sending the same message again.)
> >>>>>
> >>>>> From my understanding, the current implementation of Celeborn makes
> >>>>> it hard to find out which mapper should be re-executed when a
> >>>>> partition cannot be read, and we should re-execute all the mappers
> >>>>> in the upstream stage. If we can find out which mapper/partition
> >>>>> should be re-executed, the current logic of stage recomputation
> >>>>> could be (partially or totally) reused.
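[Editor's note: the mapper-oriented vs reducer-oriented distinction discussed above can be made concrete with a minimal toy model. This is not Celeborn's or Spark's actual bookkeeping, just an illustration of the rerun scope after a fetch failure under each layout.]

```scala
// Toy model of fetch-failure rerun scope under two shuffle layouts.
// Mapper-oriented (Spark's built-in shuffle): each map task's output is
// individually addressable, so a failed fetch names the one mapper to rerun.
// Reducer-oriented (as the thread describes for Celeborn): mapper outputs
// for a partition are merged, so a lost partition cannot be attributed to
// a single mapper and every upstream mapper must rerun.
object FetchFailureScope {
  // Mapper-oriented: the failed block identifies its producing map task.
  def mappersToRerunMapperOriented(failedMapId: Int): Set[Int] =
    Set(failedMapId)

  // Reducer-oriented: all upstream mappers rerun.
  def mappersToRerunReducerOriented(numMappers: Int): Set[Int] =
    (0 until numMappers).toSet
}
```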
> >>>>>
> >>>>> Regards,
> >>>>>
> >>>>> --- Sungwoo
> >>>>>
> >>>>> On Sat, Oct 14, 2023 at 5:24 PM Mridul Muralidharan <mri...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> Spark will try to minimize the recomputation cost as much as
> >>>>>> possible.
> >>>>>> For example, if the parent stage was DETERMINATE, it simply needs
> >>>>>> to recompute the missing (mapper) partitions (those which resulted
> >>>>>> in the fetch failure). Note, this by itself could require further
> >>>>>> recomputation in the DAG if the inputs required to compute the
> >>>>>> parent partitions are missing, and so on - so it is dynamic.
> >>>>>>
> >>>>>> Regards,
> >>>>>> Mridul
> >>>>>>
> >>>>>> On Sat, Oct 14, 2023 at 2:30 AM Sungwoo Park <o...@pl.postech.ac.kr>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> > a) If one or more tasks for a stage (and so its shuffle id) is
> >>>>>>> > going to be recomputed, if it is an INDETERMINATE stage, all
> >>>>>>> > shuffle output will be discarded and it will be entirely
> >>>>>>> > recomputed (see here
> >>>>>>> > <https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1477>).
> >>>>>>>
> >>>>>>> If a reducer (in a downstream stage) fails to read data, can we
> >>>>>>> find out which tasks should recompute their output? From the
> >>>>>>> previous discussion, I thought this was hard (in the current
> >>>>>>> implementation), and we should re-execute all tasks in the
> >>>>>>> upstream stage.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>>
> >>>>>>> --- Sungwoo
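[Editor's note: the recompute rule discussed in this thread - a DETERMINATE parent reruns only the map partitions whose output was lost, while an INDETERMINATE parent discards all output and reruns every task - can be sketched as a self-contained toy. The names below are illustrative, not Spark's actual scheduler types.]

```scala
// Toy sketch of DAGScheduler's rollback rule on fetch failure: the set of
// map partitions to rerun for a parent stage depends on its determinism.
object RecomputeRule {
  sealed trait DeterministicLevel
  case object DETERMINATE extends DeterministicLevel
  case object UNORDERED extends DeterministicLevel
  case object INDETERMINATE extends DeterministicLevel

  // For a parent stage with `numPartitions` map tasks, of which `missing`
  // lost their output: INDETERMINATE forces a full stage rerun, otherwise
  // only the missing partitions are recomputed.
  def partitionsToRecompute(
      level: DeterministicLevel,
      numPartitions: Int,
      missing: Set[Int]): Set[Int] =
    level match {
      case INDETERMINATE => (0 until numPartitions).toSet // full rerun
      case _             => missing                       // only lost output
    }
}
```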