This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 57a35ca34 [CELEBORN-498] Add new config for DfsPartitionReader's chunk
size
57a35ca34 is described below
commit 57a35ca3496f7680298e30817f34ebd2477931ed
Author: lishiyucn <[email protected]>
AuthorDate: Thu Aug 24 21:31:34 2023 +0800
[CELEBORN-498] Add new config for DfsPartitionReader's chunk size
### What changes were proposed in this pull request?
As title
### Why are the changes needed?
Make `celeborn.shuffle.chunk.size` worker side only config.
Add a new client side config `celeborn.client.fetch.dfsReadChunkSize` for
DfsPartitionReader
### Does this PR introduce _any_ user-facing change?
Yes, the chunk size of DfsPartitionReader is changed from the client-side
config `celeborn.shuffle.chunk.size`
to `celeborn.client.fetch.dfsReadChunkSize`
### How was this patch tested?
Passes GA
Closes #1834 from lishiyucn/main.
Lead-authored-by: lishiyucn <[email protected]>
Co-authored-by: shiyu li <[email protected]>
Co-authored-by: Keyong Zhou <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../org/apache/celeborn/client/read/DfsPartitionReader.java | 4 ++--
.../main/scala/org/apache/celeborn/common/CelebornConf.scala | 11 ++++++++++-
docs/configuration/client.md | 2 +-
3 files changed, 13 insertions(+), 4 deletions(-)
diff --git
a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
index 637aada7f..d42219773 100644
---
a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
+++
b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
@@ -47,7 +47,7 @@ import org.apache.celeborn.common.util.Utils;
public class DfsPartitionReader implements PartitionReader {
private static Logger logger =
LoggerFactory.getLogger(DfsPartitionReader.class);
PartitionLocation location;
- private final int shuffleChunkSize;
+ private final long shuffleChunkSize;
private final int fetchMaxReqsInFlight;
private final LinkedBlockingQueue<ByteBuf> results;
private final AtomicReference<IOException> exception = new
AtomicReference<>();
@@ -66,7 +66,7 @@ public class DfsPartitionReader implements PartitionReader {
int startMapIndex,
int endMapIndex)
throws IOException {
- shuffleChunkSize = (int) conf.shuffleChunkSize();
+ shuffleChunkSize = conf.dfsReadChunkSize();
fetchMaxReqsInFlight = conf.clientFetchMaxReqsInFlight();
results = new LinkedBlockingQueue<>();
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 65e55ef51..b11d475cb 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -804,6 +804,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
def shuffleExpiredCheckIntervalMs: Long = get(SHUFFLE_EXPIRED_CHECK_INTERVAL)
def shuffleManagerPort: Int = get(CLIENT_SHUFFLE_MANAGER_PORT)
def shuffleChunkSize: Long = get(SHUFFLE_CHUNK_SIZE)
+ def dfsReadChunkSize: Long = get(CLIENT_FETCH_DFS_READ_CHUNK_SIZE)
def shufflePartitionSplitMode: PartitionSplitMode =
PartitionSplitMode.valueOf(get(SHUFFLE_PARTITION_SPLIT_MODE))
def shufflePartitionSplitThreshold: Long =
get(SHUFFLE_PARTITION_SPLIT_THRESHOLD)
@@ -1944,13 +1945,21 @@ object CelebornConf extends Logging {
val SHUFFLE_CHUNK_SIZE: ConfigEntry[Long] =
buildConf("celeborn.shuffle.chunk.size")
- .categories("client", "worker")
+ .categories("worker")
.version("0.2.0")
.doc("Max chunk size of reducer's merged shuffle data. For example, if a
reducer's " +
"shuffle data is 128M and the data will need 16 fetch chunk requests
to fetch.")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("8m")
+ val CLIENT_FETCH_DFS_READ_CHUNK_SIZE: ConfigEntry[Long] =
+ buildConf("celeborn.client.fetch.dfsReadChunkSize")
+ .categories("client")
+ .version("0.3.1")
+ .doc("Max chunk size for DfsPartitionReader.")
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefaultString("8m")
+
val WORKER_PARTITION_SPLIT_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.shuffle.partitionSplit.enabled")
.withAlternative("celeborn.worker.partition.split.enabled")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index d7a1f0914..028650c7f 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -24,6 +24,7 @@ license: |
| celeborn.client.commitFiles.ignoreExcludedWorker | false | When true,
LifecycleManager will skip workers which are in the excluded list. | 0.3.0 |
| celeborn.client.excludePeerWorkerOnFailure.enabled | true | When true,
Celeborn will exclude partition's peer worker on failure when push data to
replica failed. | 0.3.0 |
| celeborn.client.excludedWorker.expireTimeout | 180s | Timeout time for
LifecycleManager to clear reserved excluded worker. Default to be 1.5 *
`celeborn.master.heartbeat.worker.timeout`to cover worker heartbeat timeout
check period | 0.3.0 |
+| celeborn.client.fetch.dfsReadChunkSize | 8m | Max chunk size for
DfsPartitionReader. | 0.3.1 |
| celeborn.client.fetch.excludeWorkerOnFailure.enabled | false | Whether to
enable shuffle client-side fetch exclude workers on failure. | 0.3.0 |
| celeborn.client.fetch.excludedWorker.expireTimeout | <value of
celeborn.client.excludedWorker.expireTimeout> | ShuffleClient is a static
object, it will be used in the whole lifecycle of Executor,We give a expire
time for excluded workers to avoid a transient worker issues. | 0.3.0 |
| celeborn.client.fetch.maxReqsInFlight | 3 | Amount of in-flight chunk fetch
request. | 0.3.0 |
@@ -97,6 +98,5 @@ license: |
| celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold |
2147483647 | Celeborn will only accept shuffle of partition number lower than
this configuration value. | 0.3.0 |
| celeborn.client.spark.shuffle.writer | HASH | Celeborn supports the
following kind of shuffle writers. 1. hash: hash-based shuffle writer works
fine when shuffle partition count is normal; 2. sort: sort-based shuffle writer
works fine when memory pressure is high or shuffle partition count is huge. |
0.3.0 |
| celeborn.master.endpoints | <localhost>:9097 | Endpoints of master
nodes for celeborn client to connect, allowed pattern is:
`<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If
the port is omitted, 9097 will be used. | 0.2.0 |
-| celeborn.shuffle.chunk.size | 8m | Max chunk size of reducer's merged
shuffle data. For example, if a reducer's shuffle data is 128M and the data
will need 16 fetch chunk requests to fetch. | 0.2.0 |
| celeborn.storage.hdfs.dir | <undefined> | HDFS base directory for
Celeborn to store shuffle data. | 0.2.0 |
<!--end-include-->