Hi Mridul,

I still have a question. DAGScheduler#submitMissingTasks will
only unregisterAllMapAndMergeOutput
if the current ShuffleMapStage is Indeterminate. What if the current stage
is determinate, but its
upstream stage is Indeterminate, and its upstream stage is rerun?

Thanks,
Keyong Zhou

Mridul Muralidharan <mri...@gmail.com> 于2023年10月20日周五 11:15写道:

> To add my response - what I described (w.r.t failing job) applies only to
> ResultStage.
> It walks the lineage DAG to identify all indeterminate parents to rollback.
> If there are only ShuffleMapStages in the set of stages to rollback, it
> will simply discard their output, rollback all of them, and then retry
> these stages (same shuffle-id, a new stage attempt)
>
>
> Regards,
> Mridul
>
>
>
> On Thu, Oct 19, 2023 at 10:08 PM Mridul Muralidharan <mri...@gmail.com>
> wrote:
>
> >
> > Good question, and ResultStage is actually special cased in spark as its
> > output could have already been consumed (for example collect() to driver,
> > etc) - and so if it is one of the stages which needs to be rolled back,
> the
> > job is aborted.
> >
> > To illustrate, see the following:
> > -- snip --
> >
> > package org.apache.spark
> >
> >
> > import scala.reflect.ClassTag
> >
> > import org.apache.spark._
> > import org.apache.spark.rdd.{DeterministicLevel, RDD}
> >
> > class DelegatingRDD[E: ClassTag](delegate: RDD[E]) extends
> RDD[E](delegate) {
> >
> >   override def compute(split: Partition, context: TaskContext):
> Iterator[E] = {
> >     delegate.compute(split, context)
> >   }
> >
> >   override protected def getPartitions: Array[Partition] =
> >     delegate.partitions
> > }
> >
> > class IndeterminateRDD[E: ClassTag](delegate: RDD[E]) extends
> DelegatingRDD[E](delegate) {
> >   override def getOutputDeterministicLevel: DeterministicLevel.Value =
> DeterministicLevel.INDETERMINATE
> > }
> >
> > class FailingRDD[E: ClassTag](delegate: RDD[E]) extends
> DelegatingRDD[E](delegate) {
> >   override def compute(split: Partition, context: TaskContext):
> Iterator[E] = {
> >     val tc = TaskContext.get
> >     if (tc.stageAttemptNumber() == 0 && tc.partitionId() == 0 &&
> tc.attemptNumber() == 0) {
> >       // Wait for all tasks to be done, then call exit
> >       Thread.sleep(5000)
> >       System.exit(-1)
> >     }
> >     delegate.compute(split, context)
> >   }
> > }
> >
> > // Make sure test_output directory is deleted before running this.
> > //
> > object Test {
> >
> >   def main(args: Array[String]): Unit = {
> >     val conf = new SparkConf().setMaster("local-cluster[4,1,1024]")
> >     val sc = new SparkContext(conf)
> >
> >     val mapperRdd = new IndeterminateRDD(sc.parallelize(0 until 10000,
> 20).map(v => (v, v)))
> >     val resultRdd = new FailingRDD(mapperRdd.groupByKey())
> >     resultRdd.saveAsTextFile("test_output")
> >   }
> > }
> >
> > -- snip --
> >
> >
> >
> > Here, the mapper stage has been forced to be INDETERMINATE.
> > In the reducer stage, the first attempt to compute partition 0 will wait
> for a bit and then exit - since the master is a local-cluster, this results
> in FetchFailure when the second attempt of partition 0 tries to fetch
> shuffle data.
> > When spark tries to regenerate parent shuffle output, it sees that the
> parent is INDETERMINATE - and so fails the entire job.with the message:
> > "
> > org.apache.spark.SparkException: Job aborted due to stage failure: A
> shuffle map stage with indeterminate output was failed and retried.
> However, Spark cannot rollback the ResultStage 1 to re-process the input
> data, and has to fail this job. Please eliminate the indeterminacy by
> checkpointing the RDD before repartition and try again.
> > "
> >
> > This is coming from here <
> https://github.com/apache/spark/blob/28292d51e7dbe2f3488e82435abb48d3d31f6044/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2090>
> - when rolling back stages, if spark determines that a ResultStage needs to
> be rolled back due to loss of INDETERMINATE output, it will fail the job.
> >
> > Hope this clarifies.
> > Regards,
> > Mridul
> >
> >
> > On Thu, Oct 19, 2023 at 10:04 AM Keyong Zhou <zho...@apache.org> wrote:
> >
> >> In fact, I'm wondering if Spark will rerun the whole reduce
> >> ShuffleMapStage
> >> if its upstream ShuffleMapStage is INDETERMINATE and rerun.
> >>
> >> Keyong Zhou <zho...@apache.org> 于2023年10月19日周四 23:00写道:
> >>
> >> > Thanks Erik for bringing up this question, I'm also curious about the
> >> > answer, any feedback is appreciated.
> >> >
> >> > Thanks,
> >> > Keyong Zhou
> >> >
> >> > Erik fang <fme...@gmail.com> 于2023年10月19日周四 22:16写道:
> >> >
> >> >> Mridul,
> >> >>
> >> >> sure, I totally agree SPARK-25299 is a much better solution, as long
> >> as we
> >> >> can get it from spark community
> >> >> (btw, private[spark] of RDD.outputDeterministicLevel is no big deal,
> >> >> celeborn already has spark-integration code with  [spark] scope)
> >> >>
> >> >> I also have a question about INDETERMINATE stage recompute, and may
> >> need
> >> >> your help
> >> >> The rule for INDETERMINATE ShuffleMapStage rerun is reasonable,
> >> however, I
> >> >> don't find related logic for INDETERMINATE ResultStage rerun in
> >> >> DAGScheduler
> >> >> If INDETERMINATE ShuffleMapStage got entirely recomputed, the
> >> >> corresponding ResultStage should be entirely recomputed as well, per
> my
> >> >> understanding
> >> >>
> >> >> I found https://issues.apache.org/jira/browse/SPARK-25342 to
> rollback
> >> a
> >> >> ResultStage but it was not merged
> >> >> Do you know any context or related ticket for INDETERMINATE
> ResultStage
> >> >> rerun?
> >> >>
> >> >> Thanks in advance!
> >> >>
> >> >> Regards,
> >> >> Erik
> >> >>
> >> >> On Tue, Oct 17, 2023 at 4:23 AM Mridul Muralidharan <
> mri...@gmail.com>
> >> >> wrote:
> >> >>
> >> >> >
> >> >> >
> >> >> > On Mon, Oct 16, 2023 at 11:31 AM Erik fang <fme...@gmail.com>
> wrote:
> >> >> >
> >> >> >> Hi Mridul,
> >> >> >>
> >> >> >> For a),
> >> >> >> DagScheduler uses Stage.isIndeterminate() and RDD.isBarrier()
> >> >> >> <
> >> >>
> >>
> https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1975
> >> >> >
> >> >> >> to decide whether the whole stage needs to be recomputed
> >> >> >> I think we can pass the same information to Celeborn in
> >> >> >> ShuffleManager.registerShuffle()
> >> >> >> <
> >> >>
> >>
> https://github.com/apache/spark/blob/721ea9bbb2ff77b6d2f575fdca0aeda84990cc3b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala#L39
> >> >,
> >> >> since
> >> >> >> RDD in ShuffleDependency contains the RDD object
> >> >> >> It seems Stage.isIndeterminate() is unreadable from
> >> ShuffleDependency,
> >> >> >> but luckily rdd is used internally
> >> >> >>
> >> >> >> def isIndeterminate: Boolean = {
> >> >> >>   rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
> >> >> >> }
> >> >> >>
> >> >> >> Relies on internal implementation is not good, but doable.
> >> >> >> I don't expect spark RDD/Stage implementation changes frequently,
> >> and
> >> >> we
> >> >> >> can discuss with Spark community for a RDD isIndeterminate API if
> >> they
> >> >> >> change it in the future
> >> >> >>
> >> >> >
> >> >> >
> >> >> > Only RDD.getOutputDeterministicLevel is publicly exposed,
> >> >> > RDD.outputDeterministicLevel is not and it is private[spark].
> >> >> > While I dont expect changes to this, it is inherently unstable to
> >> depend
> >> >> > on it.
> >> >> >
> >> >> > Btw, please see the discussion with Sungwoo Park, if Celeborn is
> >> >> > maintaining a reducer oriented view, you will need to recompute all
> >> the
> >> >> > mappers anyway - what you might save is the subset of reducer
> >> partitions
> >> >> > which can be skipped if it is DETERMINATE.
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> >>
> >> >> >> for c)
> >> >> >> I also considered a similar solution in celeborn
> >> >> >> Celeborn (LifecycleManager) can get the full picture of remaining
> >> >> shuffle
> >> >> >> data from previous stage attempt and reuse it in stage recompute
> >> >> >> , and the whole process will be transparent to Spark/DagScheduler
> >> >> >>
> >> >> >
> >> >> > Celeborn does not have visibility into this - and this is
> potentially
> >> >> > subject to invasive changes in Apache Spark as it evolves.
> >> >> > For example, I recently merged a couple of changes which would make
> >> this
> >> >> > different in master compared to previous versions.
> >> >> > Until the remote shuffle service SPIP is implemented and these are
> >> >> > abstracted out & made pluggable, it will continue to be quite
> >> volatile.
> >> >> >
> >> >> > Note that the behavior for 3.5 and older is known - since Spark
> >> versions
> >> >> > have been released - it is the behavior in master and future
> >> versions of
> >> >> > Spark which is subject to change.
> >> >> > So delivering on SPARK-25299 would future proof all remote shuffle
> >> >> > implementations.
> >> >> >
> >> >> >
> >> >> > Regards,
> >> >> > Mridul
> >> >> >
> >> >> >
> >> >> >
> >> >> >>
> >> >> >> Per my perspective, leveraging partial stage recompute and
> >> >> >> remaining shuffle data needs a lot of work to do in Celeborn
> >> >> >> I prefer to implement a simple whole stage recompute first with
> >> >> interface
> >> >> >> defined with recomputeAll = true flag, and explore partial stage
> >> >> recompute
> >> >> >> in seperate ticket as future optimization
> >> >> >> How do you think about it?
> >> >> >>
> >> >> >> Regards,
> >> >> >> Erik
> >> >> >>
> >> >> >>
> >> >> >> On Sat, Oct 14, 2023 at 4:50 PM Mridul Muralidharan <
> >> mri...@gmail.com>
> >> >> >> wrote:
> >> >> >>
> >> >> >>>
> >> >> >>>
> >> >> >>> On Sat, Oct 14, 2023 at 3:49 AM Mridul Muralidharan <
> >> mri...@gmail.com
> >> >> >
> >> >> >>> wrote:
> >> >> >>>
> >> >> >>>>
> >> >> >>>> A reducer oriented view of shuffle, especially without
> >> replication,
> >> >> >>>> could indeed be susceptible to this issue you described (a
> single
> >> >> fetch
> >> >> >>>> failure would require all mappers to need to be recomputed) -
> >> note,
> >> >> not
> >> >> >>>> necessarily all reducers to be recomputed though.
> >> >> >>>>
> >> >> >>>> Note that I have not looked much into Celeborn specifically on
> >> this
> >> >> >>>> aspect yet, so my comments are *fairly* centric to Spark
> internals
> >> >> :-)
> >> >> >>>>
> >> >> >>>> Regards,
> >> >> >>>> Mridul
> >> >> >>>>
> >> >> >>>>
> >> >> >>>> On Sat, Oct 14, 2023 at 3:36 AM Sungwoo Park <glap...@gmail.com
> >
> >> >> wrote:
> >> >> >>>>
> >> >> >>>>> Hello,
> >> >> >>>>>
> >> >> >>>>> (Sorry for sending the same message again.)
> >> >> >>>>>
> >> >> >>>>> From my understanding, the current implementation of Celeborn
> >> makes
> >> >> it
> >> >> >>>>> hard to find out which mapper should be re-executed when a
> >> >> partition cannot
> >> >> >>>>> be read, and we should re-execute all the mappers in the
> upstream
> >> >> stage. If
> >> >> >>>>> we can find out which mapper/partition should be re-executed,
> the
> >> >> current
> >> >> >>>>> logic of stage recomputation could be (partially or totally)
> >> reused.
> >> >> >>>>>
> >> >> >>>>> Regards,
> >> >> >>>>>
> >> >> >>>>> --- Sungwoo
> >> >> >>>>>
> >> >> >>>>> On Sat, Oct 14, 2023 at 5:24 PM Mridul Muralidharan <
> >> >> mri...@gmail.com>
> >> >> >>>>> wrote:
> >> >> >>>>>
> >> >> >>>>>>
> >> >> >>>>>> Hi,
> >> >> >>>>>>
> >> >> >>>>>>   Spark will try to minimize the recomputation cost as much as
> >> >> >>>>>> possible.
> >> >> >>>>>> For example, if parent stage was DETERMINATE, it simply needs
> to
> >> >> >>>>>> recompute the missing (mapper) partitions (which resulted in
> >> fetch
> >> >> >>>>>> failure). Note, this by itself could require further
> >> recomputation
> >> >> in the
> >> >> >>>>>> DAG if the inputs required to comput the parent partitions are
> >> >> missing, and
> >> >> >>>>>> so on - so it is dynamic.
> >> >> >>>>>>
> >> >> >>>>>> Regards,
> >> >> >>>>>> Mridul
> >> >> >>>>>>
> >> >> >>>>>> On Sat, Oct 14, 2023 at 2:30 AM Sungwoo Park <
> >> >> o...@pl.postech.ac.kr>
> >> >> >>>>>> wrote:
> >> >> >>>>>>
> >> >> >>>>>>> > a) If one or more tasks for a stage (and so its shuffle id)
> >> is
> >> >> >>>>>>> going to be
> >> >> >>>>>>> > recomputed, if it is an INDETERMINATE stage, all shuffle
> >> output
> >> >> >>>>>>> will be
> >> >> >>>>>>> > discarded and it will be entirely recomputed (see here
> >> >> >>>>>>> > <
> >> >> >>>>>>>
> >> >>
> >>
> https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1477
> >> >> >>>>>>> >
> >> >> >>>>>>> > ).
> >> >> >>>>>>>
> >> >> >>>>>>> If a reducer (in a downstream stage) fails to read data, can
> we
> >> >> find
> >> >> >>>>>>> out
> >> >> >>>>>>> which tasks should recompute their output? From the previous
> >> >> >>>>>>> discussion, I
> >> >> >>>>>>> thought this was hard (in the current implementation), and we
> >> >> should
> >> >> >>>>>>> re-execute all tasks in the upstream stage.
> >> >> >>>>>>>
> >> >> >>>>>>> Thanks,
> >> >> >>>>>>>
> >> >> >>>>>>> --- Sungwoo
> >> >> >>>>>>>
> >> >> >>>>>>
> >> >>
> >> >
> >>
> >
>

Reply via email to