Yeah, retaining the map output can reduce the number of tasks that need to
be recomputed for DETERMINATE stages when an output file is lost. This is
an important design tradeoff.
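
To make that concrete, here is a tiny, purely illustrative Scala sketch
(the placement map and names are hypothetical, not Celeborn's or Spark's
actual bookkeeping): if per-map output is retained, losing one worker only
invalidates the map tasks whose output lived on that worker, rather than
every map task feeding the shuffle.

  // Illustrative only: models why retained per-map output shrinks the
  // recompute set for a DETERMINATE stage after a worker is lost.
  object RecomputeSketch {
    // Hypothetical mapping: map task id -> worker holding its retained output.
    type Placement = Map[Int, String]

    def mapsToRecompute(placement: Placement, lostWorker: String): Set[Int] =
      placement.collect { case (mapId, worker) if worker == lostWorker => mapId }.toSet

    def main(args: Array[String]): Unit = {
      val placement: Placement =
        Map(0 -> "worker-a", 1 -> "worker-b", 2 -> "worker-a", 3 -> "worker-c")
      // Losing worker-b only invalidates map task 1 when map output is retained.
      println(mapsToRecompute(placement, "worker-b")) // Set(1)
    }
  }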

Currently Celeborn also supports MapPartition for Flink Batch. In that case
partition data is not aggregated; instead, each mapper's output is stored in
a single file (or multiple files if a split happens), very similar to how
ESS stores shuffle data. Combining MapPartition with ReducePartition (which
aggregates partition data) in Celeborn, the same way magnet does, may be an
interesting idea.
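
To illustrate the contrast, here is a toy Scala sketch of the two layouts
(just an illustration of the idea, not Celeborn's real on-disk format or
API): MapPartition groups records by the mapper that produced them, while
ReducePartition groups them by the reduce partition they target.

  // Illustrative only: a toy model of the two layouts discussed above.
  object LayoutSketch {
    final case class Record(mapId: Int, reduceId: Int, bytes: Array[Byte])

    // MapPartition-style: one group (file) per mapper, like ESS.
    def mapPartitionLayout(records: Seq[Record]): Map[Int, Seq[Record]] =
      records.groupBy(_.mapId)

    // ReducePartition-style: all mappers' data for a reduce partition is
    // aggregated into one group (file), so each reducer reads a single file.
    def reducePartitionLayout(records: Seq[Record]): Map[Int, Seq[Record]] =
      records.groupBy(_.reduceId)
  }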

Thanks,
Keyong Zhou

Mridul Muralidharan <mri...@gmail.com> wrote on Tue, Oct 17, 2023 at 00:01:

> With push-based shuffle in Apache Spark (magnet), we preserve both the
> map output and the reducer-oriented merged output, with the
> reducer-oriented view chosen by default for reads and a fallback to the
> mapper output when the reducer output is missing or fails. That mitigates
> this specific issue for DETERMINATE stages, and only the subset that
> needs recomputation is regenerated.
> With magnet, only smaller blocks are pushed for merged data, so the
> effective replication is much lower.
>
> In our Celeborn deployment, which we are still testing, we will enable
> replication for functional and operational reasons, and we will probably
> move replication out of the write path to speed it up further.
>
>
> Regards,
> Mridul
>
> On Mon, Oct 16, 2023 at 9:08 AM Keyong Zhou <zho...@apache.org> wrote:
>
> > Hi Sungwoo,
> >
> > What you are pointing out is correct. Currently shuffle data is
> > distributed across `celeborn.master.slot.assign.maxWorkers` workers,
> > which defaults to 10000, so I believe the cascading stage rerun will
> > definitely happen.
> >
> > I think setting `celeborn.master.slot.assign.maxWorkers` to a smaller
> > value can help, especially in relatively large clusters. Turning on
> > replication can also help, but it conflicts with the reason we do stage
> > rerun in the first place (i.e. we don't want to turn on replication
> > because of its resource consumption).
> >
> > We hadn't thought about this before, thanks for pointing it out!
> >
> > Thanks,
> > Keyong Zhou
> >
> > Sungwoo Park <o...@pl.postech.ac.kr> wrote on Fri, Oct 13, 2023 at 02:22:
> >
> > > I have a question on how Celeborn distributes shuffle data among
> > > Celeborn workers.
> > >
> > > From our observation, it seems that whenever a Celeborn worker fails
> > > or gets killed (in a small cluster of fewer than 25 nodes), almost
> > > every edge is affected. Does this mean that an edge with multiple
> > > partitions usually distributes its shuffle data among all Celeborn
> > > workers?
> > >
> > > If this is the case, I think stage recomputation is unnecessary and
> > > just re-executing the entire DAG is a better approach. Our current
> > > implementation uses the following scheme for stage recomputation:
> > >
> > > 1. If a read failure occurs for shuffleId #1 for an edge, we pick a new
> > > shuffleId #2 for the same edge.
> > > 2. The upstream stage re-executes all tasks, but writes the output to
> > > shuffleId #2.
> > > 3. Tasks in the downstream stage retry by reading from shuffleId #2.
> > >
> > > From our experiments, whenever a Celeborn worker fails and a read
> > > failure occurs for an edge, the re-execution of the upstream stage
> > > usually ends up with another read failure because some part of its
> > > input has also been lost. As a result, all upstream stages are
> > > eventually re-executed in a cascading manner. In essence, the failure
> > > of a Celeborn worker invalidates all existing shuffleIds.
> > >
> > > (This is what we observed with Hive-MR3-Celeborn, but I guess stage
> > > recomputation in Spark will have to deal with the same problem.)
> > >
> > > Thanks,
> > >
> > > --- Sungwoo
> > >
> >
>
