Hello,
As we are developing MR3 extension for Celeborn, I would like to add my
comments on stage re-run in the context of using Celeborn for MR3. I don't
know the internal details of Spark stage re-run very well, so my apologies
if my comments are irrelevant to the proposal in the design document.
For Celeborn-MR3, we only need the following two features:
1. When mapper output is lost and read errors occur, the CelebornIOException
thrown by ShuffleClientImpl includes the task index of the mapper (or a set
of task indexes) whose output has been lost.
2. ShuffleClientImpl notifies LifeCycleManager so that
ShuffleClient.mapperEnd(shuffleId, mapper_task_index, ...) can be called
again. In other words, LifeCycleManager marks shuffleId as 'incomplete'
again after it was 'complete'.
Then, the task re-execution mechanism of MR3 can take care of the rest, by
re-executing the mapper and calling ShuffleClient.mapperEnd() again.
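
To make the two features concrete, here is a toy model in Python. It is
only a sketch of the behavior I have in mind, not Celeborn or MR3 code:
LostMapperOutputError, ToyLifecycleManager, report_lost and
read_with_reexecution are all hypothetical names, and in MR3 the
re-execution would be driven by the framework rather than by the reader.

```python
class LostMapperOutputError(Exception):
    """Hypothetical stand-in for a CelebornIOException that names the lost mappers."""

    def __init__(self, shuffle_id, lost_mappers):
        super().__init__(
            f"shuffle {shuffle_id}: lost output of mappers {sorted(lost_mappers)}"
        )
        self.shuffle_id = shuffle_id
        self.lost_mappers = frozenset(lost_mappers)  # feature 1


class ToyLifecycleManager:
    """Hypothetical model: tracks which mappers of a shuffle have ended."""

    def __init__(self, num_mappers):
        self.num_mappers = num_mappers
        self.ended = {}  # shuffle_id -> set of mapper indexes that have ended

    def mapper_end(self, shuffle_id, mapper_index):
        self.ended.setdefault(shuffle_id, set()).add(mapper_index)

    def is_complete(self, shuffle_id):
        return len(self.ended.get(shuffle_id, ())) == self.num_mappers

    def report_lost(self, shuffle_id, lost_mappers):
        # Feature 2: the shuffle goes back to 'incomplete', so that
        # mapper_end can be called again for the lost mappers.
        self.ended.setdefault(shuffle_id, set()).difference_update(lost_mappers)


def read_with_reexecution(manager, read, rerun_mapper):
    """Call read(); on lost output, re-run the named mappers and retry once."""
    try:
        return read()
    except LostMapperOutputError as e:
        manager.report_lost(e.shuffle_id, e.lost_mappers)
        for mapper_index in sorted(e.lost_mappers):
            rerun_mapper(mapper_index)
            manager.mapper_end(e.shuffle_id, mapper_index)
        return read()
```

The point of the sketch is that the reader side needs nothing beyond the
mapper indexes in the exception and a way to reopen the shuffle.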
From the proposal (if I understood it correctly), however, it seems that
feature 1 is not easy to implement in the current architecture of Celeborn:
Celeborn doesn't know which mapper tasks need to be recomputed, unless the
mapping of partitionId -> List<mapId> is recorded and reported to
LifeCycleManager at committing time.
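
As a rough illustration of what recording that mapping would buy us, here
is a small Python sketch. CommitLog and its methods are hypothetical names
that I made up for this email; nothing here reflects how Celeborn actually
stores commit metadata.

```python
from collections import defaultdict


class CommitLog:
    """Hypothetical record of partitionId -> set of mapIds, filled at commit time."""

    def __init__(self):
        self._mappers_by_partition = defaultdict(set)

    def record_write(self, partition_id, map_id):
        # Called once for each (partition, mapper) pair that wrote data.
        self._mappers_by_partition[partition_id].add(map_id)

    def mappers_to_recompute(self, lost_partition_ids):
        # Union of all mappers that contributed to any lost partition.
        result = set()
        for partition_id in lost_partition_ids:
            result |= self._mappers_by_partition.get(partition_id, set())
        return sorted(result)


log = CommitLog()
log.record_write(partition_id=0, map_id=3)
log.record_write(partition_id=0, map_id=5)
log.record_write(partition_id=1, map_id=5)
print(log.mappers_to_recompute([0]))  # mappers that wrote to partition 0
```

With such a record, a read error on a partition can be translated into the
set of mapper tasks to report in the exception.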
By the way, we have finished the initial implementation of
Hive-MR3-Celeborn, and it works very reliably when tested with TPC-DS 10TB
and the performance is also good. A release candidate is currently being
tested in production by a third party. It could take a bit of time to
learn to use Hive-MR3-Celeborn, but Hive-MR3-Celeborn could be another way
to run stress tests on Celeborn. For example, we produced the EOFException
error in stress tests that used speculative execution heavily and
intentionally applied heavy memory pressure. (We have quick start guides
for Hadoop, K8s, standalone mode, so it should take no more than a couple
of hours to learn to run Hive-MR3-Celeborn.) If you are interested, please
let me know. Thank you.
Best,
-- Sungwoo
On Fri, 22 Sep 2023, Erik fang wrote:
Hi folks,
I have a proposal to implement Spark stage resubmission to handle shuffle
fetch failure in Celeborn
https://docs.google.com/document/d/1dkG6fww3g99VAb1wkphNlUES_MpngVPNg8601chmVp8
Please have a look and let me know what you think.
Regards,
Erik