This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 57a35ca34 [CELEBORN-498] Add new config for DfsPartitionReader's chunk 
size
57a35ca34 is described below

commit 57a35ca3496f7680298e30817f34ebd2477931ed
Author: lishiyucn <[email protected]>
AuthorDate: Thu Aug 24 21:31:34 2023 +0800

    [CELEBORN-498] Add new config for DfsPartitionReader's chunk size
    
    ### What changes were proposed in this pull request?
    As title
    
    ### Why are the changes needed?
    Make `celeborn.shuffle.chunk.size` a worker-side-only config, and add a new
    client-side config, `celeborn.client.fetch.dfsReadChunkSize`, for
    DfsPartitionReader.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. The chunk size of DfsPartitionReader is no longer read from
    `celeborn.shuffle.chunk.size`; it is now set by the client-side config
    `celeborn.client.fetch.dfsReadChunkSize`.
    
    ### How was this patch tested?
    Passes GA
    
    Closes #1834 from lishiyucn/main.
    
    Lead-authored-by: lishiyucn <[email protected]>
    Co-authored-by: shiyu li <[email protected]>
    Co-authored-by: Keyong Zhou <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../org/apache/celeborn/client/read/DfsPartitionReader.java   |  4 ++--
 .../main/scala/org/apache/celeborn/common/CelebornConf.scala  | 11 ++++++++++-
 docs/configuration/client.md                                  |  2 +-
 3 files changed, 13 insertions(+), 4 deletions(-)

diff --git 
a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java 
b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
index 637aada7f..d42219773 100644
--- 
a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
+++ 
b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
@@ -47,7 +47,7 @@ import org.apache.celeborn.common.util.Utils;
 public class DfsPartitionReader implements PartitionReader {
   private static Logger logger = 
LoggerFactory.getLogger(DfsPartitionReader.class);
   PartitionLocation location;
-  private final int shuffleChunkSize;
+  private final long shuffleChunkSize;
   private final int fetchMaxReqsInFlight;
   private final LinkedBlockingQueue<ByteBuf> results;
   private final AtomicReference<IOException> exception = new 
AtomicReference<>();
@@ -66,7 +66,7 @@ public class DfsPartitionReader implements PartitionReader {
       int startMapIndex,
       int endMapIndex)
       throws IOException {
-    shuffleChunkSize = (int) conf.shuffleChunkSize();
+    shuffleChunkSize = conf.dfsReadChunkSize();
     fetchMaxReqsInFlight = conf.clientFetchMaxReqsInFlight();
     results = new LinkedBlockingQueue<>();
 
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 65e55ef51..b11d475cb 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -804,6 +804,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   def shuffleExpiredCheckIntervalMs: Long = get(SHUFFLE_EXPIRED_CHECK_INTERVAL)
   def shuffleManagerPort: Int = get(CLIENT_SHUFFLE_MANAGER_PORT)
   def shuffleChunkSize: Long = get(SHUFFLE_CHUNK_SIZE)
+  def dfsReadChunkSize: Long = get(CLIENT_FETCH_DFS_READ_CHUNK_SIZE)
   def shufflePartitionSplitMode: PartitionSplitMode =
     PartitionSplitMode.valueOf(get(SHUFFLE_PARTITION_SPLIT_MODE))
   def shufflePartitionSplitThreshold: Long = 
get(SHUFFLE_PARTITION_SPLIT_THRESHOLD)
@@ -1944,13 +1945,21 @@ object CelebornConf extends Logging {
 
   val SHUFFLE_CHUNK_SIZE: ConfigEntry[Long] =
     buildConf("celeborn.shuffle.chunk.size")
-      .categories("client", "worker")
+      .categories("worker")
       .version("0.2.0")
       .doc("Max chunk size of reducer's merged shuffle data. For example, if a 
reducer's " +
         "shuffle data is 128M and the data will need 16 fetch chunk requests 
to fetch.")
       .bytesConf(ByteUnit.BYTE)
       .createWithDefaultString("8m")
 
+  val CLIENT_FETCH_DFS_READ_CHUNK_SIZE: ConfigEntry[Long] =
+    buildConf("celeborn.client.fetch.dfsReadChunkSize")
+      .categories("client")
+      .version("0.3.1")
+      .doc("Max chunk size for DfsPartitionReader.")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("8m")
+
   val WORKER_PARTITION_SPLIT_ENABLED: ConfigEntry[Boolean] =
     buildConf("celeborn.worker.shuffle.partitionSplit.enabled")
       .withAlternative("celeborn.worker.partition.split.enabled")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index d7a1f0914..028650c7f 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -24,6 +24,7 @@ license: |
 | celeborn.client.commitFiles.ignoreExcludedWorker | false | When true, 
LifecycleManager will skip workers which are in the excluded list. | 0.3.0 | 
 | celeborn.client.excludePeerWorkerOnFailure.enabled | true | When true, 
Celeborn will exclude partition's peer worker on failure when push data to 
replica failed. | 0.3.0 | 
 | celeborn.client.excludedWorker.expireTimeout | 180s | Timeout time for 
LifecycleManager to clear reserved excluded worker. Default to be 1.5 * 
`celeborn.master.heartbeat.worker.timeout`to cover worker heartbeat timeout 
check period | 0.3.0 | 
+| celeborn.client.fetch.dfsReadChunkSize | 8m | Max chunk size for 
DfsPartitionReader. | 0.3.1 | 
 | celeborn.client.fetch.excludeWorkerOnFailure.enabled | false | Whether to 
enable shuffle client-side fetch exclude workers on failure. | 0.3.0 | 
 | celeborn.client.fetch.excludedWorker.expireTimeout | &lt;value of 
celeborn.client.excludedWorker.expireTimeout&gt; | ShuffleClient is a static 
object, it will be used in the whole lifecycle of Executor,We give a expire 
time for excluded workers to avoid a transient worker issues. | 0.3.0 | 
 | celeborn.client.fetch.maxReqsInFlight | 3 | Amount of in-flight chunk fetch 
request. | 0.3.0 | 
@@ -97,6 +98,5 @@ license: |
 | celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold | 
2147483647 | Celeborn will only accept shuffle of partition number lower than 
this configuration value. | 0.3.0 | 
 | celeborn.client.spark.shuffle.writer | HASH | Celeborn supports the 
following kind of shuffle writers. 1. hash: hash-based shuffle writer works 
fine when shuffle partition count is normal; 2. sort: sort-based shuffle writer 
works fine when memory pressure is high or shuffle partition count is huge. | 
0.3.0 | 
 | celeborn.master.endpoints | &lt;localhost&gt;:9097 | Endpoints of master 
nodes for celeborn client to connect, allowed pattern is: 
`<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If 
the port is omitted, 9097 will be used. | 0.2.0 | 
-| celeborn.shuffle.chunk.size | 8m | Max chunk size of reducer's merged 
shuffle data. For example, if a reducer's shuffle data is 128M and the data 
will need 16 fetch chunk requests to fetch. | 0.2.0 | 
 | celeborn.storage.hdfs.dir | &lt;undefined&gt; | HDFS base directory for 
Celeborn to store shuffle data. | 0.2.0 | 
 <!--end-include-->

Reply via email to