Hi Sungwoo,

Thanks for your mail. To answer your questions:

1. ShuffleClient.readPartition() may not read chunks in the same order that
they were created.
2. There is a way to preserve the order of batches, at the cost of
decreased stability. Alternatively, you can increase the push buffer size
to work around the problem.

In more detail, Celeborn does not guarantee batch order, for the following
reasons.

First, the default number of Netty connections between a ShuffleClient and
a Celeborn Worker is 2. So even though ShuffleClient sends batch1 and
batch2 in order, they may travel over different connections and arrive at
the Worker as batch2, batch1. And even if they arrive in the order batch1,
batch2, the two connections are handled by different Netty threads in the
Worker, so the order in which the batches are written to the file is
nondeterministic. The sketch below illustrates the race.
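
Here is a minimal JDK-only illustration (not Celeborn code): the two
single-threaded executors stand in for the handlers of the two connections,
and the shared list stands in for the partition file. The printed order can
differ from run to run:

    import java.util.*;
    import java.util.concurrent.*;

    public class ConnectionOrderDemo {
        public static void main(String[] args) throws Exception {
            // Shared "file": batches are appended in handler order.
            List<String> written = Collections.synchronizedList(new ArrayList<>());
            ExecutorService conn1 = Executors.newSingleThreadExecutor();
            ExecutorService conn2 = Executors.newSingleThreadExecutor();
            // batch1 is pushed over connection 1, batch2 over connection 2.
            Future<?> f1 = conn1.submit(() -> written.add("batch1"));
            Future<?> f2 = conn2.submit(() -> written.add("batch2"));
            f1.get();
            f2.get();
            System.out.println(written); // [batch1, batch2] or [batch2, batch1]
            conn1.shutdown();
            conn2.shutdown();
        }
    }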

Of course, we can configure the number of connections to 1 by changing
celeborn.<module>.io.numConnectionsPerPeer, but we still can't guarantee
order, because:

Second, when a PushData request fails, ShuffleClient asynchronously resends
the data to a newly chosen Worker (pair). Say ShuffleClient first sends
batch1 and batch2, and both requests fail; ShuffleClient then resends the
two batches asynchronously, in arbitrary order and possibly to different
Workers, so their order can't be guaranteed.

Third, as you said, multiple chunks are fetched at once. But as before, if
we configure the number of connections to 1, the order in which chunks are
received is preserved as long as no exception happens.

The situation becomes more complicated if replication is enabled.

Based on the above discussion, one way to preserve order is the following
(the combined settings are sketched after this list):
1. disable data replication: celeborn.client.push.replicate.enabled=false
2. set the number of connections per peer to one:
celeborn.data.io.numConnectionsPerPeer=1
3. set the max retries when pushing data fails to one:
celeborn.client.push.revive.maxRetries=1
4. set the max retries when fetching data fails to one:
celeborn.client.fetch.maxRetriesForEachReplica=1
But setting these configs will decrease stability.
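
For reference, the four settings together look like this in a Celeborn
client configuration (where exactly they are set depends on how your
engine passes Celeborn configs):

    celeborn.client.push.replicate.enabled=false
    celeborn.data.io.numConnectionsPerPeer=1
    celeborn.client.push.revive.maxRetries=1
    celeborn.client.fetch.maxRetriesForEachReplica=1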

Another alternative is to use a relatively large push buffer size, e.g.
16m: celeborn.client.push.buffer.max.size=16m.
Since the data inside each batch is ordered, reduce tasks can merge all
the batches; because each batch is relatively large, the cost of merging
is acceptable. A sketch of such a merge follows.
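
This is a minimal sketch of such a merge, assuming each batch arrives as
an already sorted Iterator<T>; the class and method names here are
placeholders, not Celeborn APIs:

    import java.util.*;

    public class BatchMerger {
        // k-way merge of sorted iterators using a min-heap keyed on each
        // iterator's current head element.
        public static <T> Iterator<T> merge(List<Iterator<T>> batches,
                                            Comparator<T> cmp) {
            PriorityQueue<Map.Entry<T, Iterator<T>>> heap =
                new PriorityQueue<>((a, b) -> cmp.compare(a.getKey(), b.getKey()));
            for (Iterator<T> it : batches) {
                if (it.hasNext()) {
                    heap.add(new AbstractMap.SimpleEntry<>(it.next(), it));
                }
            }
            return new Iterator<T>() {
                public boolean hasNext() { return !heap.isEmpty(); }
                public T next() {
                    Map.Entry<T, Iterator<T>> e = heap.poll();
                    Iterator<T> src = e.getValue();
                    if (src.hasNext()) {
                        // Refill the heap from the batch we just consumed.
                        heap.add(new AbstractMap.SimpleEntry<>(src.next(), src));
                    }
                    return e.getKey();
                }
            };
        }
    }

For example, merge(batches, Comparator.naturalOrder()) yields a single
globally sorted stream over all batches.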

Thanks,
Keyong Zhou


<[email protected]> wrote on Mon, Aug 14, 2023 at 15:09:

> Hi Celeborn team,
>
> We are implementing a Celeborn-MR3 client, and have a question on the
> order of chunks returned by ShuffleClient.readPartition().
>
> --- Setup
>
> With shuffleId, mapId, attemptId, partitionId all fixed, suppose that a
> mapper with mapIndex M calls ShuffleClient.pushData() several times in the
> following order:
>
>    ShuffleClient.pushData(shuffleId, mapId, attemptId, partitionId,
> batch1, ...)
>    ShuffleClient.pushData(shuffleId, mapId, attemptId, partitionId,
> batch2, ...)
>    ShuffleClient.pushData(shuffleId, mapId, attemptId, partitionId,
> batch3, ...)
>    ...
>    ShuffleClient.pushData(shuffleId, mapId, attemptId, partitionId,
> batchN, ...)
>
> Then a reducer calls ShuffleClient.readPartition() as follows:
>
>    ShuffleClient.readPartition(shuffleId, partitionId, attemptId, M, M + 1)
>
> --- Expectation and result
>
> ShuffleClient.readPartition() should read batches in the same order that
> they are written by calls to pushData(). That is, we expect
> ShuffleClient.readPartition() to return:
>
>    batch1, batch2, batch3, ..., batchN
>
> However, the same order is not always guaranteed. For example,
> ShuffleClient.readPartition() may return:
>
>    batch3, batch1, batch2, ..., batchN
>
> We find that each batch is written to a Celeborn chunk in its entirety,
> but ShuffleClient.readPartition() does not necessarily read Celeborn
> chunks in the same order that they are created.
>
> --- Problem
>
> For unordered data, the order of batches (or chunks) returned by
> ShuffleClient.readPartition() does not matter.
>
> For ordered data, however, the same order should be enforced because a
> mapper sorts output data before sending it to Celeborn in several batches.
> This is a requirement specific to the Tez runtime. (I guess Spark does not
> depend on the order of batches because a reducer sorts all records.)
>
> --- Quick fix
>
> We can set celeborn.shuffle.chunk.size to a large value so that a single
> chunk can accommodate all batches. However, this is not a general solution
> because the max size of mapper output is unknown.
>
> --- Questions
>
> 1. Is it a normal behavior of Celeborn that ShuffleClient.readPartition()
> may not read chunks in the same order that they are created?
>
> We are confused because TransportClient.java says:
>
>     * <p>Multiple fetchChunk requests may be outstanding simultaneously,
>     * and the chunks are guaranteed to be returned in the same order that
>     * they were requested, assuming only a single TransportClient is used
>     * to fetch the chunks.
>
> However, the implementation in the class WorkerPartitionReader seems to
> suggest otherwise because it does not check or sort chunk indexes. (It
> seems like up to celeborn.client.fetch.maxReqsInFlight chunks are
> requested at once and whichever chunk arrives first is returned right
> away.)
>
> 2. If this is a normal behavior of ShuffleClient.readPartition(), is there
> some way to preserve the order of batches when calling
> ShuffleClient.readPartition()?
>
> Thanks,
>
> --- Sungwoo
>
>
