This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 3593adf12 [CELEBORN-860][DOC] Document on ShuffleClient
3593adf12 is described below

commit 3593adf12d56d26d470a4d301dadb57150f9bd6a
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Mon Jul 31 20:07:20 2023 +0800

    [CELEBORN-860][DOC] Document on ShuffleClient
    
    ### What changes were proposed in this pull request?
    As title.
    
    ### Why are the changes needed?
    As title.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Manual test.
    
    Closes #1778 from waitinfuture/860-1.
    
    Lead-authored-by: zky.zhoukeyong <[email protected]>
    Co-authored-by: Keyong Zhou <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../org/apache/celeborn/client/ShuffleClient.java  | 28 ++++++-
 docs/developers/faulttolerant.md                   |  4 +-
 docs/developers/lifecyclemanager.md                |  2 +-
 docs/developers/{pushdata.md => shuffleclient.md}  | 86 +++++++++++++++++++---
 docs/developers/storage.md                         |  2 +-
 mkdocs.yml                                         |  2 +-
 6 files changed, 106 insertions(+), 18 deletions(-)

diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
index c06c6a842..30f7af8fb 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
@@ -99,7 +99,22 @@ public abstract class ShuffleClient {
 
   public abstract void setupLifecycleManagerRef(RpcEndpointRef endpointRef);
 
-  // Write data to a specific reduce partition
+  /**
+   * Write data to a specific reduce partition
+   *
+   * @param shuffleId the unique shuffle id of the application
+   * @param mapId the map id of the shuffle
+   * @param attemptId the attempt id of the map task, i.e. speculative task or task rerun for Apache
+   *     Spark
+   * @param partitionId the partition id the data belongs to
+   * @param data byte array containing data to be pushed
+   * @param offset start position of data to be pushed
+   * @param length length of data to be pushed
+   * @param numMappers the number of map tasks in the shuffle
+   * @param numPartitions the number of partitions in the shuffle
+   * @return bytes pushed
+   * @throws IOException
+   */
   public abstract int pushData(
       int shuffleId,
       int mapId,
@@ -142,6 +157,17 @@ public abstract class ShuffleClient {
 
   // Reduce side read partition which is deduplicated by mapperId+mapperAttemptNum+batchId, batchId
   // is a self-incrementing variable hidden in the implementation when sending data.
+  /**
+   * @param shuffleId the unique shuffle id of the application
+   * @param partitionId the partition id to read from
+   * @param attemptNumber the attempt id of the reduce task; can be safely set to any value
+   * @param startMapIndex the start index of the map range of interest; set to 0 if you want
+   *     to read all partition data
+   * @param endMapIndex the end index of the map range of interest; set to
+   *     `Integer.MAX_VALUE` if you want to read all partition data
+   * @return
+   * @throws IOException
+   */
   public abstract CelebornInputStream readPartition(
       int shuffleId, int partitionId, int attemptNumber, int startMapIndex, int endMapIndex)
       throws IOException;
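
As an illustration of the `data`, `offset` and `length` parameters documented for `pushData` above, here is a minimal sketch. It reads the Javadoc as the usual (array, offset, length) convention, so the pushed bytes would be `data[offset]` up to but not including `data[offset + length]`; that reading is an assumption, and `PushRange` is a hypothetical helper that is not part of Celeborn.

```java
import java.util.Arrays;

/** Hypothetical helper for illustration only; it is not part of Celeborn. */
class PushRange {
  /**
   * Returns a copy of the bytes that a call with this offset and length would cover,
   * assuming the range is [offset, offset + length).
   */
  static byte[] slice(byte[] data, int offset, int length) {
    // Written this way to avoid integer overflow in offset + length.
    if (offset < 0 || length < 0 || offset > data.length - length) {
      throw new IllegalArgumentException("range out of bounds");
    }
    return Arrays.copyOfRange(data, offset, offset + length);
  }
}
```

Under this reading, a caller that fills one reusable buffer can push only the filled part by passing its start position and byte count instead of copying into a new array.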
diff --git a/docs/developers/faulttolerant.md b/docs/developers/faulttolerant.md
index a12ab6d92..09bd57220 100644
--- a/docs/developers/faulttolerant.md
+++ b/docs/developers/faulttolerant.md
@@ -29,7 +29,7 @@ as much as possible, especially the following:
 This article is based on [ReducePartition](../../developers/storage#reducepartition).
 
 ## Handle PushData Failure
-The detailed description of push data can be found in [PushData](../../developers/pushdata). Push data can fail for
+The detailed description of push data can be found in [PushData](../../developers/shuffleclient#push-data). Push data can fail for
 various reasons, i.e. CPU high load, network fluctuation, JVM GC, `Worker` lost. 
 
 Celeborn does not eagerly consider `Worker` lost when push data fails, instead it considers it as temporary
@@ -38,7 +38,7 @@ The process is called `Revive`:
 
 ![Revive](../../assets/img/revive.svg)
 
-Handling [PushMergedData](../../developers/pushdata#push-or-merge) failure is similar but more complex. Currently,
+Handling [PushMergedData](../../developers/shuffleclient#push-or-merge) failure is similar but more complex. Currently,
 `PushMergedData` is in all-or-nothing fashion, meaning either all data batches in the request succeed or all fail.
 Partial success is not supported yet.
 
diff --git a/docs/developers/lifecyclemanager.md b/docs/developers/lifecyclemanager.md
index a495477cb..4549b4f07 100644
--- a/docs/developers/lifecyclemanager.md
+++ b/docs/developers/lifecyclemanager.md
@@ -45,7 +45,7 @@ to handle the requests, `LifecycleManager` will send requests to `Master` and `W
 - DestroyWorkerSlots to `Worker`
 
 ## RegisterShuffle
-As described in [PushData](../../developers/pushdata#lazy-shuffle-register), `ShuffleClient` lazily send
+As described in [PushData](../../developers/shuffleclient#lazy-shuffle-register), `ShuffleClient` lazily sends
 RegisterShuffle to LifecycleManager, so many concurrent requests will be sent to `LifecycleManager`.
 
 To ensure only one request for each shuffle is handled, `LifecycleManager` puts tail requests in a set and only
diff --git a/docs/developers/pushdata.md b/docs/developers/shuffleclient.md
similarity index 57%
rename from docs/developers/pushdata.md
rename to docs/developers/shuffleclient.md
index d0df3b8e9..0ad9e8257 100644
--- a/docs/developers/pushdata.md
+++ b/docs/developers/shuffleclient.md
@@ -6,7 +6,9 @@ license: |
   The ASF licenses this file to You under the Apache License, Version 2.0
   (the "License"); you may not use this file except in compliance with
   the License.  You may obtain a copy of the License at
-  http://www.apache.org/licenses/LICENSE-2.0
+
+      https://www.apache.org/licenses/LICENSE-2.0
+
   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -14,12 +16,15 @@ license: |
   limitations under the License.
 ---
 
+# ShuffleClient
 
-# Push Data
-
-This article describes the detailed design of the process of push data.
+## Overview
+ShuffleClient is responsible for pushing and reading shuffle data. It is a singleton in each leaf process,
+e.g. an Executor in Apache Spark or a TaskManager in Apache Flink. This article describes the detailed design of
+pushing and reading data.
 
-## API specification
+## Push Data
+### API specification
 The push data API is as follows:
 ```java
   public abstract int pushData(
@@ -32,7 +37,6 @@ The push data API is as follows:
       int length,
       int numMappers,
       int numPartitions)
-      throws IOException;
 ```
 
 - `shuffleId` is the unique shuffle id of the application
@@ -43,7 +47,7 @@ The push data API is as follows:
 - `numMappers` is the number map tasks in the shuffle
 - `numPartitions` is the number of partitions in the shuffle
 
-## Lazy Shuffle Register
+### Lazy Shuffle Register
 The first time `pushData` is called, Client will check whether the shuffle id has been registered. If not,
 it sends `RegisterShuffle` to `LifecycleManager`, `LifecycleManager` then sends `RequestSlots` to `Master`.
 `RequestSlots` specifies how many `PartitionLocation`s this shuffle requires, each `PartitionLocation` logically
@@ -57,7 +61,7 @@ responds to `LifecycleManager` with the allocated `PartitionLocation`s.
 `LifcycleManager` caches the `PartitionLocation`s for the shuffle and responds to each `RegisterShuffle` RPCs from
 `ShuffleClient`s.
 
-## Normal Push
+### Normal Push
 In normal cases, the process of pushing data is as follows:
 
 - `ShuffleClient` compresses data, currently supports `zstd` and `lz4`
@@ -69,7 +73,7 @@ In normal cases, the process of pushing data is as follows:
   about how data is replicated and stored in `Worker`s, please refer to [Worker](../../developers/worker)
 - Upon receiving success ACK from `Worker`, `ShuffleClient` considers success for this pushing and modifies the push state
 
-## Push or Merge?
+### Push or Merge?
 If the size of data to be pushed is small, say hundreds of bytes, it will be very inefficient to send to the wire.
 So `ShuffleClient` offers another API: `mergeData` to batch data locally before sending to `Worker`.
 
@@ -80,7 +84,7 @@ primary and replica are the same. When the size of a `DataBatches` exceeds a thr
 Upon receiving `PushMergedData`, `Worker` unpacks it into data segments each for a specific `PartitionLocation`, then
 stores them accordingly.
 
-## Async Push
+### Async Push
 Celeborn's `ShuffleClient` does not block compute engine's execution by asynchronous pushing, implemented in
 `DataPusher`.
 
@@ -88,7 +92,7 @@ Whenever compute engine decides to push data, it calls `DataPusher#addTask`, `Da
 contains the data, and added the `PushTask` in a non-blocking queue. `DataPusher` continuously poll the queue
 and invokes `ShuffleClient#pushData` to do actual push.
 
-## Split
+### Split
 As mentioned before, Celeborn will split a `PartitionLocation` when any of the following conditions happens:
 
 - `PartitionLocation` file exceeds threshold (defaults to 1GiB)
@@ -111,4 +115,62 @@ The process of `SOFT_SPLIT` is as follows:
 ![softsplit](../../assets/img/softsplit.svg)
 
 `LifecycleManager` keeps the split information and tells reducer to read from all splits of the `PartitionLocation`
-to guarantee no data is lost.
\ No newline at end of file
+to guarantee no data is lost.
+
+## Read Data
+### API specification
+`ShuffleClient` provides an API that creates an InputStream to read data from a partition id. Users can also set
+`startMapIndex` and `endMapIndex` to read data within the map range.
+```java
+  public abstract CelebornInputStream readPartition(
+      int shuffleId,
+      int partitionId,
+      int attemptNumber,
+      int startMapIndex,
+      int endMapIndex)
+```
+
+- `shuffleId` is the unique shuffle id of the application
+- `partitionId` is the partition id to read from
+- `attemptNumber` is the attempt id of the reduce task; it can be safely set to any value
+- `startMapIndex` is the start index of the map range of interest; set it to 0 if you want to read all
+  partition data
+- `endMapIndex` is the end index of the map range of interest; set it to `Integer.MAX_VALUE` if you want
+  to read all partition data
+
+The returned input stream guarantees `Exactly Once` semantics: no data is lost and no data is read twice, or else
+an exception is thrown, see [Here](../../developers/faulttolerant#exactly-once).
+
+### Get PartitionLocations
+To read data from a partition id, `ShuffleClient` first checks whether the mapping from partition id to all
+`PartitionLocation`s is cached locally. If not, `ShuffleClient` sends `GetReducerFileGroup` to `LifecycleManager`
+for the mapping, see [Here](../../developers/lifecyclemanager#getreducerfilegroup).
+
+### Read from PartitionLocation
+`ShuffleClient` creates a `PartitionReader` for each `PartitionLocation`.
+As described [Here](../../developers/storage#multi-layered-storage), `PartitionLocation` data can be stored in
+different media, i.e. memory, local disk, or a distributed filesystem. For the former two, it creates
+a `WorkerPartitionReader` to read from `Worker`; for the last one, it creates a `DfsPartitionReader` to read
+directly from the distributed filesystem.
+
+As described [Here](../../developers/storage#reducepartition), the file is chunked. `WorkerPartitionReader` asynchronously
+requests multiple chunks from `Worker`, and the reduce task consumes the data as soon as it is available.
+
+If an exception occurs when fetching a chunk, `ShuffleClient` restarts reading from the beginning of the other copy
+(primary or replica) of the `PartitionLocation` if replication is turned on, or else retries the same one. It rereads
+the whole `PartitionLocation` instead of only the failed chunk because chunks with the same index in primary and
+replica are not guaranteed to contain the same data, as explained [Here](../../developers/storage#reducepartition).
+
+`ShuffleClient` chains the `PartitionReader`s and wraps them in an InputStream. To avoid duplicated reads,
+`CelebornInputStream` discards data from unsuccessful attempts, and records the batch ids it has seen within an attempt.
+
+### Read from Map Range
+If the user specifies `startMapIndex` and `endMapIndex`, `CelebornInputStream` will only return data within the range.
+Under the hood, `Worker` only responds with data within the range. It does so by sorting and indexing the file
+by map id upon receiving such a range read request, then returning the contiguous data range of interest.
+
+Note that the sort on read is only triggered by a map range read, not in the common case where the whole partition data
+is requested.
+
+Celeborn also optionally records map ids for each `PartitionLocation`. In the case of map range reading,
+`CelebornInputStream` filters out `PartitionLocation`s that are out of the specified range.
\ No newline at end of file
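
The deduplication described in the new "Read from PartitionLocation" section (discard data from unsuccessful attempts, remember batch ids already seen) can be sketched roughly as follows. This is a minimal illustration and not the `CelebornInputStream` code: the class name `BatchDeduplicator`, its `accept` method, and the idea that the reader is handed a map from map id to the successful attempt id are all assumptions made for the example.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper for illustration only; it is not part of Celeborn. */
class BatchDeduplicator {
  // mapId -> id of the attempt that succeeded (assumed to be known to the reader)
  private final Map<Integer, Integer> successfulAttempts;
  // mapId -> batch ids already returned for the successful attempt
  private final Map<Integer, Set<Integer>> seenBatches = new HashMap<>();

  BatchDeduplicator(Map<Integer, Integer> successfulAttempts) {
    this.successfulAttempts = new HashMap<>(successfulAttempts);
  }

  /** Returns true only for a batch of the successful attempt that has not been seen before. */
  boolean accept(int mapId, int attemptId, int batchId) {
    Integer winner = successfulAttempts.get(mapId);
    if (winner == null || winner != attemptId) {
      return false; // data from an unsuccessful or unknown attempt is discarded
    }
    // Set#add returns false when the batch id was already recorded, i.e. a duplicate.
    return seenBatches.computeIfAbsent(mapId, k -> new HashSet<>()).add(batchId);
  }
}
```

With such a filter, a batch that reaches the reader twice (for example after a retried push or after switching between primary and replica) is returned only once, which matches the mapperId+mapperAttemptNum+batchId key mentioned in the `ShuffleClient.java` comment.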
diff --git a/docs/developers/storage.md b/docs/developers/storage.md
index e5915ae64..e27b7452a 100644
--- a/docs/developers/storage.md
+++ b/docs/developers/storage.md
@@ -95,7 +95,7 @@ Celeborn supports two configurable kinds of split:
 - `SOFT_SPLIT`, meaning old `PartitionLocation` epoch continues to accept data, when new epoch is ready, `ShuffleClient`
   switches to the new location transparently
 
-The detailed design of split can be found [Here](../../developers/pushdata#split).
+The detailed design of split can be found [Here](../../developers/shuffleclient#split).
 
 ## Self Check
 In additional to health and space check on each disk, `Worker` also collects perf statistics to feed Master for
diff --git a/mkdocs.yml b/mkdocs.yml
index d523d1c65..ec14dee15 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -91,7 +91,7 @@ nav:
       - Client:
         - Overview: developers/client.md
         - LifecycleManager: developers/lifecyclemanager.md
-      - PushData: developers/pushdata.md
+        - ShuffleClient: developers/shuffleclient.md
       - Fault Tolerant: developers/faulttolerant.md
 #      - ReadData: developers/readdata.md
 #      - Slots Allocation: developers/slotsallocation.md
