This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 41fdb8ade [CELEBORN-1490][CIP-6] Add Flink hybrid shuffle doc
41fdb8ade is described below
commit 41fdb8ade10b97615533438607a2cb1add3a4232
Author: Weijie Guo
AuthorDate: Fri Nov 1 13:37:14 2024 +0800
[CELEBORN-1490][CIP-6] Add Flink hybrid shuffle doc
### What changes were proposed in this pull request?
Add Flink hybrid shuffle doc
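The hybrid shuffle setup documented in this commit can also be scripted. The following is a minimal sketch, not part of Celeborn or Flink. The function name `enable_celeborn_hybrid_shuffle` is hypothetical, and the function assumes the Flink config file uses flattened keys, as the snippets in this change do, rather than nested YAML. It removes any existing top-level entries for the four Flink options and then appends the hybrid shuffle values. Celeborn client options such as `celeborn.master.endpoints` still have to be set for your own cluster.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not shipped with Celeborn or Flink).
# Switches a Flink config file that uses flattened keys to the Celeborn
# hybrid shuffle integration (Flink 1.20+). Nested YAML keys are not handled.
enable_celeborn_hybrid_shuffle() {
  local conf="$1"
  if [[ ! -f "$conf" ]]; then
    echo "config file not found: $conf" >&2
    return 1
  fi

  local keys=(
    shuffle-service-factory.class
    taskmanager.network.hybrid-shuffle.external-remote-tier-factory.class
    execution.batch-shuffle-mode
    jobmanager.partition.hybrid.partition-data-consume-constraint
  )
  local key
  for key in "${keys[@]}"; do
    # Keep every line that does not start with "<key>:" so that each option
    # is defined exactly once after the append below.
    awk -v prefix="${key}:" 'index($0, prefix) != 1' "$conf" > "${conf}.tmp" || return 1
    mv "${conf}.tmp" "$conf" || return 1
  done

  # Append the hybrid shuffle options (values taken from this change).
  cat >> "$conf" <<'EOF'
shuffle-service-factory.class: org.apache.flink.runtime.io.network.NettyShuffleServiceFactory
taskmanager.network.hybrid-shuffle.external-remote-tier-factory.class: org.apache.celeborn.plugin.flink.tiered.CelebornTierFactory
execution.batch-shuffle-mode: ALL_EXCHANGES_HYBRID_FULL
jobmanager.partition.hybrid.partition-data-consume-constraint: ALL_PRODUCERS_FINISHED
EOF
}
```

For example, after copying the Celeborn Flink 1.20 client jar into `$FLINK_HOME/lib/`, run `enable_celeborn_hybrid_shuffle "$FLINK_HOME/conf/config.yaml"`. The file is rewritten in place, so back it up first.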
### Why are the changes needed?
We need the doc for the new hybrid shuffle mode.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
No need.
Closes #2867 from reswqa/add-hs-doc.
Authored-by: Weijie Guo
Signed-off-by: SteNicholas
---
README.md | 28 +++++++++++++++++++++++++++-
docs/README.md | 16 ++++++++++++++++
docs/deploy.md | 28 +++++++++++++++++++++++++++-
3 files changed, 70 insertions(+), 2 deletions(-)
diff --git a/README.md b/README.md
index 3733e19ef..95c2c2a38 100644
--- a/README.md
+++ b/README.md
@@ -311,10 +311,17 @@ spark.executor.userClassPathFirst false
```
### Deploy Flink client
+
+**Important: Only Flink batch jobs are supported for now.**
+
Copy `$CELEBORN_HOME/flink/*.jar` to `$FLINK_HOME/lib/`.
#### Flink Configuration
-To use Celeborn, the following flink configurations should be added.
+Celeborn supports two Flink integration strategies: remote shuffle service (since Flink 1.14) and [hybrid shuffle](https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/batch/batch_shuffle/#hybrid-shuffle) (since Flink 1.20).
+
+To use Celeborn, choose one of them and add the corresponding Flink configuration below.
+
+##### Flink Remote Shuffle Service Configuration
```properties
shuffle-service-factory.class: org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory
execution.batch-shuffle-mode: ALL_EXCHANGES_BLOCKING
@@ -337,6 +344,25 @@ taskmanager.memory.task.off-heap.size: 512m
```
**Note**: The config option `execution.batch-shuffle-mode` should be configured as `ALL_EXCHANGES_BLOCKING`.
+##### Flink Hybrid Shuffle Configuration
+```properties
+shuffle-service-factory.class: org.apache.flink.runtime.io.network.NettyShuffleServiceFactory
+taskmanager.network.hybrid-shuffle.external-remote-tier-factory.class: org.apache.celeborn.plugin.flink.tiered.CelebornTierFactory
+execution.batch-shuffle-mode: ALL_EXCHANGES_HYBRID_FULL
+jobmanager.partition.hybrid.partition-data-consume-constraint: ALL_PRODUCERS_FINISHED
+
+celeborn.master.endpoints: clb-1:9097,clb-2:9097,clb-3:9097
+celeborn.client.shuffle.batchHandleReleasePartition.enabled: true
+celeborn.client.push.maxReqsInFlight: 128
+# Network connections between peers
+celeborn.data.io.numConnectionsPerPeer: 16
+# The number of threads may vary according to your cluster, but do not set it to 1
+celeborn.data.io.threads: 32
+celeborn.client.shuffle.batchHandleCommitPartition.threads: 32
+celeborn.rpc.dispatcher.numThreads: 32
+```
+**Note**: The config option `execution.batch-shuffle-mode` should be configured as `ALL_EXCHANGES_HYBRID_FULL`.
+
### Deploy MapReduce client
Copy `$CELEBORN_HOME/mr/*.jar` into `mapreduce.application.classpath` and `yarn.application.classpath`.
Meanwhile, configure the following settings in YARN and MapReduce config.
diff --git a/docs/README.md b/docs/README.md
index d439ed221..64c50357d 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -118,6 +118,9 @@ INFO [async-reply] Controller: CommitFiles for local-1690000152711-0 success wit
```
## Start Flink with Celeborn
+
+**Important: Only Flink batch jobs are supported for now.**
+
#### Copy Celeborn Client to Flink's lib
The Celeborn release binary contains clients for Flink 1.14.x, Flink 1.15.x, Flink 1.17.x, Flink 1.18.x, Flink 1.19.x and Flink 1.20.x. Copy the corresponding client jar into Flink's
`lib/` directory:
@@ -138,12 +141,25 @@ vi conf/flink-conf.yaml
cd $FLINK_HOME
vi conf/config.yaml
```
+
+Choose one of the Flink integration strategies and add the corresponding configuration:
+
+**(Supports Flink 1.14 and above) Flink Remote Shuffle Service Config**
```properties
shuffle-service-factory.class: org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory
execution.batch-shuffle-mode: ALL_EXCHANGES_BLOCKING
```
**Note**: The config option `execution.batch-shuffle-mode` should be configured as `ALL_EXCHANGES_BLOCKING`.
+**(Supports Flink 1.20 and above) Flink [hybrid shuffle](https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/batch/batch_shuffle/#hybrid-shuffle) Config**
+```properties
+shuffle-service-factory.class: org.apache.flink.runtime.io.network.NettyShuffleServiceFactory
+taskmanager.network.hybrid-shuffle.external-remote-tier-factory.class: org.apache.celeborn.plugin.flink.tiered.CelebornTierFactory
+execution.batch-shuffle-mode: ALL_EXCHANGES_HYBRID_FULL
+jobmanager.partition.hybrid.partition-data-consume-constraint: ALL_PRODUCERS_FINISHED
+```
+**Note**: The config option `execution.batch-shuffle-mode` should be configured as `ALL_EXCHANGES_HYBRID_FULL`.
+
Then deploy the example word count job to the running cluster:
```shell
cd $FLINK_HOME
diff --git a/docs/deploy.md b/docs/deploy.md
index a8498b526..e67b827da 100644
--- a/docs/deploy.md
+++ b/docs/deploy.md
@@ -206,10 +206,17 @@ spark.executor.userClassPathFirst false
```
## Deploy Flink client
+
+**Important: Only Flink batch jobs are supported for now.**
+
Copy `$CELEBORN_HOME/flink/*.jar` to `$FLINK_HOME/lib/`.
### Flink Configuration
-To use Celeborn, the following flink configurations should be added.
+Celeborn supports two Flink integration strategies: remote shuffle service (since Flink 1.14) and [hybrid shuffle](https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/batch/batch_shuffle/#hybrid-shuffle) (since Flink 1.20).
+
+To use Celeborn, choose one of them and add the corresponding Flink configuration below.
+
+#### Flink Remote Shuffle Service Configuration
```properties
shuffle-service-factory.class: org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory
execution.batch-shuffle-mode: ALL_EXCHANGES_BLOCKING
@@ -232,6 +239,25 @@ taskmanager.memory.task.off-heap.size: 512m
```
**Note**: The config option `execution.batch-shuffle-mode` should be configured as `ALL_EXCHANGES_BLOCKING`.
+#### Flink Hybrid Shuffle Configuration
+```properties
+shuffle-service-factory.class: org.apache.flink.runtime.io.network.NettyShuffleServiceFactory
+taskmanager.network.hybrid-shuffle.external-remote-tier-factory.class: org.apache.celeborn.plugin.flink.tiered.CelebornTierFactory
+execution.batch-shuffle-mode: ALL_EXCHANGES_HYBRID_FULL
+jobmanager.partition.hybrid.partition-data-consume-constraint: ALL_PRODUCERS_FINISHED
+
+celeborn.master.endpoints: clb-1:9097,clb-2:9097,clb-3:9097
+celeborn.client.shuffle.batchHandleReleasePartition.enabled: true
+celeborn.client.push.maxReqsInFlight: 128
+# Network connections between peers
+celeborn.data.io.numConnectionsPerPeer: 16
+# The number of threads may vary according to your cluster, but do not set it to 1
+celeborn.data.io.threads: 32
+celeborn.client.shuffle.batchHandleCommitPartition.threads: 32
+celeborn.rpc.dispatcher.numThreads: 32
+```
+**Note**: The config option `execution.batch-shuffle-mode` should be configured as `ALL_EXCHANGES_HYBRID_FULL`.
+
## Deploy MapReduce client
Copy `$CELEBORN_HOME/mr/*.jar` into `mapreduce.application.classpath` and `yarn.application.classpath`.
Meanwhile, configure the following settings in YARN and MapReduce config.