Thanks, Erik, for bringing up this question. I'm also curious about the
answer; any feedback is appreciated.

Thanks,
Keyong Zhou

On Thu, Oct 19, 2023 at 22:16, Erik fang <fme...@gmail.com> wrote:

> Mridul,
>
> Sure, I totally agree that SPARK-25299 is a much better solution, as long
> as we can get it from the Spark community.
> (Btw, RDD.outputDeterministicLevel being private[spark] is no big deal;
> Celeborn already has Spark-integration code with [spark] scope.)
>
> I also have a question about INDETERMINATE stage recompute, and may need
> your help.
> The rule for INDETERMINATE ShuffleMapStage rerun is reasonable; however, I
> can't find the related logic for INDETERMINATE ResultStage rerun in
> DAGScheduler.
> If an INDETERMINATE ShuffleMapStage gets entirely recomputed, the
> corresponding ResultStage should be entirely recomputed as well, per my
> understanding.
>
> I found https://issues.apache.org/jira/browse/SPARK-25342, which proposes
> rolling back a ResultStage, but it was not merged.
> Do you know of any context or related ticket for INDETERMINATE ResultStage
> rerun?
>
> Thanks in advance!
>
> Regards,
> Erik
>
> On Tue, Oct 17, 2023 at 4:23 AM Mridul Muralidharan <mri...@gmail.com>
> wrote:
>
> >
> >
> > On Mon, Oct 16, 2023 at 11:31 AM Erik fang <fme...@gmail.com> wrote:
> >
> >> Hi Mridul,
> >>
> >> For a),
> >> DAGScheduler uses Stage.isIndeterminate() and RDD.isBarrier()
> >> <https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1975>
> >> to decide whether the whole stage needs to be recomputed.
> >> I think we can pass the same information to Celeborn in
> >> ShuffleManager.registerShuffle()
> >> <https://github.com/apache/spark/blob/721ea9bbb2ff77b6d2f575fdca0aeda84990cc3b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala#L39>,
> >> since the ShuffleDependency contains the RDD object.
> >> It seems Stage.isIndeterminate() is not reachable from ShuffleDependency,
> >> but luckily it is implemented in terms of rdd:
> >> def isIndeterminate: Boolean = {
> >>   rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
> >> }
> >>
> >> Relying on an internal implementation is not good, but it is doable.
> >> I don't expect the Spark RDD/Stage implementation to change frequently,
> >> and we can discuss an RDD isIndeterminate API with the Spark community
> >> if they change it in the future.
> >>
> >
> >
> > Only RDD.getOutputDeterministicLevel is publicly exposed;
> > RDD.outputDeterministicLevel is not, as it is private[spark].
> > While I don't expect changes to this, it is inherently unstable to depend
> > on it.
> >
> > Btw, please see the discussion with Sungwoo Park: if Celeborn is
> > maintaining a reducer oriented view, you will need to recompute all the
> > mappers anyway - what you might save is the subset of reducer partitions
> > which can be skipped if it is DETERMINATE.
> >
> >
> >
> >
> >>
> >> for c)
> >> I also considered a similar solution in Celeborn.
> >> Celeborn (LifecycleManager) can get the full picture of the remaining
> >> shuffle data from the previous stage attempt and reuse it in the stage
> >> recompute, and the whole process will be transparent to
> >> Spark/DAGScheduler.
> >>
> >
> > Celeborn does not have visibility into this - and this is potentially
> > subject to invasive changes in Apache Spark as it evolves.
> > For example, I recently merged a couple of changes which would make this
> > different in master compared to previous versions.
> > Until the remote shuffle service SPIP is implemented and these are
> > abstracted out & made pluggable, it will continue to be quite volatile.
> >
> > Note that the behavior for 3.5 and older is known - since Spark versions
> > have been released - it is the behavior in master and future versions of
> > Spark which is subject to change.
> > So delivering on SPARK-25299 would future proof all remote shuffle
> > implementations.
> >
> >
> > Regards,
> > Mridul
> >
> >
> >
> >>
> >> From my perspective, leveraging partial stage recompute and the
> >> remaining shuffle data needs a lot of work in Celeborn.
> >> I prefer to implement a simple whole-stage recompute first, with the
> >> interface defined with a recomputeAll = true flag, and explore partial
> >> stage recompute in a separate ticket as a future optimization.
> >> What do you think?
> >>
> >> Regards,
> >> Erik
> >>
> >>
> >> On Sat, Oct 14, 2023 at 4:50 PM Mridul Muralidharan <mri...@gmail.com>
> >> wrote:
> >>
> >>>
> >>>
> >>> On Sat, Oct 14, 2023 at 3:49 AM Mridul Muralidharan <mri...@gmail.com>
> >>> wrote:
> >>>
> >>>>
> >>>> A reducer oriented view of shuffle, especially without replication,
> >>>> could indeed be susceptible to this issue you described (a single
> >>>> fetch failure would require all mappers to be recomputed) - note,
> >>>> not necessarily all reducers to be recomputed though.
> >>>>
> >>>> Note that I have not looked much into Celeborn specifically on this
> >>>> aspect yet, so my comments are *fairly* centric to Spark internals :-)
> >>>>
> >>>> Regards,
> >>>> Mridul
> >>>>
> >>>>
> >>>> On Sat, Oct 14, 2023 at 3:36 AM Sungwoo Park <glap...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Hello,
> >>>>>
> >>>>> (Sorry for sending the same message again.)
> >>>>>
> >>>>> From my understanding, the current implementation of Celeborn makes it
> >>>>> hard to find out which mapper should be re-executed when a partition
> >>>>> cannot be read, and we should re-execute all the mappers in the upstream
> >>>>> stage. If we can find out which mapper/partition should be re-executed,
> >>>>> the current logic of stage recomputation could be (partially or totally)
> >>>>> reused.
> >>>>>
> >>>>> Regards,
> >>>>>
> >>>>> --- Sungwoo
> >>>>>
> >>>>> On Sat, Oct 14, 2023 at 5:24 PM Mridul Muralidharan <mri...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>>   Spark will try to minimize the recomputation cost as much as
> >>>>>> possible.
> >>>>>> For example, if the parent stage was DETERMINATE, it simply needs to
> >>>>>> recompute the missing (mapper) partitions (which resulted in fetch
> >>>>>> failure). Note, this by itself could require further recomputation in
> >>>>>> the DAG if the inputs required to compute the parent partitions are
> >>>>>> missing, and so on - so it is dynamic.
> >>>>>>
> >>>>>> Regards,
> >>>>>> Mridul
> >>>>>>
> >>>>>> On Sat, Oct 14, 2023 at 2:30 AM Sungwoo Park <o...@pl.postech.ac.kr>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> > a) If one or more tasks for a stage (and so its shuffle id) is going
> >>>>>>> > to be recomputed, if it is an INDETERMINATE stage, all shuffle output
> >>>>>>> > will be discarded and it will be entirely recomputed (see here
> >>>>>>> > <https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1477>).
> >>>>>>>
> >>>>>>> If a reducer (in a downstream stage) fails to read data, can we find
> >>>>>>> out which tasks should recompute their output? From the previous
> >>>>>>> discussion, I thought this was hard (in the current implementation), and
> >>>>>>> we should re-execute all tasks in the upstream stage.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>>
> >>>>>>> --- Sungwoo
> >>>>>>>
> >>>>>>
>
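
For reference, the recompute rules discussed in this thread can be modelled in
a few lines of Scala. This is only a sketch under stated assumptions:
ShuffleInfo, reducerOriented, recomputeAll and mappersToRecompute are
hypothetical names, not Spark or Celeborn APIs, and DeterministicLevel is
re-declared locally rather than imported from Spark so that the snippet is
self-contained.

```scala
// Self-contained model of the rules discussed in the thread.
// None of these types are Spark or Celeborn classes.
object RecomputeSketch {

  // Local stand-in mirroring the names of Spark's DeterministicLevel values.
  sealed trait DeterministicLevel
  case object Determinate extends DeterministicLevel
  case object Unordered extends DeterministicLevel
  case object Indeterminate extends DeterministicLevel

  // Hypothetical record a remote shuffle service could keep per shuffle,
  // assuming the level is handed over when the shuffle is registered.
  final case class ShuffleInfo(
      shuffleId: Int,
      numMappers: Int,
      level: DeterministicLevel,
      // true: data is stored per reducer partition, so lost data cannot be
      // traced back to individual mappers (assumption from the thread).
      reducerOriented: Boolean)

  // Same predicate as the isIndeterminate snippet quoted in the thread.
  def isIndeterminate(info: ShuffleInfo): Boolean =
    info.level == Indeterminate

  // The proposed recomputeAll flag: rerun every mapper when the stage is
  // INDETERMINATE or when the storage layout hides which mapper was lost.
  def recomputeAll(info: ShuffleInfo): Boolean =
    isIndeterminate(info) || info.reducerOriented

  // Mapper partitions to rerun after a fetch failure.
  def mappersToRecompute(info: ShuffleInfo, lostMappers: Set[Int]): Set[Int] = {
    val all = (0 until info.numMappers).toSet
    if (recomputeAll(info)) all
    else lostMappers.intersect(all) // only the missing mapper partitions
  }
}
```

With a DETERMINATE, mapper-oriented shuffle, only the lost mapper partitions
come back; flipping either the level to Indeterminate or reducerOriented to
true returns every mapper, which is the simple whole-stage recompute proposed
as the first step.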
