This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 8e849645e [CELEBORN-824][DOC] Add PushData document
8e849645e is described below

commit 8e849645eb3de0f6b889a14b45b618ad7a1de33c
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Mon Jul 24 10:38:46 2023 +0800

    [CELEBORN-824][DOC] Add PushData document
    
    ### What changes were proposed in this pull request?
    As title.
    
    ### Why are the changes needed?
    As title
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    No.
    
    Closes #1747 from waitinfuture/824.
    
    Authored-by: zky.zhoukeyong <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 docs/README.md                |   2 +-
 docs/assets/img/softsplit.svg |   4 ++
 docs/developers/overview.md   |   8 +--
 docs/developers/pushdata.md   | 113 ++++++++++++++++++++++++++++++++++++++++++
 mkdocs.yml                    |   3 +-
 5 files changed, 124 insertions(+), 6 deletions(-)

diff --git a/docs/README.md b/docs/README.md
index 4e7810017..b71e59ac7 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -96,7 +96,7 @@ cd $SPARK_HOME
 --conf spark.shuffle.service.enabled=false
 ```
 Then run the following test case:
-```shell
+```scala
 spark.sparkContext.parallelize(1 to 10, 10)
   .flatMap( _ => (1 to 100).iterator
   .map(num => num)).repartition(10).count
diff --git a/docs/assets/img/softsplit.svg b/docs/assets/img/softsplit.svg
new file mode 100644
index 000000000..a0800e08c
--- /dev/null
+++ b/docs/assets/img/softsplit.svg
@@ -0,0 +1,4 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Do not edit this file with editors other than diagrams.net -->
+<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" 
"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
+<svg xmlns="http://www.w3.org/2000/svg" 
xmlns:xlink="http://www.w3.org/1999/xlink" version="1.1" width="606px" 
height="388px" viewBox="-0.5 -0.5 606 388" content="&lt;mxfile 
host=&quot;app.diagrams.net&quot; modified=&quot;2023-07-24T02:18:46.040Z&quot; 
agent=&quot;5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, 
like Gecko) Chrome/114.0.0.0 Safari/537.36&quot; 
etag=&quot;iNp8mduIIqRe4A2Sv3kt&quot; version=&quot;16.0.0&quot; 
type=&quot;device&quot;&gt;&lt;diagram id=&qu [...]
\ No newline at end of file
diff --git a/docs/developers/overview.md b/docs/developers/overview.md
index 7c49f449f..832c502c7 100644
--- a/docs/developers/overview.md
+++ b/docs/developers/overview.md
@@ -23,7 +23,7 @@ please refer to dedicated articles.
In distributed compute engines, data exchange between compute nodes is common 
but expensive. The cost comes from
disk and network inefficiency (M * N connections between Mappers and Reducers) in 
the traditional shuffle framework, as shown below:
 
-![ESS](/assets/img/ess.svg)
+![ESS](../../assets/img/ess.svg)
 
Besides being inefficient, the traditional shuffle framework requires large local 
storage on compute nodes to store shuffle
data, which blocks the adoption of disaggregated architectures.
@@ -31,7 +31,7 @@ data, thus blocks the adoption of disaggregated architecture.
Apache Celeborn (Incubating) solves these problems by reorganizing shuffle data 
in a more efficient way and storing the data in
a separate service. The high-level architecture of Celeborn is as follows:
 
-![Celeborn](/assets/img/celeborn.svg)
+![Celeborn](../../assets/img/celeborn.svg)
 
 ## Components
Celeborn (Incubating) has three primary components: Master, Worker, and Client.
@@ -73,8 +73,8 @@ it just needs one network connection and sequentially read 
the coarse grained fi
In abnormal cases, such as when the file grows too large or pushing data fails, 
Celeborn spawns a new split of the
`PartitionLocation`, and future data within the partition will be pushed to 
the new split.
 
-Client keeps the split information and tells reducer to read from all splits 
of the `PartitionLocation` to guarantee
-no data is lost.
+`LifecycleManager` keeps the split information and tells reducer to read from 
all splits of the `PartitionLocation`
+to guarantee no data is lost.
 
 ## Data Storage
Celeborn stores shuffle data in multiple configurable layers, i.e. `Memory`, 
`Local Disks`, `Distributed File System`,
diff --git a/docs/developers/pushdata.md b/docs/developers/pushdata.md
new file mode 100644
index 000000000..07bdae1c3
--- /dev/null
+++ b/docs/developers/pushdata.md
@@ -0,0 +1,113 @@
+---
+license: |
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+---
+
+
+# Push Data
+
+This article describes the detailed design of the data pushing process.
+
+## API specification
+The push data API is as follows:
+```java
+  public abstract int pushData(
+      int shuffleId,
+      int mapId,
+      int attemptId,
+      int partitionId,
+      byte[] data,
+      int offset,
+      int length,
+      int numMappers,
+      int numPartitions)
+      throws IOException;
+```
+
+- `shuffleId` is the unique shuffle id within the application
+- `mapId` is the map id of the shuffle
+- `attemptId` is the attempt id of the map task, e.g. from a speculative task or task 
rerun in Apache Spark
+- `partitionId` is the partition id the data belongs to
+- `data`, `offset`, and `length` specify the bytes to be pushed
+- `numMappers` is the number of map tasks in the shuffle
+- `numPartitions` is the number of partitions in the shuffle
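+
+Purely for illustration, a call could look like the following sketch; the `shuffleClient` instance, the serializer, and all concrete values here are hypothetical:
+```java
+// Hypothetical sketch: push one serialized batch for partition 3,
+// produced by map task 7 (attempt 0) of shuffle 0, in a job with
+// 10 mappers and 20 partitions.
+byte[] data = engineSerializer.serialize(records); // hypothetical serializer
+int bytesPushed = shuffleClient.pushData(
+    0,            // shuffleId
+    7,            // mapId
+    0,            // attemptId
+    3,            // partitionId
+    data,
+    0,            // offset
+    data.length,  // length
+    10,           // numMappers
+    20);          // numPartitions
+```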
+
+## Lazy Shuffle Register
+The first time `pushData` is called, the Client checks whether the shuffle id 
has been registered. If not,
+it sends `RegisterShuffle` to `LifecycleManager`, and `LifecycleManager` then 
sends `RequestSlots` to `Master`.
+`RequestSlots` specifies how many `PartitionLocation`s this shuffle requires; 
each `PartitionLocation` logically
+corresponds to the data of one partition id.
+
+Upon receiving `RequestSlots`, `Master` allocates slots for the shuffle among 
`Worker`s. If replication is turned on,
+`Master` allocates a pair of `Worker`s for each `PartitionLocation` to store 
its two replicas.
+The detailed allocation strategy can be found in [Slots 
Allocation](../../developers/slotsallocation). `Master` then
+responds to `LifecycleManager` with the allocated `PartitionLocation`s.
+
+`LifecycleManager` caches the `PartitionLocation`s for the shuffle and responds 
to each `RegisterShuffle` RPC from
+`ShuffleClient`s.
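+
+A minimal sketch of this lazy check is below; the field and method names are illustrative, not Celeborn's actual implementation:
+```java
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+// Illustrative: register a shuffle at most once, on first push.
+private final Set<Integer> registeredShuffles = ConcurrentHashMap.newKeySet();
+
+void ensureRegistered(int shuffleId, int numMappers, int numPartitions) {
+  if (!registeredShuffles.contains(shuffleId)) {
+    synchronized (this) {
+      if (!registeredShuffles.contains(shuffleId)) {
+        // Sends RegisterShuffle to LifecycleManager, which sends RequestSlots
+        // to Master and caches the returned PartitionLocations.
+        registerShuffle(shuffleId, numMappers, numPartitions); // hypothetical
+        registeredShuffles.add(shuffleId);
+      }
+    }
+  }
+}
+```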
+
+## Normal Push
+In normal cases, the process of pushing data is as follows:
+
+- `ShuffleClient` compresses the data; currently `zstd` and `lz4` are supported
+- `ShuffleClient` adds a header to the data containing `mapId`, `attemptId`, `batchId` 
and `size`. The `batchId` is a unique
+  id for the data batch within (`mapId`, `attemptId`), for the purpose of 
de-duplication (a sketch of such a header appears after this list)
+- `ShuffleClient` sends `PushData` to the `Worker` on which the current 
`PartitionLocation` is allocated, and records the
+  push state for this push
+- `Worker` receives the data, replicates it if needed, then responds with a success 
ACK to `ShuffleClient`. For more details
+  about how data is replicated and stored in `Worker`s, please refer to 
[Worker](../../developers/worker)
+- Upon receiving the success ACK from `Worker`, `ShuffleClient` considers this 
push successful and updates the push state
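+
+The exact wire format is internal to Celeborn; purely as an illustration, assembling a header of four ints in front of a compressed payload could look like this:
+```java
+import java.nio.ByteBuffer;
+
+// Illustrative only: prepend a 16-byte header (mapId, attemptId,
+// batchId, size) to the payload; the real layout is defined by
+// Celeborn's implementation.
+static byte[] withHeader(int mapId, int attemptId, int batchId, byte[] payload) {
+  ByteBuffer buf = ByteBuffer.allocate(16 + payload.length);
+  buf.putInt(mapId);
+  buf.putInt(attemptId);
+  buf.putInt(batchId); // unique within (mapId, attemptId), enables de-duplication
+  buf.putInt(payload.length);
+  buf.put(payload);
+  return buf.array();
+}
+```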
+
+## Push or Merge?
+If the size of the data to be pushed is small, say hundreds of bytes, it is 
very inefficient to send it over the wire directly.
+So `ShuffleClient` offers another API, `mergeData`, which batches data locally 
before sending it to `Worker`.
+
+`mergeData` merges data with the same target into `DataBatches`. "Same target" 
means the destinations of both the
+primary and the replica are the same. When the size of a `DataBatches` exceeds a 
threshold (defaults to `64KiB`),
+`ShuffleClient` triggers pushing and sends `PushMergedData` to the destination.
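+
+A simplified sketch of this batching logic follows; the `DataBatches` bookkeeping is reduced to plain maps, and `sendPushMergedData` is a hypothetical sender:
+```java
+import java.util.*;
+
+// Illustrative: buffer batches per target (primary/replica destination pair)
+// and flush once the accumulated size crosses the threshold.
+static final int PUSH_THRESHOLD = 64 * 1024; // 64 KiB by default, configurable
+
+final Map<String, List<byte[]>> batchesByTarget = new HashMap<>();
+final Map<String, Integer> sizeByTarget = new HashMap<>();
+
+void mergeData(String target, byte[] batch) {
+  batchesByTarget.computeIfAbsent(target, k -> new ArrayList<>()).add(batch);
+  int newSize = sizeByTarget.merge(target, batch.length, Integer::sum);
+  if (newSize > PUSH_THRESHOLD) {
+    sendPushMergedData(target, batchesByTarget.remove(target)); // hypothetical
+    sizeByTarget.remove(target);
+  }
+}
+```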
+
+Upon receiving `PushMergedData`, `Worker` unpacks it into data segments, each 
for a specific `PartitionLocation`, then
+stores them accordingly.
+
+## Async Push
+Celeborn's `ShuffleClient` pushes data asynchronously so that it does not block 
the compute engine's execution; this is implemented in
+`DataPusher`.
+
+Whenever the compute engine decides to push data, it calls `DataPusher#addTask`; 
`DataPusher` creates a `PushTask` which
+contains the data and adds the `PushTask` to a non-blocking queue. 
`DataPusher` continuously polls the queue
+and invokes `ShuffleClient#pushData` to do the actual push.
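+
+A simplified sketch of this producer-consumer pattern follows; `PushTask`'s fields here are hypothetical, and the real `DataPusher` additionally handles batching limits, error propagation, and clean shutdown:
+```java
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+// Illustrative: decouple the engine's task thread from network I/O.
+final BlockingQueue<PushTask> queue = new LinkedBlockingQueue<>();
+
+// Called from the compute engine's task thread; enqueues without doing I/O.
+void addTask(PushTask task) throws InterruptedException {
+  queue.put(task);
+}
+
+// Pusher thread: drains the queue and performs the actual push.
+void pushLoop() throws Exception {
+  while (true) {
+    PushTask t = queue.take();
+    shuffleClient.pushData(
+        t.shuffleId, t.mapId, t.attemptId, t.partitionId,
+        t.data, 0, t.data.length, t.numMappers, t.numPartitions);
+  }
+}
+```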
+
+## Split
+As mentioned before, Celeborn will split a `PartitionLocation` when any of the 
following conditions occurs:
+
+- The `PartitionLocation` file exceeds a threshold (defaults to 1GiB)
+- The usable space of the local disk is less than a threshold (defaults to 5GiB)
+- `Worker` is in `Graceful Shutdown` state
+- Push data fails
+
+For the first three cases, `Worker` informs `ShuffleClient` that it should 
trigger a split; for the last case,
+`ShuffleClient` triggers the split itself.
+
+There are two kinds of Split:
+- `HARD_SPLIT`, meaning the old `PartitionLocation` epoch refuses to accept any 
more data, and future data of the
+  `PartitionLocation` will only be pushed after the new `PartitionLocation` epoch 
is ready
+- `SOFT_SPLIT`, meaning the old `PartitionLocation` epoch continues to accept 
data; when the new epoch is ready, `ShuffleClient`
+  transparently switches to the new location (see the sketch after this list)
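+
+Purely as an illustration, a client-side reaction to the two signals might look like the following sketch; the status names mirror the two kinds above, and the helper methods are hypothetical:
+```java
+// Illustrative handling of a split signal returned by a push.
+void onSplit(StatusCode status, int partitionId, byte[] batch) {
+  if (status == StatusCode.HARD_SPLIT) {
+    // Old epoch rejected this batch: wait for the new epoch, then
+    // re-push this batch and route all future data to the new location.
+    PartitionLocation newLoc = waitForNewEpoch(partitionId); // hypothetical
+    repush(newLoc, batch);                                   // hypothetical
+  } else if (status == StatusCode.SOFT_SPLIT) {
+    // Old epoch still accepted the batch: simply switch future pushes
+    // once LifecycleManager reports the new epoch ready.
+    switchToNewEpochWhenReady(partitionId);                  // hypothetical
+  }
+}
+```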
+
+The process of `SOFT_SPLIT` is as follows:
+
+![softsplit](../../assets/img/softsplit.svg)
+
+`LifecycleManager` keeps the split information and tells reducer to read from 
all splits of the `PartitionLocation`
+to guarantee no data is lost.
\ No newline at end of file
diff --git a/mkdocs.yml b/mkdocs.yml
index 8cfd67370..007cecb9c 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -72,12 +72,13 @@ copyright: >
   <br>
 
 nav:
-  - Configuration: configuration/index.md
+  - QuickStart: README.md
   - Deployment:
       - Overview: deploy.md
       - Kubernetes: deploy_on_k8s.md
       - Upgrade: upgrade.md
       - Ratis Shell: celeborn_ratis_shell.md
+  - Configuration: configuration/index.md
   - Monitoring: monitoring.md
   - Migration Guide: migration.md
   - Developers Doc:
