This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 8c473c038 [CELEBORN-869][DOC] Document on Integrating Celeborn
8c473c038 is described below
commit 8c473c038b71e9f1b7602f70f425a309541e6fda
Author: Keyong Zhou <[email protected]>
AuthorDate: Wed Aug 2 17:22:41 2023 +0800
[CELEBORN-869][DOC] Document on Integrating Celeborn
### What changes were proposed in this pull request?
As title.
### Why are the changes needed?
As title.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test.
Closes #1787 from waitinfuture/869.
Lead-authored-by: Keyong Zhou <[email protected]>
Co-authored-by: zky.zhoukeyong <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
docs/developers/flink.md | 19 ++++
docs/developers/integrate.md | 180 +++++++++++++++++++++++++++++++++++++
docs/developers/spark.md | 19 ++++
docs/developers/workerexclusion.md | 20 +++--
mkdocs.yml | 1 +
5 files changed, 230 insertions(+), 9 deletions(-)
diff --git a/docs/developers/flink.md b/docs/developers/flink.md
new file mode 100644
index 000000000..28312b7c1
--- /dev/null
+++ b/docs/developers/flink.md
@@ -0,0 +1,19 @@
+---
+license: |
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ https://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+---
+
+# Flink Plugin
\ No newline at end of file
diff --git a/docs/developers/integrate.md b/docs/developers/integrate.md
new file mode 100644
index 000000000..25b71d231
--- /dev/null
+++ b/docs/developers/integrate.md
@@ -0,0 +1,180 @@
+---
+license: |
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ https://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+---
+
+# Integrating Celeborn
+## Overview
+The core components of Celeborn, i.e. `Master`, `Worker`, and `Client`, are all engine-agnostic. Developers can
+integrate Celeborn with various engines or applications by using or extending Celeborn's `Client`, as the officially
+supported plugins for Apache Spark and Apache Flink do; see [Spark Plugin](../../developers/spark) and
+[Flink Plugin](../../developers/flink).
+
+This article briefly describes an example of integrating Celeborn into a
simple distributed application using
+Celeborn `Client`.
+
+## Background
+Say we have a distributed application that has two phases:
+
+- A write phase, in which parallel tasks write data to some data service; each record is classified into some
+  logical id, say partition id.
+- A read phase, in which parallel tasks read data from the data service; each task reads data from a specified
+  partition id.
+
+Suppose the application has a failover mechanism, so it's acceptable to rerun tasks when some data is lost.
+
+Say the developers of this application are searching for a suitable data service, and happen upon this article.
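
The two phases above can be sketched with an in-memory stand-in for the data service; `TwoPhaseSketch` and its methods are hypothetical names for illustration, not part of Celeborn:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for the data service described above:
// the write phase groups records by partition id, the read phase fetches
// all records for one partition id.
public class TwoPhaseSketch {
    private final Map<Integer, List<String>> partitions = new HashMap<>();

    // Write phase: each record is classified into a partition id.
    public void write(int partitionId, String record) {
        partitions.computeIfAbsent(partitionId, k -> new ArrayList<>()).add(record);
    }

    // Read phase: a task reads all data for a specified partition id.
    public List<String> read(int partitionId) {
        return partitions.getOrDefault(partitionId, Collections.emptyList());
    }
}
```

A real service would of course be distributed and fault tolerant; this only fixes the write/read contract the rest of the article builds on.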
+
+## Step One: Setup Celeborn Cluster
+First, you need an available Celeborn cluster. Refer to [QuickStart](../../) to set up a simple cluster on a
+single node, or to [Deploy](../../deploy) to set up a multi-node cluster, standalone or on K8s.
+
+## Step Two: Create LifecycleManager
+As described in [Client](../../developers/client), `Client` is separated into `LifecycleManager`, which is a
+singleton throughout an application, and `ShuffleClient`, which can have multiple instances.
+
+Step two is to create a `LifecycleManager` instance, using the following API:
+
+```scala
+class LifecycleManager(val appUniqueId: String, val conf: CelebornConf)
+```
+
+- `appUniqueId` is the application id. The Celeborn cluster stores, serves, and cleans up data at the granularity of
+  (application id, shuffle id).
+- `conf` is an object of `CelebornConf`. The only required configuration is the address of the Celeborn `Master`. For
+  a thorough description of configs, refer to [Configuration](../../configuration).
+
+Example Java code to create a `LifecycleManager` instance is as follows:
+
+```java
+CelebornConf celebornConf = new CelebornConf();
+celebornConf.set("celeborn.master.endpoints", "<Master IP>:<Master Port>");
+
+LifecycleManager lm = new LifecycleManager("myApp", celebornConf);
+```
+
+The `LifecycleManager` object automatically starts the necessary services after creation, so there is no need to call
+other APIs to initialize it. After creating it, you can get the `LifecycleManager`'s address, which is needed to
+create a `ShuffleClient`.
+
+```java
+String host = lm.getHost();
+int port = lm.getPort();
+```
+
+## Step Three: Create ShuffleClient
+With `LifecycleManager`'s host and port, you can create `ShuffleClient` using
the following API:
+
+```java
+public static ShuffleClient get(
+ String appUniqueId,
+ String host,
+ int port,
+ CelebornConf conf,
+ UserIdentifier userIdentifier)
+```
+
+- `appUniqueId` is the application id, same as above.
+- `host` is the host of `LifecycleManager`.
+- `port` is the port of `LifecycleManager`.
+- `conf` is an object of `CelebornConf`; it's safe to pass an empty object.
+- `userIdentifier` specifies the user identity; it's safe to pass null.
+
+You can create a `ShuffleClient` object using the following code:
+
+```java
+ShuffleClient shuffleClient =
+ ShuffleClient.get(
+ "myApp",
+ <LifecycleManager Host>,
+ <LifecycleManager Port>,
+ new CelebornConf(),
+ null);
+```
+
+This method returns a singleton `ShuffleClientImpl` object, and it's recommended to use it this way, as
+`ShuffleClientImpl` maintains status and reuses resources across all shuffles. To make it work, you have to ensure
+that the `LifecycleManager`'s host and port are reachable.
+
+In practice, one `ShuffleClient` instance is created in each Executor process
of Spark, or in each TaskManager
+process of Flink.
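
As a hedged illustration of this per-process reuse (not Celeborn's actual implementation; `ClientHolder` and the `Object` stand-in are hypothetical), a double-checked-locking holder yields one shared instance per JVM process:

```java
// Hypothetical per-process holder, mirroring how a single ShuffleClient
// instance is reused within an Executor or TaskManager process.
public class ClientHolder {
    private static volatile Object client;

    public static Object get() {
        if (client == null) {
            synchronized (ClientHolder.class) {
                if (client == null) {
                    client = new Object(); // stand-in for ShuffleClient.get(...)
                }
            }
        }
        return client;
    }
}
```

Every caller in the process then observes the same instance, so shuffle state and connections are shared rather than duplicated per task.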
+
+## Step Four: Push Data
+You can then push data with `ShuffleClient` with
[pushData](../../developers/shuffleclient#api-specification), like
+the following:
+
+```java
+int bytesWritten =
+ shuffleClient.pushData(
+ shuffleId,
+ mapId,
+ attemptId,
+ partitionId,
+ data,
+ 0,
+ length,
+ numMappers,
+ numPartitions);
+```
+
+Each call of `pushData` passes a byte array containing data for the same partition id. In addition to specifying the
+shuffleId, mapId, and attemptId that the data belongs to, `ShuffleClient` must also specify the number of mappers and
+the number of partitions for [Lazy Register](../../developers/shuffleclient#lazy-shuffle-register).
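
The lazy-register idea can be sketched as follows; `LazyRegisterSketch` is a hypothetical stand-in, not Celeborn's actual client code. Because registration happens on the first push rather than up front, every push must carry the shuffle metadata:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of lazy shuffle registration: the first pushData call
// registers the shuffle using the metadata it carries; later calls skip it.
public class LazyRegisterSketch {
    private final AtomicBoolean registered = new AtomicBoolean(false);
    private int registrations = 0;

    public void pushData(int shuffleId, int numMappers, int numPartitions) {
        if (registered.compareAndSet(false, true)) {
            // First push wins the race and triggers registration exactly once.
            registrations++;
        }
        // ... push the bytes themselves ...
    }

    public int registrations() {
        return registrations;
    }
}
```
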
+
+After the map task finishes, `ShuffleClient` should call `mapperEnd` to tell `LifecycleManager` that the map task
+has finished pushing its data:
+
+```java
+public abstract void mapperEnd(
+    int shuffleId,
+    int mapId,
+    int attemptId,
+    int numMappers)
+```
+
+- `mapId` is the map id of the current task.
+- `attemptId` is the attempt id of the current task.
+- `numMappers` is the number of map tasks in this shuffle.
+
+## Step Five: Read Data
+After all tasks have successfully called `mapperEnd`, you can start reading data from some partition id, using the
+[readPartition API](../../developers/shuffleclient#api-specification_1), as in the following code:
+
+```java
+InputStream inputStream = shuffleClient.readPartition(
+ shuffleId,
+ partitionId,
+ attemptId,
+ startMapIndex,
+ endMapIndex);
+
+int b = inputStream.read();
+```
+
+For simplicity, to read the whole data of the partition, you can pass 0 and `Integer.MAX_VALUE` as `startMapIndex`
+and `endMapIndex`. This method creates an `InputStream` for the data and guarantees no data loss and no duplicate
+reads; otherwise an exception will be thrown.
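
Consuming the stream is plain `java.io`; for instance, one could drain it to a byte array as below (a sketch, with a `ByteArrayInputStream` standing in for the stream returned by `readPartition`):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class DrainStream {
    // Reads every byte from the stream until EOF, as one would consume the
    // InputStream returned by readPartition.
    public static byte[] drain(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int b;
        while ((b = in.read()) != -1) {
            out.write(b);
        }
        return out.toByteArray();
    }
}
```
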
+
+## Step Six: Clean Up
+After the shuffle finishes, you can call `LifecycleManager.unregisterShuffle`
to clean up resources related to the
+shuffle:
+
+```java
+lm.unregisterShuffle(0);
+```
+
+It's safe not to call `unregisterShuffle`, because the Celeborn `Master` recognizes application completion by
+heartbeat timeout, and will clean up its resources in the cluster on its own.
\ No newline at end of file
diff --git a/docs/developers/spark.md b/docs/developers/spark.md
new file mode 100644
index 000000000..6dd0bfc30
--- /dev/null
+++ b/docs/developers/spark.md
@@ -0,0 +1,19 @@
+---
+license: |
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ https://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+---
+
+# Spark Plugin
\ No newline at end of file
diff --git a/docs/developers/workerexclusion.md
b/docs/developers/workerexclusion.md
index 44f7db49d..fdd24b3f6 100644
--- a/docs/developers/workerexclusion.md
+++ b/docs/developers/workerexclusion.md
@@ -16,7 +16,7 @@ license: |
limitations under the License.
---
-# Overview
+# Worker Exclusion
`Worker`s can fail, temporarily or permanently. To reduce the impact of
`Worker` failure, Celeborn tries to
figure out `Worker` status as soon as possible, and as correct as possible.
This article describes detailed
design of `Worker` exclusion.
@@ -27,7 +27,7 @@ and `Client`. `Client` is further separated into
`LifecycleManager` and `Shuffle
/`ShuffleClient` need to know about `Worker` status, actively or reactively.
## Master Side Exclusion
-`Master` maintains the ground-truth status of `Worker`s, with relatively
longer delay. Master maintains four
+`Master` maintains the ground-truth status of `Worker`s, with relatively
longer delay. `Master` maintains four
lists of `Worker`s with different status:
- Active list. `Worker`s that have successfully registered to `Master`, and
heartbeat never timed out.
@@ -43,7 +43,7 @@ shutdown list. Since `Master` only exclude `Worker`s upon
heartbeat, it has rela
## ShuffleClient Side Exclusion
`ShuffleClient`'s local exclusion list is essential to performance. Say the
timeout to create network
-connection is 10s, if `ShuffleClient` blindly pushes data to a non-exist
`Worker`, the task will hang forever.
+connection is 10s, if `ShuffleClient` blindly pushes data to a non-exist
`Worker`, the task will hang for a long time.
Waiting for `Master` to inform the exclusion list is unacceptable because of
the delay. Instead, `ShuffleClient`
actively exclude `Worker`s when it encounters critical exceptions, for example:
@@ -54,16 +54,15 @@ actively exclude `Worker`s when it encounters critical
exceptions, for example:
- Connection exception happened
In addition to exclude the `Worker`s locally, `ShuffleClient` also carries the
cause of push failure with
-[Revive](../../developers/faulttolerant#handle-pushdata-failure) for
`LifecycleManager` to also exclude the `Worker`s,
-see the section below.
+[Revive](../../developers/faulttolerant#handle-pushdata-failure) to `LifecycleManager`; see the section below.
-Such strategy is aggressive, meaning false negative may happen. To rectify,
`ShuffleClient` removes `Worker`s from
-the excluded list whenever an event happens that indicates some `Worker` is
available, for example:
+Such a strategy is aggressive, so false negatives may happen. To rectify this, `ShuffleClient` removes a `Worker`
+from the excluded list whenever an event happens that indicates that `Worker` is available, for example:
- When the `Worker` is allocated slots in register shuffle
- When `LifecycleManager` says the `Worker` is available in response of Revive
-Currently, exclusion list in `ShuffleClient` is optional, users can configure
using the following configs:
+Currently, exclusion in `ShuffleClient` is optional; users can configure it using the following configs:
`celeborn.client.push/fetch.excludeWorkerOnFailure.enabled`
@@ -78,4 +77,7 @@ excludes a `Worker` in the following scenarios:
`LifecycleManager` will remove `Worker` from the excluded list in the
following scenarios:
- For critical causes, when timeout expires (defaults to 180s)
-- For non-critical causes, when it's not in `Master`'s exclusion list
\ No newline at end of file
+- For non-critical causes, when it's not in `Master`'s exclusion list
+
+In the response to Revive, `LifecycleManager` checks the status of the `Worker` where the previous push failed.
+`ShuffleClient` will remove the `Worker` from its local exclusion list if the `Worker` is available.
\ No newline at end of file
diff --git a/mkdocs.yml b/mkdocs.yml
index 43640a777..f842a5732 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -94,3 +94,4 @@ nav:
- ShuffleClient: developers/shuffleclient.md
- Fault Tolerant: developers/faulttolerant.md
- Worker Exclusion: developers/workerexclusion.md
+ - Integrating Celeborn: developers/integrate.md