I have a question on how Celeborn distributes shuffle data among Celeborn workers.

From our observation, it seems that whenever a Celeborn worker fails or
gets killed (in a small cluster of fewer than 25 nodes), almost every edge is affected. Does this mean that an edge with multiple partitions usually distributes its shuffle data across all Celeborn workers?

If this is the case, I think stage recomputation is unnecessary and simply re-executing the entire DAG is a better approach. Our current implementation uses the following scheme for stage recomputation (a rough sketch follows the list):

1. If a read failure occurs for shuffleId #1 for an edge, we pick a new shuffleId #2 for the same edge.
2. The upstream stage re-executes all of its tasks, but writes its output to shuffleId #2.
3. Tasks in the downstream stage retry by reading from shuffleId #2.
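
Purely as an illustration, here is a minimal Scala sketch of this scheme. ShuffleIdTracker and its methods are made-up names for this email, not MR3, Celeborn, or Spark APIs.

import scala.collection.mutable

// Hypothetical helper that tracks the currently valid shuffleId per edge.
class ShuffleIdTracker {
  private var nextShuffleId = 0
  // edgeId -> shuffleId that downstream tasks should currently read from
  private val currentShuffleId = mutable.Map.empty[Int, Int]

  // Assign the initial shuffleId for an edge (e.g. shuffleId #1).
  def register(edgeId: Int): Int = {
    val id = nextShuffleId
    nextShuffleId += 1
    currentShuffleId(edgeId) = id
    id
  }

  // Step 1: on a read failure, pick a new shuffleId (e.g. shuffleId #2).
  // Step 2: the upstream stage re-executes and writes to this new id.
  def onReadFailure(edgeId: Int): Int = {
    val newId = nextShuffleId
    nextShuffleId += 1
    currentShuffleId(edgeId) = newId
    newId
  }

  // Step 3: downstream retries read from whatever shuffleId is current.
  def shuffleIdForRead(edgeId: Int): Int = currentShuffleId(edgeId)
}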

From our experiment, whenever a Celeborn worker fails and a read failure
occurs for an edge, the re-execution of the upstream stage usually ends up with another read failure because part of its input has also been lost. As a result, all upstream stages are eventually re-executed in a cascading manner. In essence, the failure of a Celeborn worker invalidates all existing shuffleIds.
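
To illustrate why the recomputation cascades: every existing shuffleId was written to Celeborn workers before the failure, so the upstream stage's own input is gone as well, and the same reasoning applies one level further up until the root stages are reached. A hypothetical sketch of this propagation (Edge and edgesToRecompute are made-up names, not MR3 or Celeborn types):

object CascadingRecompute {
  // Hypothetical model: an edge knows its upstream edges and when its
  // shuffle output was written.
  case class Edge(id: Int, upstreamEdges: Seq[Edge], writtenAt: Long)

  // Any shuffle output written before the worker failure is effectively
  // lost, so re-executing one stage forces re-execution of all ancestors
  // whose output predates the failure.
  def edgesToRecompute(failed: Edge, failureTime: Long): Set[Int] = {
    val fromParents = failed.upstreamEdges
      .filter(_.writtenAt < failureTime)
      .flatMap(e => edgesToRecompute(e, failureTime))
    fromParents.toSet + failed.id
  }
}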

(This is what we observed with Hive-MR3-Celeborn, but I guess stage recomputation in Spark will have to deal with the same problem.)

Thanks,

--- Sungwoo
