I checked RDD#getOutputDeterministicLevel and found that if an RDD's
upstream is INDETERMINATE,
then the RDD itself is also INDETERMINATE.
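
Roughly, the propagation rule looks like this (a simplified paraphrase of
RDD.getOutputDeterministicLevel in Spark's RDD.scala, omitting the
barrier-RDD special case; not the exact source):

-- snip --
protected def getOutputDeterministicLevel: DeterministicLevel.Value = {
  val candidates = dependencies.map {
    case dep: ShuffleDependency[_, _, _] =>
      if (dep.rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE) {
        // An INDETERMINATE map side makes the shuffle output INDETERMINATE.
        DeterministicLevel.INDETERMINATE
      } else if (dep.keyOrdering.isDefined && dep.aggregator.isDefined) {
        // Sorted and aggregated shuffle output is reproducible.
        DeterministicLevel.DETERMINATE
      } else {
        // Shuffle blocks are fetched in random order, so the output order
        // varies between runs: UNORDERED.
        DeterministicLevel.UNORDERED
      }
    // Narrow dependencies simply inherit the parent's level.
    case dep => dep.rdd.outputDeterministicLevel
  }
  // Pick the "worst" level among the parents; a leaf RDD is DETERMINATE.
  if (candidates.isEmpty) DeterministicLevel.DETERMINATE
  else candidates.maxBy(_.id)
}
-- snip --

This also explains the checkpointing suggestion in the error message quoted
below: outputDeterministicLevel short-circuits to DETERMINATE for a reliably
checkpointed RDD.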

Thanks,
Keyong Zhou

On Fri, Nov 3, 2023 at 7:57 PM Keyong Zhou <zho...@apache.org> wrote:

> Hi Mridul,
>
> I still have a question. DAGScheduler#submitMissingTasks will
> only unregisterAllMapAndMergeOutput
> if the current ShuffleMapStage is INDETERMINATE. What if the current stage
> is DETERMINATE, but its
> upstream stage is INDETERMINATE, and that upstream stage is rerun?
>
> Thanks,
> Keyong Zhou
>
> On Fri, Oct 20, 2023 at 11:15 AM Mridul Muralidharan <mri...@gmail.com> wrote:
>
>> To add to my response - what I described (w.r.t. failing the job) applies
>> only to ResultStage.
>> The scheduler walks the lineage DAG to identify all indeterminate parent
>> stages to roll back.
>> If there are only ShuffleMapStages in the set of stages to roll back, it
>> will simply discard their output, roll back all of them, and then retry
>> these stages (same shuffle-id, a new stage attempt).
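>>
>> Roughly, the relevant logic (condensed from DAGScheduler's fetch-failure
>> handling; the names match the Spark source, but the bodies are simplified
>> and the surrounding driver state is elided):
>>
>> -- snip --
>> // Walk each active job's lineage from its final stage; any chain that
>> // reaches a stage already marked for rollback is added in its entirety.
>> val stagesToRollback = HashSet[Stage](failedMapStage)
>> def collectStagesToRollback(stageChain: List[Stage]): Unit = {
>>   if (stagesToRollback.contains(stageChain.head)) {
>>     stageChain.drop(1).foreach(s => stagesToRollback += s)
>>   } else {
>>     stageChain.head.parents.foreach(s => collectStagesToRollback(s :: stageChain))
>>   }
>> }
>> activeJobs.foreach(job => collectStagesToRollback(job.finalStage :: Nil))
>>
>> stagesToRollback.foreach {
>>   case m: ShuffleMapStage =>
>>     // Safe to roll back: discard all of the stage's map output so that
>>     // every task reruns under the new stage attempt.
>>     mapOutputTracker.unregisterAllMapAndMergeOutput(m.shuffleDep.shuffleId)
>>   case r: ResultStage if r.activeJob.isDefined =>
>>     // Result tasks may have produced externally visible output already,
>>     // so the job is aborted instead of rolled back.
>>     abortStage(r, "...cannot rollback the ResultStage...", None)
>>   case _ =>
>> }
>> -- snip --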
>>
>>
>> Regards,
>> Mridul
>>
>>
>>
>> On Thu, Oct 19, 2023 at 10:08 PM Mridul Muralidharan <mri...@gmail.com>
>> wrote:
>>
>> >
>> > Good question, and ResultStage is actually special-cased in Spark as its
>> > output could have already been consumed (for example, collect() to the
>> > driver, etc.) - and so if it is one of the stages which needs to be
>> > rolled back, the job is aborted.
>> >
>> > To illustrate, see the following:
>> > -- snip --
>> >
>> > package org.apache.spark
>> >
>> > import scala.reflect.ClassTag
>> >
>> > import org.apache.spark._
>> > import org.apache.spark.rdd.{DeterministicLevel, RDD}
>> >
>> > class DelegatingRDD[E: ClassTag](delegate: RDD[E]) extends RDD[E](delegate) {
>> >
>> >   override def compute(split: Partition, context: TaskContext): Iterator[E] = {
>> >     delegate.compute(split, context)
>> >   }
>> >
>> >   override protected def getPartitions: Array[Partition] =
>> >     delegate.partitions
>> > }
>> >
>> > class IndeterminateRDD[E: ClassTag](delegate: RDD[E]) extends DelegatingRDD[E](delegate) {
>> >   override def getOutputDeterministicLevel: DeterministicLevel.Value =
>> >     DeterministicLevel.INDETERMINATE
>> > }
>> >
>> > class FailingRDD[E: ClassTag](delegate: RDD[E]) extends DelegatingRDD[E](delegate) {
>> >   override def compute(split: Partition, context: TaskContext): Iterator[E] = {
>> >     val tc = TaskContext.get
>> >     if (tc.stageAttemptNumber() == 0 && tc.partitionId() == 0 && tc.attemptNumber() == 0) {
>> >       // Wait for all tasks to be done, then call exit
>> >       Thread.sleep(5000)
>> >       System.exit(-1)
>> >     }
>> >     delegate.compute(split, context)
>> >   }
>> > }
>> >
>> > // Make sure test_output directory is deleted before running this.
>> > object Test {
>> >
>> >   def main(args: Array[String]): Unit = {
>> >     val conf = new SparkConf().setMaster("local-cluster[4,1,1024]")
>> >     val sc = new SparkContext(conf)
>> >
>> >     val mapperRdd = new IndeterminateRDD(sc.parallelize(0 until 10000, 20).map(v => (v, v)))
>> >     val resultRdd = new FailingRDD(mapperRdd.groupByKey())
>> >     resultRdd.saveAsTextFile("test_output")
>> >   }
>> > }
>> >
>> > -- snip --
>> >
>> >
>> >
>> > Here, the mapper stage has been forced to be INDETERMINATE.
>> > In the reducer stage, the first attempt to compute partition 0 will wait
>> > for a bit and then exit, killing its executor - since the master is a
>> > local-cluster, the shuffle output hosted by that executor is lost, which
>> > results in a FetchFailure when the second attempt of partition 0 tries to
>> > fetch shuffle data.
>> > When Spark tries to regenerate the parent shuffle output, it sees that
>> > the parent is INDETERMINATE - and so fails the entire job, with the message:
>> > "
>> > org.apache.spark.SparkException: Job aborted due to stage failure: A
>> > shuffle map stage with indeterminate output was failed and retried.
>> > However, Spark cannot rollback the ResultStage 1 to re-process the input
>> > data, and has to fail this job. Please eliminate the indeterminacy by
>> > checkpointing the RDD before repartition and try again.
>> > "
>> >
>> > This is coming from here
>> > <https://github.com/apache/spark/blob/28292d51e7dbe2f3488e82435abb48d3d31f6044/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2090>
>> > - when rolling back stages, if Spark determines that a ResultStage needs
>> > to be rolled back due to the loss of INDETERMINATE output, it will fail
>> > the job.
>> >
>> > Hope this clarifies.
>> > Regards,
>> > Mridul
>> >
>> >
>> > On Thu, Oct 19, 2023 at 10:04 AM Keyong Zhou <zho...@apache.org> wrote:
>> >
>> >> In fact, I'm wondering if Spark will rerun the whole reduce
>> >> ShuffleMapStage
>> >> if its upstream ShuffleMapStage is INDETERMINATE and is rerun.
>> >>
>> >> On Thu, Oct 19, 2023 at 11:00 PM Keyong Zhou <zho...@apache.org> wrote:
>> >>
>> >> > Thanks Erik for bringing up this question. I'm also curious about the
>> >> > answer; any feedback is appreciated.
>> >> >
>> >> > Thanks,
>> >> > Keyong Zhou
>> >> >
>> >> > On Thu, Oct 19, 2023 at 10:16 PM Erik fang <fme...@gmail.com> wrote:
>> >> >
>> >> >> Mridul,
>> >> >>
>> >> >> sure, I totally agree SPARK-25299 is a much better solution, as long
>> >> >> as we can get it from the Spark community
>> >> >> (btw, private[spark] on RDD.outputDeterministicLevel is no big deal;
>> >> >> Celeborn already has Spark-integration code within the [spark] scope)
>> >> >>
>> >> >> I also have a question about INDETERMINATE stage recompute, and may
>> >> >> need your help.
>> >> >> The rule for INDETERMINATE ShuffleMapStage rerun is reasonable;
>> >> >> however, I can't find the related logic for INDETERMINATE ResultStage
>> >> >> rerun in DAGScheduler.
>> >> >> If an INDETERMINATE ShuffleMapStage gets entirely recomputed, the
>> >> >> corresponding ResultStage should be entirely recomputed as well, per
>> >> >> my understanding.
>> >> >>
>> >> >> I found https://issues.apache.org/jira/browse/SPARK-25342, which
>> >> >> rolls back a ResultStage, but it was not merged.
>> >> >> Do you know of any context or a related ticket for INDETERMINATE
>> >> >> ResultStage rerun?
>> >> >>
>> >> >> Thanks in advance!
>> >> >>
>> >> >> Regards,
>> >> >> Erik
>> >> >>
>> >> >> > On Tue, Oct 17, 2023 at 4:23 AM Mridul Muralidharan <mri...@gmail.com> wrote:
>> >> >>
>> >> >> >
>> >> >> >
>> >> >> > On Mon, Oct 16, 2023 at 11:31 AM Erik fang <fme...@gmail.com> wrote:
>> >> >> >
>> >> >> >> Hi Mridul,
>> >> >> >>
>> >> >> >> For a),
>> >> >> >> DAGScheduler uses Stage.isIndeterminate() and RDD.isBarrier()
>> >> >> >> <https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1975>
>> >> >> >> to decide whether the whole stage needs to be recomputed.
>> >> >> >> I think we can pass the same information to Celeborn in
>> >> >> >> ShuffleManager.registerShuffle()
>> >> >> >> <https://github.com/apache/spark/blob/721ea9bbb2ff77b6d2f575fdca0aeda84990cc3b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala#L39>,
>> >> >> >> since ShuffleDependency contains the RDD object.
>> >> >> >> It seems Stage.isIndeterminate() is not accessible from
>> >> >> >> ShuffleDependency, but luckily rdd is used internally:
>> >> >> >>
>> >> >> >> def isIndeterminate: Boolean = {
>> >> >> >>   rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
>> >> >> >> }
>> >> >> >>
>> >> >> >> Relying on the internal implementation is not ideal, but doable.
>> >> >> >> I don't expect the Spark RDD/Stage implementation to change
>> >> >> >> frequently, and we can discuss an RDD isIndeterminate API with the
>> >> >> >> Spark community if they change it in the future.
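>> >> >> >>
>> >> >> >> As a rough illustration, the integration could capture this at
>> >> >> >> shuffle registration time (a hypothetical sketch - DeterminacyProbe
>> >> >> >> is made up; it assumes the code is compiled into an
>> >> >> >> org.apache.spark.* package so the private[spark] member is visible):
>> >> >> >>
>> >> >> >> -- snip --
>> >> >> >> package org.apache.spark.shuffle
>> >> >> >>
>> >> >> >> import org.apache.spark.ShuffleDependency
>> >> >> >> import org.apache.spark.rdd.DeterministicLevel
>> >> >> >>
>> >> >> >> object DeterminacyProbe {
>> >> >> >>   // Mirrors Stage.isIndeterminate: the shuffle is indeterminate iff
>> >> >> >>   // its map-side RDD is INDETERMINATE. outputDeterministicLevel is
>> >> >> >>   // private[spark], hence the org.apache.spark.shuffle package here.
>> >> >> >>   def isIndeterminate(dep: ShuffleDependency[_, _, _]): Boolean =
>> >> >> >>     dep.rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
>> >> >> >> }
>> >> >> >> -- snip --
>> >> >> >>
>> >> >> >> A registerShuffle() implementation could then forward this flag to
>> >> >> >> the LifecycleManager along with the shuffle id.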
>> >> >> >>
>> >> >> >
>> >> >> >
>> >> >> > Only RDD.getOutputDeterministicLevel is publicly exposed;
>> >> >> > RDD.outputDeterministicLevel is not, as it is private[spark].
>> >> >> > While I don't expect changes to this, it is inherently unstable to
>> >> >> > depend on it.
>> >> >> >
>> >> >> > Btw, please see the discussion with Sungwoo Park: if Celeborn is
>> >> >> > maintaining a reducer-oriented view, you will need to recompute all
>> >> >> > the mappers anyway - what you might save is the subset of reducer
>> >> >> > partitions which can be skipped if the stage is DETERMINATE.
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >>
>> >> >> >> for c),
>> >> >> >> I also considered a similar solution in Celeborn.
>> >> >> >> Celeborn (LifecycleManager) can get the full picture of the
>> >> >> >> remaining shuffle data from the previous stage attempt and reuse it
>> >> >> >> in stage recompute, and the whole process will be transparent to
>> >> >> >> Spark/DAGScheduler.
>> >> >> >>
>> >> >> >
>> >> >> > Celeborn does not have visibility into this - and this is
>> >> >> > potentially subject to invasive changes in Apache Spark as it
>> >> >> > evolves.
>> >> >> > For example, I recently merged a couple of changes which would make
>> >> >> > this different in master compared to previous versions.
>> >> >> > Until the remote shuffle service SPIP is implemented and these are
>> >> >> > abstracted out & made pluggable, it will continue to be quite
>> >> >> > volatile.
>> >> >> >
>> >> >> > Note that the behavior for 3.5 and older is known - since those
>> >> >> > Spark versions have been released - it is the behavior in master and
>> >> >> > future versions of Spark which is subject to change.
>> >> >> > So delivering on SPARK-25299 would future-proof all remote shuffle
>> >> >> > implementations.
>> >> >> >
>> >> >> >
>> >> >> > Regards,
>> >> >> > Mridul
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >>
>> >> >> >> From my perspective, leveraging partial stage recompute and the
>> >> >> >> remaining shuffle data requires a lot of work in Celeborn.
>> >> >> >> I prefer to implement a simple whole-stage recompute first, with the
>> >> >> >> interface defined with a recomputeAll = true flag (sketched below),
>> >> >> >> and explore partial stage recompute in a separate ticket as a future
>> >> >> >> optimization.
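>> >> >> >>
>> >> >> >> A rough sketch of that first-cut interface (purely hypothetical
>> >> >> >> naming - ShuffleRecomputeHandler and onShuffleLost do not exist in
>> >> >> >> Celeborn today):
>> >> >> >>
>> >> >> >> -- snip --
>> >> >> >> // Hypothetical Celeborn-side hook for stage recompute. The first
>> >> >> >> // cut always passes recomputeAll = true (rerun the whole mapper
>> >> >> >> // stage); partial recompute could later reuse the same entry point.
>> >> >> >> trait ShuffleRecomputeHandler {
>> >> >> >>   // Called when shuffle data for shuffleId is lost or unreadable.
>> >> >> >>   def onShuffleLost(shuffleId: Int, recomputeAll: Boolean = true): Unit
>> >> >> >> }
>> >> >> >> -- snip --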
>> >> >> >> What do you think?
>> >> >> >>
>> >> >> >> Regards,
>> >> >> >> Erik
>> >> >> >>
>> >> >> >>
>> >> >> >> On Sat, Oct 14, 2023 at 4:50 PM Mridul Muralidharan <mri...@gmail.com> wrote:
>> >> >> >>
>> >> >> >>>
>> >> >> >>>
>> >> >> >>> On Sat, Oct 14, 2023 at 3:49 AM Mridul Muralidharan <mri...@gmail.com> wrote:
>> >> >> >>>
>> >> >> >>>>
>> >> >> >>>> A reducer-oriented view of shuffle, especially without
>> >> >> >>>> replication, could indeed be susceptible to this issue you
>> >> >> >>>> described (a single fetch failure would require all mappers to be
>> >> >> >>>> recomputed) - note, not necessarily all reducers need to be
>> >> >> >>>> recomputed though.
>> >> >> >>>>
>> >> >> >>>> Note that I have not looked much into Celeborn specifically on
>> >> >> >>>> this aspect yet, so my comments are *fairly* centered on Spark
>> >> >> >>>> internals :-)
>> >> >> >>>>
>> >> >> >>>> Regards,
>> >> >> >>>> Mridul
>> >> >> >>>>
>> >> >> >>>>
>> >> >> >>>> On Sat, Oct 14, 2023 at 3:36 AM Sungwoo Park <glap...@gmail.com> wrote:
>> >> >> >>>>
>> >> >> >>>>> Hello,
>> >> >> >>>>>
>> >> >> >>>>> (Sorry for sending the same message again.)
>> >> >> >>>>>
>> >> >> >>>>> From my understanding, the current implementation of Celeborn
>> >> >> >>>>> makes it hard to find out which mapper should be re-executed
>> >> >> >>>>> when a partition cannot be read, so we should re-execute all the
>> >> >> >>>>> mappers in the upstream stage. If we can find out which
>> >> >> >>>>> mapper/partition should be re-executed, the current logic of
>> >> >> >>>>> stage recomputation could be (partially or totally) reused.
>> >> >> >>>>>
>> >> >> >>>>> Regards,
>> >> >> >>>>>
>> >> >> >>>>> --- Sungwoo
>> >> >> >>>>>
>> >> >> >>>>> On Sat, Oct 14, 2023 at 5:24 PM Mridul Muralidharan <mri...@gmail.com> wrote:
>> >> >> >>>>>
>> >> >> >>>>>>
>> >> >> >>>>>> Hi,
>> >> >> >>>>>>
>> >> >> >>>>>>   Spark will try to minimize the recomputation cost as much as
>> >> >> >>>>>> possible.
>> >> >> >>>>>> For example, if the parent stage was DETERMINATE, it simply
>> >> >> >>>>>> needs to recompute the missing (mapper) partitions (which
>> >> >> >>>>>> resulted in the fetch failure). Note, this by itself could
>> >> >> >>>>>> require further recomputation in the DAG if the inputs required
>> >> >> >>>>>> to compute the parent partitions are missing, and so on - so it
>> >> >> >>>>>> is dynamic.
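>> >> >> >>>>>>
>> >> >> >>>>>> Conceptually (heavily simplified from DAGScheduler#submitStage;
>> >> >> >>>>>> the method names match the Spark source, jobId bookkeeping and
>> >> >> >>>>>> other driver state elided):
>> >> >> >>>>>>
>> >> >> >>>>>> -- snip --
>> >> >> >>>>>> // Parents missing output are (re)submitted first; the stage
>> >> >> >>>>>> // itself then recomputes only its missing partitions.
>> >> >> >>>>>> private def submitStage(stage: Stage): Unit = {
>> >> >> >>>>>>   val missing = getMissingParentStages(stage).sortBy(_.id)
>> >> >> >>>>>>   if (missing.isEmpty) {
>> >> >> >>>>>>     submitMissingTasks(stage, jobId) // only findMissingPartitions()
>> >> >> >>>>>>   } else {
>> >> >> >>>>>>     missing.foreach(submitStage)     // recurse up the DAG first
>> >> >> >>>>>>     waitingStages += stage           // retried once parents finish
>> >> >> >>>>>>   }
>> >> >> >>>>>> }
>> >> >> >>>>>> -- snip --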
>> >> >> >>>>>>
>> >> >> >>>>>> Regards,
>> >> >> >>>>>> Mridul
>> >> >> >>>>>>
>> >> >> >>>>>> On Sat, Oct 14, 2023 at 2:30 AM Sungwoo Park <o...@pl.postech.ac.kr> wrote:
>> >> >> >>>>>>
>> >> >> >>>>>>> > a) If one or more tasks for a stage (and so its shuffle id)
>> >> >> >>>>>>> > are going to be recomputed, if it is an INDETERMINATE stage,
>> >> >> >>>>>>> > all shuffle output will be discarded and it will be entirely
>> >> >> >>>>>>> > recomputed (see here
>> >> >> >>>>>>> > <https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1477>
>> >> >> >>>>>>> > ).
>> >> >> >>>>>>>
>> >> >> >>>>>>> If a reducer (in a downstream stage) fails to read data, can
>> >> >> >>>>>>> we find out which tasks should recompute their output? From
>> >> >> >>>>>>> the previous discussion, I thought this was hard (in the
>> >> >> >>>>>>> current implementation), and we should re-execute all tasks in
>> >> >> >>>>>>> the upstream stage.
>> >> >> >>>>>>>
>> >> >> >>>>>>> Thanks,
>> >> >> >>>>>>>
>> >> >> >>>>>>> --- Sungwoo
>> >> >> >>>>>>>
>> >> >> >>>>>>
>> >> >>
>> >> >
>> >>
>> >
>>
>
