I checked RDD#getOutputDeterministicLevel and found that if an RDD's upstream is INDETERMINATE, then the RDD itself is also INDETERMINATE.

Thanks,
Keyong Zhou
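(For reference, a simplified sketch of the propagation rule being described here. This paraphrases the idea behind RDD#getOutputDeterministicLevel rather than reproducing the exact Spark source - the real method also special-cases, e.g., shuffles with a defined key ordering and aggregator:)

-- snip --

import org.apache.spark.rdd.DeterministicLevel

object DeterminismPropagation {
  // Derive an RDD's output level from its parents: an INDETERMINATE parent
  // always makes the child INDETERMINATE, regardless of anything else.
  def outputLevel(
      parentLevels: Seq[DeterministicLevel.Value],
      isShuffle: Boolean): DeterministicLevel.Value = {
    if (parentLevels.contains(DeterministicLevel.INDETERMINATE)) {
      DeterministicLevel.INDETERMINATE
    } else if (isShuffle) {
      // Reducers fetch shuffle blocks in a random order, so even a
      // DETERMINATE parent yields UNORDERED output after a shuffle.
      DeterministicLevel.UNORDERED
    } else if (parentLevels.isEmpty) {
      DeterministicLevel.DETERMINATE // root RDDs default to determinate
    } else {
      parentLevels.maxBy(_.id) // otherwise inherit the worst parent level
    }
  }
}

-- snip --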
On Fri, Nov 3, 2023 at 19:57, Keyong Zhou <zho...@apache.org> wrote:

> Hi Mridul,
>
> I still have a question. DAGScheduler#submitMissingTasks will only
> unregisterAllMapAndMergeOutput if the current ShuffleMapStage is
> Indeterminate. What if the current stage is determinate, but its upstream
> stage is Indeterminate and that upstream stage is rerun?
>
> Thanks,
> Keyong Zhou
>
> On Fri, Oct 20, 2023 at 11:15, Mridul Muralidharan <mri...@gmail.com> wrote:
>
>> To add to my response - what I described (w.r.t. failing the job) applies
>> only to ResultStage.
>> Spark walks the lineage DAG to identify all indeterminate parents to
>> roll back.
>> If there are only ShuffleMapStages in the set of stages to roll back, it
>> will simply discard their output, roll back all of them, and then retry
>> these stages (same shuffle id, a new stage attempt).
>>
>> Regards,
>> Mridul
>>
>> On Thu, Oct 19, 2023 at 10:08 PM Mridul Muralidharan <mri...@gmail.com>
>> wrote:
>>
>> > Good question, and ResultStage is actually special-cased in Spark, as
>> > its output could have already been consumed (for example, collect() to
>> > the driver, etc.) - and so if it is one of the stages which needs to be
>> > rolled back, the job is aborted.
>> >
>> > To illustrate, see the following:
>> > -- snip --
>> >
>> > package org.apache.spark
>> >
>> > import scala.reflect.ClassTag
>> >
>> > import org.apache.spark._
>> > import org.apache.spark.rdd.{DeterministicLevel, RDD}
>> >
>> > // Wraps another RDD, delegating computation and partitioning to it.
>> > class DelegatingRDD[E: ClassTag](delegate: RDD[E]) extends RDD[E](delegate) {
>> >
>> >   override def compute(split: Partition, context: TaskContext): Iterator[E] = {
>> >     delegate.compute(split, context)
>> >   }
>> >
>> >   override protected def getPartitions: Array[Partition] =
>> >     delegate.partitions
>> > }
>> >
>> > // Forces the wrapped RDD's output to be reported as INDETERMINATE.
>> > class IndeterminateRDD[E: ClassTag](delegate: RDD[E]) extends DelegatingRDD[E](delegate) {
>> >   override def getOutputDeterministicLevel: DeterministicLevel.Value =
>> >     DeterministicLevel.INDETERMINATE
>> > }
>> >
>> > // Exits the executor JVM on the first attempt of partition 0, so the
>> > // second attempt hits a FetchFailure when fetching shuffle data.
>> > class FailingRDD[E: ClassTag](delegate: RDD[E]) extends DelegatingRDD[E](delegate) {
>> >   override def compute(split: Partition, context: TaskContext): Iterator[E] = {
>> >     val tc = TaskContext.get
>> >     if (tc.stageAttemptNumber() == 0 && tc.partitionId() == 0 && tc.attemptNumber() == 0) {
>> >       // Wait for all tasks to be done, then call exit
>> >       Thread.sleep(5000)
>> >       System.exit(-1)
>> >     }
>> >     delegate.compute(split, context)
>> >   }
>> > }
>> >
>> > // Make sure the test_output directory is deleted before running this.
>> > object Test {
>> >
>> >   def main(args: Array[String]): Unit = {
>> >     val conf = new SparkConf().setMaster("local-cluster[4,1,1024]")
>> >     val sc = new SparkContext(conf)
>> >
>> >     val mapperRdd = new IndeterminateRDD(sc.parallelize(0 until 10000, 20).map(v => (v, v)))
>> >     val resultRdd = new FailingRDD(mapperRdd.groupByKey())
>> >     resultRdd.saveAsTextFile("test_output")
>> >   }
>> > }
>> >
>> > -- snip --
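(The rollback decision Mridul describes above can be modeled in miniature as below. This is an illustrative sketch only - the Stage/ShuffleMapStage/ResultStage names mirror Spark's concepts, not DAGScheduler's actual code, which tracks stages through the shuffle dependency graph:)

-- snip --

object RollbackSketch {
  sealed trait Stage { def id: Int; def children: Seq[Stage] }
  final case class ShuffleMapStage(id: Int, children: Seq[Stage]) extends Stage
  final case class ResultStage(id: Int, children: Seq[Stage] = Nil) extends Stage

  // An INDETERMINATE stage that is re-run produces different output, so every
  // stage that consumed its output must be rolled back along with it.
  def collectRollbacks(stage: Stage): Set[Stage] =
    stage.children.foldLeft(Set(stage))((acc, c) => acc ++ collectRollbacks(c))

  def handleIndeterminateRerun(failed: Stage): Unit = {
    val rollback = collectRollbacks(failed)
    if (rollback.exists(_.isInstanceOf[ResultStage])) {
      // A ResultStage's output may already be consumed (e.g. collect()),
      // so it cannot be transparently re-run: the job is aborted.
      println("abort job: cannot roll back a ResultStage")
    } else {
      // Only ShuffleMapStages: discard their map output and retry them
      // (same shuffle id, a new stage attempt).
      rollback.foreach(s => println(s"discard output of stage ${s.id}, retry"))
    }
  }

  def main(args: Array[String]): Unit = {
    val result  = ResultStage(2)
    val mappers = ShuffleMapStage(1, Seq(result))
    handleIndeterminateRerun(mappers) // prints: abort job: ...
  }
}

-- snip --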
>> > Here, the mapper stage has been forced to be INDETERMINATE.
>> > In the reducer stage, the first attempt to compute partition 0 will
>> > wait for a bit and then exit - since the master is a local-cluster,
>> > this results in a FetchFailure when the second attempt of partition 0
>> > tries to fetch shuffle data.
>> > When Spark tries to regenerate the parent shuffle output, it sees that
>> > the parent is INDETERMINATE - and so fails the entire job, with the
>> > message:
>> > "
>> > org.apache.spark.SparkException: Job aborted due to stage failure: A
>> > shuffle map stage with indeterminate output was failed and retried.
>> > However, Spark cannot rollback the ResultStage 1 to re-process the
>> > input data, and has to fail this job. Please eliminate the
>> > indeterminacy by checkpointing the RDD before repartition and try again.
>> > "
>> >
>> > This is coming from here
>> > <https://github.com/apache/spark/blob/28292d51e7dbe2f3488e82435abb48d3d31f6044/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2090>
>> > - when rolling back stages, if Spark determines that a ResultStage
>> > needs to be rolled back due to loss of INDETERMINATE output, it will
>> > fail the job.
>> >
>> > Hope this clarifies.
>> >
>> > Regards,
>> > Mridul
>> >
>> > On Thu, Oct 19, 2023 at 10:04 AM Keyong Zhou <zho...@apache.org> wrote:
>> >
>> >> In fact, I'm wondering whether Spark will rerun the whole reduce
>> >> ShuffleMapStage if its upstream ShuffleMapStage is INDETERMINATE and
>> >> is rerun.
>> >>
>> >> On Thu, Oct 19, 2023 at 23:00, Keyong Zhou <zho...@apache.org> wrote:
>> >>
>> >> > Thanks Erik for bringing up this question. I'm also curious about
>> >> > the answer; any feedback is appreciated.
>> >> >
>> >> > Thanks,
>> >> > Keyong Zhou
>> >> >
>> >> > On Thu, Oct 19, 2023 at 22:16, Erik fang <fme...@gmail.com> wrote:
>> >> >
>> >> >> Mridul,
>> >> >>
>> >> >> Sure, I totally agree that SPARK-25299 is a much better solution,
>> >> >> as long as we can get it from the Spark community.
>> >> >> (btw, the private[spark] scope of RDD.outputDeterministicLevel is
>> >> >> no big deal - Celeborn already has Spark integration code with
>> >> >> [spark] scope)
>> >> >>
>> >> >> I also have a question about INDETERMINATE stage recompute, and
>> >> >> may need your help.
>> >> >> The rule for INDETERMINATE ShuffleMapStage rerun is reasonable;
>> >> >> however, I don't find related logic for INDETERMINATE ResultStage
>> >> >> rerun in DAGScheduler.
>> >> >> If an INDETERMINATE ShuffleMapStage gets entirely recomputed, the
>> >> >> corresponding ResultStage should be entirely recomputed as well,
>> >> >> per my understanding.
>> >> >>
>> >> >> I found https://issues.apache.org/jira/browse/SPARK-25342 to roll
>> >> >> back a ResultStage, but it was not merged.
>> >> >> Do you know any context or related ticket for INDETERMINATE
>> >> >> ResultStage rerun?
>> >> >>
>> >> >> Thanks in advance!
>> >> >>
>> >> >> Regards,
>> >> >> Erik
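(As an aside, the workaround suggested by the exception message quoted above looks roughly like the following - a minimal sketch, assuming a running SparkContext `sc`; the paths are hypothetical:)

-- snip --

// Checkpointing materializes the RDD to stable storage and truncates its
// lineage, so a later fetch failure replays from the checkpoint instead of
// re-running the indeterminate computation with different output.
sc.setCheckpointDir("/tmp/spark-checkpoints") // hypothetical path
val data = sc.parallelize(0 until 10000, 20).map(v => (v % 100, v))
data.checkpoint() // must be called before the first action on `data`
val grouped = data.groupByKey() // this shuffle can now be retried safely
grouped.saveAsTextFile("test_output2") // hypothetical output path

-- snip --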
>> >> >> On Tue, Oct 17, 2023 at 4:23 AM Mridul Muralidharan <mri...@gmail.com>
>> >> >> wrote:
>> >> >>
>> >> >> > On Mon, Oct 16, 2023 at 11:31 AM Erik fang <fme...@gmail.com> wrote:
>> >> >> >
>> >> >> >> Hi Mridul,
>> >> >> >>
>> >> >> >> For a),
>> >> >> >> DAGScheduler uses Stage.isIndeterminate() and RDD.isBarrier()
>> >> >> >> <https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1975>
>> >> >> >> to decide whether the whole stage needs to be recomputed.
>> >> >> >> I think we can pass the same information to Celeborn in
>> >> >> >> ShuffleManager.registerShuffle()
>> >> >> >> <https://github.com/apache/spark/blob/721ea9bbb2ff77b6d2f575fdca0aeda84990cc3b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala#L39>,
>> >> >> >> since the ShuffleDependency contains the RDD object.
>> >> >> >> Stage.isIndeterminate() is not accessible from a ShuffleDependency,
>> >> >> >> but luckily it only uses the RDD internally:
>> >> >> >>
>> >> >> >> def isIndeterminate: Boolean = {
>> >> >> >>   rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
>> >> >> >> }
>> >> >> >>
>> >> >> >> Relying on the internal implementation is not good, but doable.
>> >> >> >> I don't expect the Spark RDD/Stage implementation to change
>> >> >> >> frequently, and we can discuss an RDD isIndeterminate API with the
>> >> >> >> Spark community if they change it in the future.
>> >> >> >
>> >> >> > Only RDD.getOutputDeterministicLevel is publicly exposed;
>> >> >> > RDD.outputDeterministicLevel is not - it is private[spark].
>> >> >> > While I don't expect changes to this, it is inherently unstable to
>> >> >> > depend on it.
>> >> >> >
>> >> >> > Btw, please see the discussion with Sungwoo Park: if Celeborn is
>> >> >> > maintaining a reducer-oriented view, you will need to recompute all
>> >> >> > the mappers anyway - what you might save is the subset of reducer
>> >> >> > partitions which can be skipped if the stage is DETERMINATE.
>> >> >> >
>> >> >> >> For c),
>> >> >> >> I also considered a similar solution in Celeborn.
>> >> >> >> Celeborn (LifecycleManager) can get the full picture of the
>> >> >> >> remaining shuffle data from the previous stage attempt and reuse
>> >> >> >> it in the stage recompute, and the whole process will be
>> >> >> >> transparent to Spark/DAGScheduler.
>> >> >> >
>> >> >> > Celeborn does not have visibility into this - and it is potentially
>> >> >> > subject to invasive changes in Apache Spark as it evolves.
>> >> >> > For example, I recently merged a couple of changes which would make
>> >> >> > this different in master compared to previous versions.
>> >> >> > Until the remote shuffle service SPIP is implemented and these
>> >> >> > internals are abstracted out and made pluggable, they will continue
>> >> >> > to be quite volatile.
>> >> >> >
>> >> >> > Note that the behavior for 3.5 and older is known - since those
>> >> >> > Spark versions have been released - it is the behavior in master
>> >> >> > and future versions of Spark which is subject to change.
>> >> >> > So delivering on SPARK-25299 would future-proof all remote shuffle
>> >> >> > implementations.
>> >> >> >
>> >> >> > Regards,
>> >> >> > Mridul
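(A hypothetical sketch of the approach Erik outlines for a): reading the determinism signal off the RDD carried by the ShuffleDependency at registerShuffle() time. Since outputDeterministicLevel is private[spark], this assumes the integration code lives under an org.apache.spark package - which, as Erik notes, Celeborn's Spark integration already does. The package and object names here are made up:)

-- snip --

package org.apache.spark.shuffle.celeborn // hypothetical package name

import org.apache.spark.ShuffleDependency
import org.apache.spark.rdd.DeterministicLevel

object DeterminismProbe {
  // Mirrors Stage.isIndeterminate, but computed from the dependency alone,
  // which is all a ShuffleManager sees at shuffle registration time.
  def isIndeterminate(dep: ShuffleDependency[_, _, _]): Boolean =
    dep.rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
}

-- snip --

A registerShuffle() implementation could then forward this flag to the LifecycleManager, so Celeborn knows up front whether a shuffle's output must be discarded wholesale if the stage is recomputed.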
>> >> >> >
>> >> >> >> From my perspective, leveraging partial stage recompute and the
>> >> >> >> remaining shuffle data needs a lot of work in Celeborn.
>> >> >> >> I prefer to implement a simple whole-stage recompute first, with
>> >> >> >> the interface defined with a recomputeAll = true flag, and explore
>> >> >> >> partial stage recompute in a separate ticket as a future
>> >> >> >> optimization.
>> >> >> >> What do you think?
>> >> >> >>
>> >> >> >> Regards,
>> >> >> >> Erik
>> >> >> >>
>> >> >> >> On Sat, Oct 14, 2023 at 4:50 PM Mridul Muralidharan <mri...@gmail.com>
>> >> >> >> wrote:
>> >> >> >>
>> >> >> >>> On Sat, Oct 14, 2023 at 3:49 AM Mridul Muralidharan <mri...@gmail.com>
>> >> >> >>> wrote:
>> >> >> >>>
>> >> >> >>>> A reducer-oriented view of shuffle, especially without
>> >> >> >>>> replication, could indeed be susceptible to the issue you
>> >> >> >>>> described (a single fetch failure would require all mappers to
>> >> >> >>>> be recomputed) - note, though, not necessarily all reducers.
>> >> >> >>>>
>> >> >> >>>> Note that I have not looked much into Celeborn specifically on
>> >> >> >>>> this aspect yet, so my comments are *fairly* centric to Spark
>> >> >> >>>> internals :-)
>> >> >> >>>>
>> >> >> >>>> Regards,
>> >> >> >>>> Mridul
>> >> >> >>>>
>> >> >> >>>> On Sat, Oct 14, 2023 at 3:36 AM Sungwoo Park <glap...@gmail.com>
>> >> >> >>>> wrote:
>> >> >> >>>>
>> >> >> >>>>> Hello,
>> >> >> >>>>>
>> >> >> >>>>> (Sorry for sending the same message again.)
>> >> >> >>>>>
>> >> >> >>>>> From my understanding, the current implementation of Celeborn
>> >> >> >>>>> makes it hard to find out which mapper should be re-executed
>> >> >> >>>>> when a partition cannot be read, and we should re-execute all
>> >> >> >>>>> the mappers in the upstream stage. If we can find out which
>> >> >> >>>>> mapper/partition should be re-executed, the current logic of
>> >> >> >>>>> stage recomputation could be (partially or totally) reused.
>> >> >> >>>>>
>> >> >> >>>>> Regards,
>> >> >> >>>>>
>> >> >> >>>>> --- Sungwoo
>> >> >> >>>>>
>> >> >> >>>>> On Sat, Oct 14, 2023 at 5:24 PM Mridul Muralidharan <mri...@gmail.com>
>> >> >> >>>>> wrote:
>> >> >> >>>>>
>> >> >> >>>>>> Hi,
>> >> >> >>>>>>
>> >> >> >>>>>> Spark will try to minimize the recomputation cost as much as
>> >> >> >>>>>> possible.
>> >> >> >>>>>> For example, if the parent stage was DETERMINATE, it simply
>> >> >> >>>>>> needs to recompute the missing (mapper) partitions (which
>> >> >> >>>>>> resulted in the fetch failure). Note that this by itself could
>> >> >> >>>>>> require further recomputation in the DAG if the inputs
>> >> >> >>>>>> required to compute the parent partitions are missing, and so
>> >> >> >>>>>> on - so it is dynamic.
>> >> >> >>>>>>
>> >> >> >>>>>> Regards,
>> >> >> >>>>>> Mridul
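(An illustrative model of the dynamic recomputation Mridul describes - not Spark code; stages and partitions are reduced to plain values:)

-- snip --

object RecomputeSketch {
  // A stage knows its parents and which of its map outputs are missing.
  final case class MapStage(id: Int, parents: Seq[MapStage], missing: Set[Int])

  // For a DETERMINATE parent, only the lost partitions are re-run; the walk
  // recurses upward first, in case their own inputs are missing too.
  def recompute(stage: MapStage): Unit = {
    stage.parents.filter(_.missing.nonEmpty).foreach(recompute)
    stage.missing.foreach(p => println(s"re-run task $p of stage ${stage.id}"))
  }

  def main(args: Array[String]): Unit = {
    val grandParent = MapStage(0, Nil, missing = Set(3))
    val parent      = MapStage(1, Seq(grandParent), missing = Set(0, 7))
    recompute(parent) // re-runs task 3 of stage 0, then tasks 0 and 7 of stage 1
  }
}

-- snip --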
>> >> >> >>>>>>
>> >> >> >>>>>> On Sat, Oct 14, 2023 at 2:30 AM Sungwoo Park <o...@pl.postech.ac.kr>
>> >> >> >>>>>> wrote:
>> >> >> >>>>>>
>> >> >> >>>>>>> > a) If one or more tasks for a stage (and so its shuffle id)
>> >> >> >>>>>>> > is going to be recomputed, if it is an INDETERMINATE stage,
>> >> >> >>>>>>> > all shuffle output will be discarded and it will be
>> >> >> >>>>>>> > entirely recomputed (see here
>> >> >> >>>>>>> > <https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1477>).
>> >> >> >>>>>>>
>> >> >> >>>>>>> If a reducer (in a downstream stage) fails to read data, can
>> >> >> >>>>>>> we find out which tasks should recompute their output? From
>> >> >> >>>>>>> the previous discussion, I thought this was hard (in the
>> >> >> >>>>>>> current implementation), and we should re-execute all tasks
>> >> >> >>>>>>> in the upstream stage.
>> >> >> >>>>>>>
>> >> >> >>>>>>> Thanks,
>> >> >> >>>>>>>
>> >> >> >>>>>>> --- Sungwoo
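(To illustrate the trade-off discussed in this sub-thread - a toy contrast between the two bookkeeping schemes, not Celeborn's or Spark's actual data layout:)

-- snip --

object ShuffleViewSketch {
  def main(args: Array[String]): Unit = {
    // Mapper-oriented (vanilla Spark): one block per (mapId, reduceId), so a
    // lost block identifies exactly which map task must be re-run.
    val mapperOriented = Map((0, 0) -> "shuffle_0_0", (1, 0) -> "shuffle_1_0")

    // Reducer-oriented (as discussed for Celeborn): all mappers' data for a
    // reduce partition is merged into one file. Losing it says nothing about
    // which mapper produced which bytes, so all mappers must be re-run.
    val reducerOriented = Map(0 -> "partition_0_merged")

    val lostReduceId = 0
    val toRerun = mapperOriented.keys.collect {
      case (mapId, reduceId) if reduceId == lostReduceId => mapId
    }
    println(s"mapper-oriented: re-run map tasks ${toRerun.mkString(", ")}")
    println(s"reducer-oriented: lost ${reducerOriented(lostReduceId)} -> re-run ALL mappers")
  }
}

-- snip --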