This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 3593adf12 [CELEBORN-860][DOC] Document on ShuffleClient
3593adf12 is described below
commit 3593adf12d56d26d470a4d301dadb57150f9bd6a
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Mon Jul 31 20:07:20 2023 +0800
[CELEBORN-860][DOC] Document on ShuffleClient
### What changes were proposed in this pull request?
As title.
### Why are the changes needed?
As title.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test.
Closes #1778 from waitinfuture/860-1.
Lead-authored-by: zky.zhoukeyong <[email protected]>
Co-authored-by: Keyong Zhou <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../org/apache/celeborn/client/ShuffleClient.java | 28 ++++++-
docs/developers/faulttolerant.md | 4 +-
docs/developers/lifecyclemanager.md | 2 +-
docs/developers/{pushdata.md => shuffleclient.md} | 86 +++++++++++++++++++---
docs/developers/storage.md | 2 +-
mkdocs.yml | 2 +-
6 files changed, 106 insertions(+), 18 deletions(-)
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
index c06c6a842..30f7af8fb 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
@@ -99,7 +99,22 @@ public abstract class ShuffleClient {
public abstract void setupLifecycleManagerRef(RpcEndpointRef endpointRef);
- // Write data to a specific reduce partition
+ /**
+ * Write data to a specific reduce partition
+ *
+ * @param shuffleId the unique shuffle id of the application
+ * @param mapId the map id of the shuffle
+ * @param attemptId the attempt id of the map task, i.e. speculative task or
task rerun for Apache
+ * Spark
+ * @param partitionId the partition id the data belongs to
+ * @param data byte array containing data to be pushed
+ * @param offset start position of data to be pushed
+ * @param length length of data to be pushed
+ * @param numMappers the number map tasks in the shuffle
+ * @param numPartitions the number of partitions in the shuffle
+ * @return bytes pushed
+ * @throws IOException
+ */
public abstract int pushData(
int shuffleId,
int mapId,
@@ -142,6 +157,17 @@ public abstract class ShuffleClient {
// Reduce side read partition which is deduplicated by
mapperId+mapperAttemptNum+batchId, batchId
// is a self-incrementing variable hidden in the implementation when sending
data.
+ /**
+ * @param shuffleId the unique shuffle id of the application
+ * @param partitionId the partition id to read from
+ * @param attemptNumber the attempt id of reduce task, can be safely set to
any value
+ * @param startMapIndex the index of start map index of interested map
range, set to 0 if you want
+ * to read all partition data
+ * @param endMapIndex the index of end map index of interested map range,
set to
+ * `Integer.MAX_VALUE` if you want to read all partition data
+ * @return
+ * @throws IOException
+ */
public abstract CelebornInputStream readPartition(
int shuffleId, int partitionId, int attemptNumber, int startMapIndex,
int endMapIndex)
throws IOException;
diff --git a/docs/developers/faulttolerant.md b/docs/developers/faulttolerant.md
index a12ab6d92..09bd57220 100644
--- a/docs/developers/faulttolerant.md
+++ b/docs/developers/faulttolerant.md
@@ -29,7 +29,7 @@ as much as possible, especially the following:
This article is based on
[ReducePartition](../../developers/storage#reducepartition).
## Handle PushData Failure
-The detailed description of push data can be found in
[PushData](../../developers/pushdata). Push data can fail for
+The detailed description of push data can be found in
[PushData](../../developers/shuffleclient#pushdata). Push data can fail for
various reasons, i.e. CPU high load, network fluctuation, JVM GC, `Worker`
lost.
Celeborn does not eagerly consider `Worker` lost when push data fails, instead
it considers it as temporary
@@ -38,7 +38,7 @@ The process is called `Revive`:

-Handling [PushMergedData](../../developers/pushdata#push-or-merge) failure is
similar but more complex. Currently,
+Handling [PushMergedData](../../developers/shuffleclient#push-or-merge)
failure is similar but more complex. Currently,
`PushMergedData` is in all-or-nothing fashion, meaning either all data batches
in the request succeed or all fail.
Partial success is not supported yet.
diff --git a/docs/developers/lifecyclemanager.md
b/docs/developers/lifecyclemanager.md
index a495477cb..4549b4f07 100644
--- a/docs/developers/lifecyclemanager.md
+++ b/docs/developers/lifecyclemanager.md
@@ -45,7 +45,7 @@ to handle the requests, `LifecycleManager` will send requests
to `Master` and `W
- DestroyWorkerSlots to `Worker`
## RegisterShuffle
-As described in [PushData](../../developers/pushdata#lazy-shuffle-register),
`ShuffleClient` lazily send
+As described in
[PushData](../../developers/shuffleclient#lazy-shuffle-register),
`ShuffleClient` lazily send
RegisterShuffle to LifecycleManager, so many concurrent requests will be sent
to `LifecycleManager`.
To ensure only one request for each shuffle is handled, `LifecycleManager`
puts tail requests in a set and only
diff --git a/docs/developers/pushdata.md b/docs/developers/shuffleclient.md
similarity index 57%
rename from docs/developers/pushdata.md
rename to docs/developers/shuffleclient.md
index d0df3b8e9..0ad9e8257 100644
--- a/docs/developers/pushdata.md
+++ b/docs/developers/shuffleclient.md
@@ -6,7 +6,9 @@ license: |
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
+
+ https://www.apache.org/licenses/LICENSE-2.0
+
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -14,12 +16,15 @@ license: |
limitations under the License.
---
+# ShuffleClient
-# Push Data
-
-This article describes the detailed design of the process of push data.
+## Overview
+ShuffleClient is responsible for pushing and reading shuffle data. It's a
singleton in each leaf process,
+i.e. Executor in Apache Spark, or TaskManager in Apache Flink. This article
describes the detailed design for
+push data and read data.
-## API specification
+## Push Data
+### API specification
The push data API is as follows:
```java
public abstract int pushData(
@@ -32,7 +37,6 @@ The push data API is as follows:
int length,
int numMappers,
int numPartitions)
- throws IOException;
```
- `shuffleId` is the unique shuffle id of the application
@@ -43,7 +47,7 @@ The push data API is as follows:
- `numMappers` is the number map tasks in the shuffle
- `numPartitions` is the number of partitions in the shuffle
-## Lazy Shuffle Register
+### Lazy Shuffle Register
The first time `pushData` is called, Client will check whether the shuffle id
has been registered. If not,
it sends `RegisterShuffle` to `LifecycleManager`, `LifecycleManager` then
sends `RequestSlots` to `Master`.
`RequestSlots` specifies how many `PartitionLocation`s this shuffle requires,
each `PartitionLocation` logically
@@ -57,7 +61,7 @@ responds to `LifecycleManager` with the allocated
`PartitionLocation`s.
`LifcycleManager` caches the `PartitionLocation`s for the shuffle and responds
to each `RegisterShuffle` RPCs from
`ShuffleClient`s.
-## Normal Push
+### Normal Push
In normal cases, the process of pushing data is as follows:
- `ShuffleClient` compresses data, currently supports `zstd` and `lz4`
@@ -69,7 +73,7 @@ In normal cases, the process of pushing data is as follows:
about how data is replicated and stored in `Worker`s, please refer to
[Worker](../../developers/worker)
- Upon receiving success ACK from `Worker`, `ShuffleClient` considers success
for this pushing and modifies the push state
-## Push or Merge?
+### Push or Merge?
If the size of data to be pushed is small, say hundreds of bytes, it will be
very inefficient to send to the wire.
So `ShuffleClient` offers another API: `mergeData` to batch data locally
before sending to `Worker`.
@@ -80,7 +84,7 @@ primary and replica are the same. When the size of a
`DataBatches` exceeds a thr
Upon receiving `PushMergedData`, `Worker` unpacks it into data segments each
for a specific `PartitionLocation`, then
stores them accordingly.
-## Async Push
+### Async Push
Celeborn's `ShuffleClient` does not block compute engine's execution by
asynchronous pushing, implemented in
`DataPusher`.
@@ -88,7 +92,7 @@ Whenever compute engine decides to push data, it calls
`DataPusher#addTask`, `Da
contains the data, and added the `PushTask` in a non-blocking queue.
`DataPusher` continuously poll the queue
and invokes `ShuffleClient#pushData` to do actual push.
-## Split
+### Split
As mentioned before, Celeborn will split a `PartitionLocation` when any of the
following conditions happens:
- `PartitionLocation` file exceeds threshold (defaults to 1GiB)
@@ -111,4 +115,62 @@ The process of `SOFT_SPLIT` is as follows:

`LifecycleManager` keeps the split information and tells reducer to read from
all splits of the `PartitionLocation`
-to guarantee no data is lost.
\ No newline at end of file
+to guarantee no data is lost.
+
+## Read Data
+### API specification
+`ShuffleClient` provides an API that creates an InputStream to read data from
a partition id. Users can also set
+`startMapIndex` and `endMapIndex` to read data within the map range.
+```java
+ public abstract CelebornInputStream readPartition(
+ int shuffleId,
+ int partitionId,
+ int attemptNumber,
+ int startMapIndex,
+ int endMapIndex)
+```
+
+- `shuffleId` is the unique shuffle id of the application
+- `partitionId` is the partition id to read from
+- `attemptNumber` is the attempt id of reduce task, can be safely set to any
value
+- `startMapIndex` is the index of start map index of interested map range, set
to 0 if you want to read all
+ partition data
+- `endMapIndex` is the index of end map index of interested map range, set to
`Integer.MAX_VALUE` if you want
+ to read all partition data
+
+The returned input stream is guaranteed to be `Exactly Once`, meaning no data
lost and no duplicated reading, or else
+an exception will be thrown, see
[Here](../../developers/faulttolerant#exactly-once).
+
+### Get PartitionLocations
+To read data from a partition id, `ShuffleClient` first checks whether the
mapping from partition id to all
+`PartitionLocation`s are locally cached, if not, `ShuffleClient` sends
GetReducerFileGroup to `LifecycleManager`
+for the mapping, see
[Here](../../developers/lifecyclemanager#getreducerfilegroup).
+
+### Read from PartitionLocation
+`ShuffleClient` creates a `PartitionReader` for each `PartitinLocation`.
+As described [Here](../../developers/storage#multi-layered-storage),
`PartitionLocation` data can be stored in
+different medium, i.e. memory, local disk, distributed filesystem. For the
former two, it creates
+a `WorkerPartitionReader` to read from `Worker`, for the last one, it creates
a `DfsPartitionReader` to read
+directly from the distributed filesystem.
+
+As described [Here](../../developers/storage#reducepartition), the file is
chunked. `WorkerPartitionReader` asynchronously
+requests multiple chunks from `Worker`, and reduce task consumes the data
whenever available.
+
+If exception occurs when fetching a chunk, `ShuffleClient` will restart
reading from the beginning of another
+(if replication is turned on, else retry the same) `PartitionLocation`. The
reason to restart reading the whole
+`PartitionLocation` instead of the chunk is because chunks with the same index
in primary and replica are not
+guaranteed to contain the same data, as explained
[Here](../../developers/storage#reducepartition).
+
+`ShuffleClient` chained the `PartitionReader`s and wrap them in an
InputStream. To avoid duplicated read,
+`CelebornInputStream` discards data from un-successful attempts, and records
batch ids it has seen within an attempt.
+
+### Read from Map Range
+If user specifies `startMapIndex` and `endMapIndex`, `CelebornInputStream`
will only return data within the range.
+Under the hood is that `Worker` only responds data within the range. This is
achieved by sorting and indexing the file
+by map id upon receiving such range read request, then return the continuous
data range of interest.
+
+Notice that the sort on read is only triggered upon map range read, not for
the common cases where whole partition data
+is requested.
+
+Celeborn also optionally records map ids for each `PartitionLocation`, in the
case of map range reading,
+`CelebornInputStream` will filter out `PartitionLocation`s that are out of the
specified range.
\ No newline at end of file
diff --git a/docs/developers/storage.md b/docs/developers/storage.md
index e5915ae64..e27b7452a 100644
--- a/docs/developers/storage.md
+++ b/docs/developers/storage.md
@@ -95,7 +95,7 @@ Celeborn supports two configurable kinds of split:
- `SOFT_SPLIT`, meaning old `PartitionLocation` epoch continues to accept
data, when new epoch is ready, `ShuffleClient`
switches to the new location transparently
-The detailed design of split can be found
[Here](../../developers/pushdata#split).
+The detailed design of split can be found
[Here](../../developers/shuffleclient#split).
## Self Check
In additional to health and space check on each disk, `Worker` also collects
perf statistics to feed Master for
diff --git a/mkdocs.yml b/mkdocs.yml
index d523d1c65..ec14dee15 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -91,7 +91,7 @@ nav:
- Client:
- Overview: developers/client.md
- LifecycleManager: developers/lifecyclemanager.md
- - PushData: developers/pushdata.md
+ - ShuffleClient: developers/shuffleclient.md
- Fault Tolerant: developers/faulttolerant.md
# - ReadData: developers/readdata.md
# - Slots Allocation: developers/slotsallocation.md