Hi Keyong,

Thank you for your detailed response. We will think about how to pass ordered data.

Thanks,

--- Sungwoo

On Tue, 15 Aug 2023, Keyong Zhou wrote:

Hi Sungwoo,

Thanks for your mail. For your questions:

1. ShuffleClient.readPartition() may not read chunks in the same order that they are created.
2. There is a way to preserve the order of batches, at the cost of decreased stability. Alternatively, you can increase the push buffer size to work around the problem.

In more detail: Celeborn does not guarantee batch order, for the following reasons.

First, the default number of Netty connections between a ShuffleClient and a Celeborn Worker is 2, so although the ShuffleClient sends batch1 and batch2 in order, they may arrive at the Worker as batch2, batch1, because they can be sent through different connections. Even if they arrive in the order batch1, batch2, since they are on different connections they are handled by different Netty threads in the Worker, so the order in which they are written to the file is nondeterministic. Of course, we can set the number of connections to 1 by changing celeborn.<module>.io.numConnectionsPerPeer, but we still can't guarantee order because:

Second, when PushData fails, the ShuffleClient asynchronously resends the data to a newly chosen Worker (pair). Say the ShuffleClient first sends batch1 and batch2, and both requests fail; it will then resend the two batches asynchronously in arbitrary order, possibly to different Workers, so their order can't be guaranteed.

Third, as you said, multiple chunks are fetched at once. But as before, if we configure the number of connections to 1, the order of received chunks can be ensured as long as no exception occurs.

The situation becomes more complicated if replication is enabled.

Based on the above discussion, I think one way to preserve order is:
1. Disable data replication: celeborn.client.push.replicate.enabled=false
2. Set the number of connections per peer to one: celeborn.data.io.numConnectionsPerPeer=1
3. Set the max retries when push data fails to one: celeborn.client.push.revive.maxRetries=1
4. Set the max retries when fetch data fails to one: celeborn.client.fetch.maxRetriesForEachReplica=1
However, setting these configs will decrease stability.
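Put together, the four settings above form this client configuration fragment (property names exactly as listed; values as suggested):

```
celeborn.client.push.replicate.enabled=false
celeborn.data.io.numConnectionsPerPeer=1
celeborn.client.push.revive.maxRetries=1
celeborn.client.fetch.maxRetriesForEachReplica=1
```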

An alternative is to use a relatively large push buffer size, e.g. 16m: celeborn.client.push.buffer.max.size=16m. Data inside a batch is ordered, so we can merge all batches in the reduce tasks. Since the batch size is relatively large, the cost of merging is acceptable.
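To illustrate the merge step, here is a minimal hypothetical sketch (not part of the Celeborn API): each batch is modeled as a sorted int array, and a priority queue performs a k-way merge in the reduce task; real batches would hold serialized records.

```java
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch: k-way merge of internally-sorted batches.
public class BatchMerger {
    public static int[] merge(List<int[]> batches) {
        // Queue entries are {value, batchIndex, offsetWithinBatch},
        // ordered by the current head value of each batch.
        PriorityQueue<int[]> pq = new PriorityQueue<>(Comparator.comparingInt(e -> e[0]));
        int total = 0;
        for (int i = 0; i < batches.size(); i++) {
            int[] b = batches.get(i);
            total += b.length;
            if (b.length > 0) pq.add(new int[]{b[0], i, 0});
        }
        int[] out = new int[total];
        int pos = 0;
        while (!pq.isEmpty()) {
            int[] e = pq.poll();
            out[pos++] = e[0];                 // emit smallest current head
            int[] b = batches.get(e[1]);
            int next = e[2] + 1;               // advance within that batch
            if (next < b.length) pq.add(new int[]{b[next], e[1], next});
        }
        return out;
    }
}
```

The merge is O(R log K) for R total records and K batches, so with 16m batches K stays small and the overhead is modest.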

Thanks,
Keyong Zhou


<[email protected]> 于2023年8月14日周一 15:09?道:

Hi Celeborn team,

We are implementing a Celeborn-MR3 client, and have a question on the
order of chunks returned by ShuffleClient.readPartition().

--- Setup

With shuffleId, mapId, attemptId, partitionId all fixed, suppose that a
mapper with mapIndex M calls ShuffleClient.pushData() several times in the
following order:

   ShuffleClient.pushData(shuffleId, mapId, attemptId, partitionId, batch1, ...)
   ShuffleClient.pushData(shuffleId, mapId, attemptId, partitionId, batch2, ...)
   ShuffleClient.pushData(shuffleId, mapId, attemptId, partitionId, batch3, ...)
   ...
   ShuffleClient.pushData(shuffleId, mapId, attemptId, partitionId, batchN, ...)

Then a reducer calls ShuffleClient.readPartition() as follows:

   ShuffleClient.readPartition(shuffleId, partitionId, attemptId, M, M + 1)

--- Expectation and result

ShuffleClient.readPartition() should read batches in the same order that
they are written by calls to pushData(). That is, we expect
ShuffleClient.readPartition() to return:

   batch1, batch2, batch3, ..., batchN

However, the same order is not always guaranteed. For example,
ShuffleClient.readPartition() may return:

   batch3, batch1, batch2, ..., batchN

We find that each batch is written to a Celeborn chunk in its entirety, but ShuffleClient.readPartition() does not necessarily read Celeborn chunks in the same order that they are created.

--- Problem

For unordered data, the order of batches (or chunks) returned by
ShuffleClient.readPartition() does not matter.

For ordered data, however, the same order should be enforced because a mapper sorts its output before sending it to Celeborn in several batches. This is a requirement specific to the Tez runtime. (I guess Spark does not depend on the order of batches because a reducer sorts all records.)

--- Quick fix

We can set celeborn.shuffle.chunk.size to a large value so that a single chunk accommodates all batches. However, this is not a general solution because the maximum size of a mapper's output is unknown.

--- Questions

1. Is it normal behavior of Celeborn that ShuffleClient.readPartition() may not read chunks in the same order that they are created?

We are confused because TransportClient.java says:

    * <p>Multiple fetchChunk requests may be outstanding simultaneously, and the chunks are
    * guaranteed to be returned in the same order that they were requested, assuming only a single
    * TransportClient is used to fetch the chunks.

However, the implementation in the class WorkerPartitionReader seems to suggest otherwise because it does not check or sort chunk indexes. (It appears that up to celeborn.client.fetch.maxReqsInFlight chunks are requested at once, and whichever chunk arrives first is returned right away.)
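For context, the reordering we would need on the reducer side can be sketched as follows: a hypothetical buffer (names are ours, not Celeborn's) that holds out-of-order chunks until their predecessors arrive, assuming each chunk is tagged with its creation index.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical reducer-side buffer: chunks may arrive out of order,
// each tagged with its creation index; release them strictly in index
// order and hold back any chunk whose predecessor is still missing.
public class ChunkReorderer {
    private final Map<Integer, String> pending = new HashMap<>();
    private int nextIndex = 0; // index of the next chunk we may release

    // Offer an arrived chunk; returns the chunks now releasable, in
    // index order (empty if a predecessor has not arrived yet).
    public List<String> offer(int chunkIndex, String chunk) {
        pending.put(chunkIndex, chunk);
        List<String> ready = new ArrayList<>();
        while (pending.containsKey(nextIndex)) {
            ready.add(pending.remove(nextIndex));
            nextIndex++;
        }
        return ready;
    }
}
```

Such a buffer trades memory for order: in the worst case all in-flight chunks except the first must be held.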

2. If this is a normal behavior of ShuffleClient.readPartition(), is there
some way to preserve the order of batches when calling
ShuffleClient.readPartition()?

Thanks,

--- Sungwoo

