This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 8e849645e [CELEBORN-824][DOC] Add PushData document
8e849645e is described below
commit 8e849645eb3de0f6b889a14b45b618ad7a1de33c
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Mon Jul 24 10:38:46 2023 +0800
[CELEBORN-824][DOC] Add PushData document
### What changes were proposed in this pull request?
As title.
### Why are the changes needed?
As title
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No.
Closes #1747 from waitinfuture/824.
Authored-by: zky.zhoukeyong <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
docs/README.md | 2 +-
docs/assets/img/softsplit.svg | 4 ++
docs/developers/overview.md | 8 +--
docs/developers/pushdata.md | 113 ++++++++++++++++++++++++++++++++++++++++++
mkdocs.yml | 3 +-
5 files changed, 124 insertions(+), 6 deletions(-)
diff --git a/docs/README.md b/docs/README.md
index 4e7810017..b71e59ac7 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -96,7 +96,7 @@ cd $SPARK_HOME
--conf spark.shuffle.service.enabled=false
```
Then run the following test case:
-```shell
+```scala
spark.sparkContext.parallelize(1 to 10, 10)
.flatMap( _ => (1 to 100).iterator
.map(num => num)).repartition(10).count
diff --git a/docs/assets/img/softsplit.svg b/docs/assets/img/softsplit.svg
new file mode 100644
index 000000000..a0800e08c
--- /dev/null
+++ b/docs/assets/img/softsplit.svg
@@ -0,0 +1,4 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Do not edit this file with editors other than diagrams.net -->
+<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN"
"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
+<svg xmlns="http://www.w3.org/2000/svg"
xmlns:xlink="http://www.w3.org/1999/xlink" version="1.1" width="606px"
height="388px" viewBox="-0.5 -0.5 606 388" content="<mxfile
host="app.diagrams.net" modified="2023-07-24T02:18:46.040Z"
agent="5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML,
like Gecko) Chrome/114.0.0.0 Safari/537.36"
etag="iNp8mduIIqRe4A2Sv3kt" version="16.0.0"
type="device"><diagram id=&qu [...]
\ No newline at end of file
diff --git a/docs/developers/overview.md b/docs/developers/overview.md
index 7c49f449f..832c502c7 100644
--- a/docs/developers/overview.md
+++ b/docs/developers/overview.md
@@ -23,7 +23,7 @@ please refer to dedicated articles.
In distributed compute engines, data exchange between compute nodes is common
but expensive. The cost comes from
the disk and network inefficiency (M * N connections between Mappers and Reducers) in the
traditional shuffle framework, as follows:
-
+
Besides inefficiency, the traditional shuffle framework requires large local
storage on compute nodes to store shuffle
data, thus blocking the adoption of disaggregated architecture.
@@ -31,7 +31,7 @@ data, thus blocks the adoption of disaggregated architecture.
Apache Celeborn (Incubating) solves these problems by reorganizing shuffle data
in a more efficient way and storing it in
a separate service. The high-level architecture of Celeborn is as follows:
-
+
## Components
Celeborn(Incubating) has three primary components: Master, Worker, and Client.
@@ -73,8 +73,8 @@ it just needs one network connection and sequentially read
the coarse grained fi
In abnormal cases, such as when the file grows too large, or push data fails,
Celeborn spawns a new split of the
`PartitionLocation`, and future data within the partition will be pushed to
the new split.
-Client keeps the split information and tells reducer to read from all splits
of the `PartitionLocation` to guarantee
-no data is lost.
+`LifecycleManager` keeps the split information and tells reducer to read from
all splits of the `PartitionLocation`
+to guarantee no data is lost.
## Data Storage
Celeborn stores shuffle data in multiple configurable layers, i.e. `Memory`,
`Local Disks`, `Distributed File System`,
diff --git a/docs/developers/pushdata.md b/docs/developers/pushdata.md
new file mode 100644
index 000000000..07bdae1c3
--- /dev/null
+++ b/docs/developers/pushdata.md
@@ -0,0 +1,113 @@
+---
+license: |
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+---
+
+
+# Push Data
+
+This article describes the detailed design of the push data process.
+
+## API specification
+The push data API is as follows:
+```java
+ public abstract int pushData(
+ int shuffleId,
+ int mapId,
+ int attemptId,
+ int partitionId,
+ byte[] data,
+ int offset,
+ int length,
+ int numMappers,
+ int numPartitions)
+ throws IOException;
+```
+
+- `shuffleId` is the unique shuffle id of the application
+- `mapId` is the map id of the shuffle
+- `attemptId` is the attempt id of the map task, e.g. from speculative execution or task rerun in Apache Spark
+- `partitionId` is the partition id the data belongs to
+- `data`, `offset`, and `length` specify the bytes to be pushed
+- `numMappers` is the number of map tasks in the shuffle
+- `numPartitions` is the number of partitions in the shuffle
+
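To make the contract concrete, here is a minimal sketch of an engine-side caller against a stub client. `ShuffleClientSketch` and `InMemoryClient` are invented for illustration; the real entry point is Celeborn's `ShuffleClient`, whose construction and wire behavior are out of scope here.

```java
// Illustrative stand-in for Celeborn's ShuffleClient: only the pushData
// signature mirrors the API above; the body is a fake that just accounts
// for the pushed bytes instead of sending them to a Worker.
abstract class ShuffleClientSketch {
    public abstract int pushData(int shuffleId, int mapId, int attemptId,
            int partitionId, byte[] data, int offset, int length,
            int numMappers, int numPartitions);
}

class InMemoryClient extends ShuffleClientSketch {
    long totalPushed = 0;

    @Override
    public int pushData(int shuffleId, int mapId, int attemptId,
            int partitionId, byte[] data, int offset, int length,
            int numMappers, int numPartitions) {
        // A real client compresses, frames, and ships data[offset, offset+length)
        // to the Worker hosting the PartitionLocation of partitionId.
        totalPushed += length;
        return length;
    }
}
```

A map task would typically call `pushData` once per serialized record batch, passing its own `mapId` and `attemptId` unchanged across calls.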
+## Lazy Shuffle Register
+The first time `pushData` is called, the Client checks whether the shuffle id has been registered. If not,
+it sends `RegisterShuffle` to `LifecycleManager`, and `LifecycleManager` then sends `RequestSlots` to `Master`.
+`RequestSlots` specifies how many `PartitionLocation`s this shuffle requires; each `PartitionLocation` logically
+corresponds to the data of one partition id.
+
+Upon receiving `RequestSlots`, `Master` allocates slots for the shuffle among
`Worker`s. If replication is turned on,
+`Master` allocates a pair of `Worker`s for each `PartitionLocation` to store
two replicas for each `PartitionLocation`.
+The detailed allocation strategy can be found in [Slots
Allocation](../../developers/slotsallocation). `Master` then
+responds to `LifecycleManager` with the allocated `PartitionLocation`s.
+
+`LifecycleManager` caches the `PartitionLocation`s for the shuffle and responds to each `RegisterShuffle` RPC from
+the `ShuffleClient`s.
+
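The check-then-register step can be sketched as a cached lookup that only performs the RPC on first use. The names below are illustrative stand-ins, not Celeborn's real classes; the lambda body stands in for the `RegisterShuffle` / `RequestSlots` round trip.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of lazy shuffle registration: the first pushData call for a shuffle
// id triggers registration; later calls hit the cache. computeIfAbsent keeps
// concurrent first calls from registering the same shuffle twice.
class LazyRegistry {
    private final Map<Integer, String> registered = new ConcurrentHashMap<>();
    final AtomicInteger rpcCount = new AtomicInteger();

    String locationsFor(int shuffleId) {
        return registered.computeIfAbsent(shuffleId, id -> {
            rpcCount.incrementAndGet(); // stand-in for the RegisterShuffle RPC
            return "partition-locations-for-shuffle-" + id;
        });
    }
}
```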
+## Normal Push
+In normal cases, the process of pushing data is as follows:
+
+- `ShuffleClient` compresses the data; `zstd` and `lz4` are currently supported
+- `ShuffleClient` adds a header to the data: `mapId`, `attemptId`, `batchId` and `size`. The `batchId` is a unique
+  id for the data batch inside the (`mapId`, `attemptId`) pair, used for de-duplication
+- `ShuffleClient` sends `PushData` to the `Worker` on which the current `PartitionLocation` is allocated, and records
+  the push state for this push
+- `Worker` receives the data, replicates it if needed, then responds with a success ACK to `ShuffleClient`. For more
+  details about how data is replicated and stored in `Worker`s, please refer to [Worker](../../developers/worker)
+- Upon receiving the success ACK from `Worker`, `ShuffleClient` considers this push successful and updates the push
+  state
+
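The four-field header can be pictured as four consecutive ints. This is a sketch under assumed encoding (4-byte big-endian ints, fixed field order); Celeborn's actual wire format may differ.

```java
import java.nio.ByteBuffer;

// Sketch of a 16-byte batch header (mapId, attemptId, batchId, size).
// Field order follows the list above; the exact on-the-wire encoding used
// by Celeborn (endianness, framing) is an assumption here.
class BatchHeader {
    static final int SIZE = 16;

    static byte[] encode(int mapId, int attemptId, int batchId, int size) {
        return ByteBuffer.allocate(SIZE)
                .putInt(mapId).putInt(attemptId).putInt(batchId).putInt(size)
                .array();
    }

    static int[] decode(byte[] header) {
        ByteBuffer buf = ByteBuffer.wrap(header);
        return new int[] {buf.getInt(), buf.getInt(), buf.getInt(), buf.getInt()};
    }
}
```

The (`mapId`, `attemptId`, `batchId`) triple is what lets the reader drop duplicate batches that were pushed more than once.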
+## Push or Merge?
+If the size of the data to be pushed is small, say hundreds of bytes, it is very inefficient to send it over the
+wire directly. So `ShuffleClient` offers another API, `mergeData`, to batch data locally before sending it to
+`Worker`.
+
+`mergeData` merges data with the same target into `DataBatches`. `Same target` means the destinations of both the
+primary and the replica are the same. When the size of a `DataBatches` exceeds a threshold (defaults to `64KiB`),
+`ShuffleClient` triggers pushing and sends `PushMergedData` to the destination.
+
+Upon receiving `PushMergedData`, `Worker` unpacks it into data segments, each for a specific `PartitionLocation`,
+then stores them accordingly.
+
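The batching logic can be sketched as a per-destination buffer that flushes once it crosses the threshold. `MergeBuffer` and the `String` destination key are stand-ins invented here; the real `DataBatches` keys on the primary/replica worker pair and carries per-batch metadata.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of mergeData-style local batching: batches with the same destination
// accumulate in one buffer and are flushed once the buffer exceeds the
// threshold (64 KiB by default, per the text above).
class MergeBuffer {
    static final int THRESHOLD = 64 * 1024;

    private final Map<String, ByteArrayOutputStream> pending = new HashMap<>();
    final List<byte[]> flushed = new ArrayList<>(); // stands in for PushMergedData sends

    void mergeData(String destination, byte[] batch) {
        ByteArrayOutputStream buf =
                pending.computeIfAbsent(destination, d -> new ByteArrayOutputStream());
        buf.writeBytes(batch);
        if (buf.size() > THRESHOLD) {
            flushed.add(buf.toByteArray()); // a real client sends PushMergedData here
            buf.reset();
        }
    }
}
```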
+## Async Push
+Celeborn's `ShuffleClient` does not block the compute engine's execution: pushing is asynchronous, implemented in
+`DataPusher`.
+
+Whenever the compute engine decides to push data, it calls `DataPusher#addTask`. `DataPusher` creates a `PushTask`
+containing the data and adds it to a non-blocking queue. `DataPusher` continuously polls the queue and invokes
+`ShuffleClient#pushData` to do the actual push.
+
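The producer/consumer shape described above can be sketched as follows. All names are illustrative; the real `DataPusher` also enforces in-flight limits, propagates push errors back to the task, and handles shutdown.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of DataPusher's shape: the engine thread enqueues PushTasks and
// returns immediately; a background thread drains the queue and performs
// the actual push (the stand-in here just counts pushed batches).
class DataPusherSketch {
    static class PushTask {
        final int partitionId;
        final byte[] data;
        PushTask(int partitionId, byte[] data) {
            this.partitionId = partitionId;
            this.data = data;
        }
    }

    private final BlockingQueue<PushTask> queue = new LinkedBlockingQueue<>();
    final AtomicInteger pushedBatches = new AtomicInteger();

    DataPusherSketch() {
        Thread worker = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    PushTask task = queue.take();
                    // Stand-in for ShuffleClient#pushData(..., task.data, ...)
                    pushedBatches.incrementAndGet();
                }
            } catch (InterruptedException ignored) {
                // exit on interrupt
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    void addTask(int partitionId, byte[] data) {
        queue.offer(new PushTask(partitionId, data)); // does not block the engine thread
    }
}
```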
+## Split
+As mentioned before, Celeborn splits a `PartitionLocation` when any of the following conditions occurs:
+
+- The `PartitionLocation` file exceeds a threshold (defaults to 1GiB)
+- The usable space of the local disk is less than a threshold (defaults to 5GiB)
+- `Worker` is in `Graceful Shutdown` state
+- Push data fails
+
+For the first three cases, `Worker` informs `ShuffleClient` that it should
trigger split; for the last case,
+`ShuffleClient` triggers split itself.
+
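The first three (worker-side) conditions amount to a predicate like the sketch below. The thresholds are the defaults named above and are configurable in Celeborn; the class and method names are invented for illustration.

```java
// Sketch of the worker-side split decision for a PartitionLocation file.
// Thresholds mirror the stated defaults (1 GiB file, 5 GiB usable disk);
// the fourth trigger, push failure, is detected on the client side instead.
class SplitPolicy {
    static final long FILE_SPLIT_THRESHOLD = 1L << 30;   // 1 GiB
    static final long MIN_USABLE_DISK_BYTES = 5L << 30;  // 5 GiB

    static boolean workerShouldSplit(long fileBytes, long usableDiskBytes,
                                     boolean gracefulShutdown) {
        return fileBytes > FILE_SPLIT_THRESHOLD
                || usableDiskBytes < MIN_USABLE_DISK_BYTES
                || gracefulShutdown;
    }
}
```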
+There are two kinds of Split:
+- `HARD_SPLIT`: the old `PartitionLocation` epoch refuses to accept any more data, and future data of the
+  `PartitionLocation` will only be pushed after the new `PartitionLocation` epoch is ready
+- `SOFT_SPLIT`: the old `PartitionLocation` epoch continues to accept data; when the new epoch is ready,
+  `ShuffleClient` switches to the new location transparently
+
+The process of `SOFT_SPLIT` is as follows:
+
+
+
+`LifecycleManager` keeps the split information and tells reducer to read from
all splits of the `PartitionLocation`
+to guarantee no data is lost.
\ No newline at end of file
diff --git a/mkdocs.yml b/mkdocs.yml
index 8cfd67370..007cecb9c 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -72,12 +72,13 @@ copyright: >
<br>
nav:
- - Configuration: configuration/index.md
+ - QuickStart: README.md
- Deployment:
- Overview: deploy.md
- Kubernetes: deploy_on_k8s.md
- Upgrade: upgrade.md
- Ratis Shell: celeborn_ratis_shell.md
+ - Configuration: configuration/index.md
- Monitoring: monitoring.md
- Migration Guide: migration.md
- Developers Doc: