Hi Sungwoo,

Glad to know about your progress! For your questions,

1. In Celeborn's default implementation, ShuffleClient is a singleton in
the Executor and Driver process, I suggest to follow this practice.
    It's recommended to call ShuffleClient.cleanup(int shuffleId, int
mapId, int attemptId) after each writer finishes because it cleans up
    the PushState related to the specific map attempt.

2. ShuffleClient.shutdown should be called inside ShuffleManager.stop.
Better to call it in both Executor and Driver. If Executor
    is to exit, it's fine not to call this method.

Thanks,
Keyong Zhou

<[email protected]> 于2023年7月12日周三 21:19写道:

> Hi Keyong,
>
> Thanks for your quick reply. We thought that Celeborn API was clean and
> very intuitive, and have not encountered serious problems yet for getting
> our system up and running. We are not sure about just a few points that
> are not immediately obvious from Celeborn API (e.g., whether or not
> reducers should wait until the completion of source mappers).
>
> I have a few more questions on when to create/destroy rssShuffleClient and
> will appreciate it very much if you could clarify these points.
>
> 1. When to create rssShuffleClient and when to call
> rssShuffleClient.cleanup()?
>
> In our implementation, a worker (similar to Executors for Spark) creates a
> new rssShuffleClient for each reader/writer, rather than reusing a common
> rssShuffleClient for all readers/writers. Is this the right way to create
> rssShuffleClient?
>
> Then, rssShuffleClient.cleanup() should be called after a reader/writer is
> finished?
>
> 2. When to call rssShuffleClient.shutdown()?
>
> Is it enough to call rssShuffleClient.shutdown() only once inside the
> master (similar to Driver for Spark)?
>
> Thanks,
>
> --- Sungwoo
>
> On Wed, 12 Jul 2023, Keyong Zhou wrote:
>
> > Hi Sungwoo,
> >
> > Thanks for your effort to integrating Celeborn into MR3!
> >
> > For your question, currently a reducer does wait until the completion of
> > all mappers
> > before starting to fetch shuffle data.
> >
> > Briefly speaking, Celeborn client contains two modules:
> > 1. ShuffleClient for push/fetch data, mainly used on Executors for Spark
> > and TaskManager for Flink.
> > 2. LifecycleManager for communicating with Celeborn cluster and managing
> > application-level shuffle meta,
> >    mainly used in Driver for Spark or JobMaster for Flink.
> >
> > Following are the main steps for a shuffle stage:
> > 1. LifecycleManager sends RequestSlots to Master to request slots for the
> > current shuffle;
> > 2. Master allocates slots among workers for the shuffle and
> > returns RequestSlotsResponse;
> > 3. LifecycleManager sends ReserveSlots to workers; workers do
> > initialization;
> > 4. ShuffleClient pushes data to workers;
> > 5. When map task ends, ShuffleClient sends MapperEnd to LifecycleManager;
> > 6. When all map tasks ended, LifecycleManager sends CommitFiles to
> workers;
> > 7. When CommitFiles succeeds, reducer tasks can read data from workers.
> >
> > We have to admit that although currently Celeborn supports both Flink and
> > Spark based on the same API, the
> > developer API is not that much clean. It will be very helpful if you send
> > PRs to improve Celeborn during your
> > integration with MR3.
> >
> > Thanks,
> > Keyong Zhou
> >
> >
> > <[email protected]> 于2023年7月12日周三 14:53?道:
> >
> >> Hi Team,
> >>
> >> We are currently implementing a Celeborn client for our application
> >> (called MR3 which is similar to Tez), and have a question on the
> internals
> >> of Celeborn.
> >>
> >> The question is whether a reducer should wait until the completion of
> all
> >> mappers before starting to fetch mapper output. From the Celeborn API,
> it
> >> seems like there is no need to wait until the completion of all mappers.
> >> In other words, after a certain mapper finishes writing all its output,
> a
> >> reducer can fetch the corresponding output from the mapper, regardless
> of
> >> the status of other mappers.
> >>
> >> On the other hand, we suspect that trying to fetch the output of a
> mapper
> >> before the completion of other mappers occasionally triggers Premature
> EOF
> >> Exception.
> >>
> >> Any comment on this problem will be appreciated very much.
> >>
> >> Thanks,
> >>
> >> --- Sungwoo Park
> >>
> >>
> >

Reply via email to