Hi Mridul,

For a),
DagScheduler uses Stage.isIndeterminate() and RDD.isBarrier()
<https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1975>
to decide whether the whole stage needs to be recomputed
I think we can pass the same information to Celeborn in
ShuffleManager.registerShuffle()
<https://github.com/apache/spark/blob/721ea9bbb2ff77b6d2f575fdca0aeda84990cc3b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala#L39>,
since
RDD in ShuffleDependency contains the RDD object
It seems Stage.isIndeterminate() is unreadable from ShuffleDependency, but
luckily rdd is used internally

def isIndeterminate: Boolean = {
  rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
}

Relies on internal implementation is not good, but doable.
I don't expect spark RDD/Stage implementation changes frequently, and we
can discuss with Spark community for a RDD isIndeterminate API if they
change it in the future

for c)
I also considered a similar solution in celeborn
Celeborn (LifecycleManager) can get the full picture of remaining shuffle
data from previous stage attempt and reuse it in stage recompute
, and the whole process will be transparent to Spark/DagScheduler

Per my perspective, leveraging partial stage recompute and
remaining shuffle data needs a lot of work to do in Celeborn
I prefer to implement a simple whole stage recompute first with interface
defined with recomputeAll = true flag, and explore partial stage recompute
in seperate ticket as future optimization
How do you think about it?

Regards,
Erik


On Sat, Oct 14, 2023 at 4:50 PM Mridul Muralidharan <mri...@gmail.com>
wrote:

>
>
> On Sat, Oct 14, 2023 at 3:49 AM Mridul Muralidharan <mri...@gmail.com>
> wrote:
>
>>
>> A reducer oriented view of shuffle, especially without replication, could
>> indeed be susceptible to this issue you described (a single fetch failure
>> would require all mappers to need to be recomputed) - note, not necessarily
>> all reducers to be recomputed though.
>>
>> Note that I have not looked much into Celeborn specifically on this
>> aspect yet, so my comments are *fairly* centric to Spark internals :-)
>>
>> Regards,
>> Mridul
>>
>>
>> On Sat, Oct 14, 2023 at 3:36 AM Sungwoo Park <glap...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> (Sorry for sending the same message again.)
>>>
>>> From my understanding, the current implementation of Celeborn makes it
>>> hard to find out which mapper should be re-executed when a partition cannot
>>> be read, and we should re-execute all the mappers in the upstream stage. If
>>> we can find out which mapper/partition should be re-executed, the current
>>> logic of stage recomputation could be (partially or totally) reused.
>>>
>>> Regards,
>>>
>>> --- Sungwoo
>>>
>>> On Sat, Oct 14, 2023 at 5:24 PM Mridul Muralidharan <mri...@gmail.com>
>>> wrote:
>>>
>>>>
>>>> Hi,
>>>>
>>>>   Spark will try to minimize the recomputation cost as much as possible.
>>>> For example, if parent stage was DETERMINATE, it simply needs to
>>>> recompute the missing (mapper) partitions (which resulted in fetch
>>>> failure). Note, this by itself could require further recomputation in the
>>>> DAG if the inputs required to comput the parent partitions are missing, and
>>>> so on - so it is dynamic.
>>>>
>>>> Regards,
>>>> Mridul
>>>>
>>>> On Sat, Oct 14, 2023 at 2:30 AM Sungwoo Park <o...@pl.postech.ac.kr>
>>>> wrote:
>>>>
>>>>> > a) If one or more tasks for a stage (and so its shuffle id) is going
>>>>> to be
>>>>> > recomputed, if it is an INDETERMINATE stage, all shuffle output will
>>>>> be
>>>>> > discarded and it will be entirely recomputed (see here
>>>>> > <
>>>>> https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1477
>>>>> >
>>>>> > ).
>>>>>
>>>>> If a reducer (in a downstream stage) fails to read data, can we find
>>>>> out
>>>>> which tasks should recompute their output? From the previous
>>>>> discussion, I
>>>>> thought this was hard (in the current implementation), and we should
>>>>> re-execute all tasks in the upstream stage.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> --- Sungwoo
>>>>>
>>>>

Reply via email to