Hi Keyong,

Thanks for your quick reply. We thought that Celeborn API was clean and very intuitive, and have not encountered serious problems yet for getting our system up and running. We are not sure about just a few points that are not immediately obvious from Celeborn API (e.g., whether or not reducers should wait until the completion of source mappers).

I have a few more questions on when to create/destroy rssShuffleClient and will appreciate it very much if you could clarify these points.

1. When to create rssShuffleClient and when to call rssShuffleClient.cleanup()?

In our implementation, a worker (similar to Executors for Spark) creates a new rssShuffleClient for each reader/writer, rather than reusing a common rssShuffleClient for all readers/writers. Is this the right way to create rssShuffleClient?

Then, rssShuffleClient.cleanup() should be called after a reader/writer is finished?

2. When to call rssShuffleClient.shutdown()?

Is it enough to call rssShuffleClient.shutdown() only once inside the master (similar to Driver for Spark)?

Thanks,

--- Sungwoo

On Wed, 12 Jul 2023, Keyong Zhou wrote:

Hi Sungwoo,

Thanks for your effort to integrating Celeborn into MR3!

For your question, currently a reducer does wait until the completion of
all mappers
before starting to fetch shuffle data.

Briefly speaking, Celeborn client contains two modules:
1. ShuffleClient for push/fetch data, mainly used on Executors for Spark
and TaskManager for Flink.
2. LifecycleManager for communicating with Celeborn cluster and managing
application-level shuffle meta,
   mainly used in Driver for Spark or JobMaster for Flink.

Following are the main steps for a shuffle stage:
1. LifecycleManager sends RequestSlots to Master to request slots for the
current shuffle;
2. Master allocates slots among workers for the shuffle and
returns RequestSlotsResponse;
3. LifecycleManager sends ReserveSlots to workers; workers do
initialization;
4. ShuffleClient pushes data to workers;
5. When map task ends, ShuffleClient sends MapperEnd to LifecycleManager;
6. When all map tasks ended, LifecycleManager sends CommitFiles to workers;
7. When CommitFiles succeeds, reducer tasks can read data from workers.

We have to admit that although currently Celeborn supports both Flink and
Spark based on the same API, the
developer API is not that much clean. It will be very helpful if you send
PRs to improve Celeborn during your
integration with MR3.

Thanks,
Keyong Zhou


<[email protected]> 于2023年7月12日周三 14:53?道:

Hi Team,

We are currently implementing a Celeborn client for our application
(called MR3 which is similar to Tez), and have a question on the internals
of Celeborn.

The question is whether a reducer should wait until the completion of all
mappers before starting to fetch mapper output. From the Celeborn API, it
seems like there is no need to wait until the completion of all mappers.
In other words, after a certain mapper finishes writing all its output, a
reducer can fetch the corresponding output from the mapper, regardless of
the status of other mappers.

On the other hand, we suspect that trying to fetch the output of a mapper
before the completion of other mappers occasionally triggers Premature EOF
Exception.

Any comment on this problem will be appreciated very much.

Thanks,

--- Sungwoo Park


Reply via email to