This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 7fc4f241f [CELEBORN-2146] Setting the DFS replication factor for balanced fault tolerance and storage efficiency
7fc4f241f is described below

commit 7fc4f241f792774190c7724ae716345603c65d35
Author: xxx <953396...@qq.com>
AuthorDate: Sat Sep 20 15:20:50 2025 +0800

    [CELEBORN-2146] Setting the DFS replication factor for balanced fault tolerance and storage efficiency
    
    ### What changes were proposed in this pull request?
    
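    Add the worker configuration `celeborn.worker.hdfs.replication.factor`
    (default `2`). `DfsTierWriter` applies it with `FileSystem.setReplication`
    to each shuffle file it creates on DFS and, for reduce-partition files, to
    the index file. This balances fault tolerance against storage efficiency.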
    
    ### Why are the changes needed?
    
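    The replication factor determines how many redundant copies of a file's
    data the distributed file system keeps on different nodes. Those copies
    provide fault tolerance and keep data available when nodes fail. Making the
    factor configurable for Celeborn shuffle files lets operators decide how
    much storage to spend on that redundancy.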
    
    ### Does this PR introduce _any_ user-facing change?
    
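    Yes. This adds a new worker configuration,
    `celeborn.worker.hdfs.replication.factor`, documented in
    `docs/configuration/worker.md`. Its default of `2` is always applied. As a
    result, shuffle files (and their index files) that the worker writes to DFS
    get a replication factor of 2 instead of the file system's default
    replication.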
    
    ### How was this patch tested?
    
    CI
    
    Closes #3472 from xy2953396112/CELEBORN-2146.
    
    Authored-by: xxx <953396...@qq.com>
    Signed-off-by: SteNicholas <programg...@163.com>
---
 .../main/scala/org/apache/celeborn/common/CelebornConf.scala  | 11 +++++++++++
 docs/configuration/worker.md                                  |  1 +
 .../celeborn/service/deploy/worker/storage/TierWriter.scala   |  4 ++++
 3 files changed, 16 insertions(+)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 19cc15982..9b2d044aa 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -690,6 +690,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
     get(MASTER_RESOURCE_CONSUMPTION_METRICS_ENABLED)
   def workerFlushReuseCopyBufferEnabled: Boolean =
     get(WORKER_FLUSH_REUSE_COPY_BUFFER_ENABLED)
+  def workerDfsReplicationFactor: Int =
+    get(WORKER_DFS_REPLICATION_FACTOR)
+
   def clusterName: String = get(CLUSTER_NAME)
 
   // //////////////////////////////////////////////////////
@@ -6687,4 +6690,12 @@ object CelebornConf extends Logging {
       .booleanConf
       .createWithDefaultString("true")
 
+  val WORKER_DFS_REPLICATION_FACTOR: ConfigEntry[Int] =
+    buildConf("celeborn.worker.hdfs.replication.factor")
+      .categories("worker")
+      .version("0.7.0")
+      .doc("HDFS replication factor for shuffle files.")
+      .intConf
+      .createWithDefault(2)
+
 }
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 72343249e..4c688fff3 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -106,6 +106,7 @@ license: |
 | celeborn.worker.graceful.shutdown.saveCommittedFileInfo.interval | 5s | 
false | Interval for a Celeborn worker to flush committed file infos into Level 
DB. | 0.3.1 |  | 
 | celeborn.worker.graceful.shutdown.saveCommittedFileInfo.sync | false | false 
| Whether to call sync method to save committed file infos into Level DB to 
handle OS crash. | 0.3.1 |  | 
 | celeborn.worker.graceful.shutdown.timeout | 600s | false | The worker's 
graceful shutdown timeout time. | 0.2.0 |  | 
+| celeborn.worker.hdfs.replication.factor | 2 | false | HDFS replication 
factor for shuffle files. | 0.7.0 |  | 
 | celeborn.worker.http.auth.administers |  | false | A comma-separated list of 
users who have admin privileges, Note, when 
celeborn.worker.http.auth.supportedSchemes is not set, everyone is treated as 
administrator. | 0.6.0 |  | 
 | celeborn.worker.http.auth.basic.provider | 
org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl | 
false | User-defined password authentication implementation of 
org.apache.celeborn.common.authentication.PasswdAuthenticationProvider | 0.6.0 
|  | 
 | celeborn.worker.http.auth.bearer.provider | 
org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl | 
false | User-defined token authentication implementation of 
org.apache.celeborn.common.authentication.TokenAuthenticationProvider | 0.6.0 | 
 | 
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
index f4b5e19f9..93a6c2989 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
@@ -536,6 +536,7 @@ class DfsTierWriter(
 
   try {
     hadoopFs.create(dfsFileInfo.getDfsPath, true).close()
+    hadoopFs.setReplication(dfsFileInfo.getDfsPath, 
conf.workerDfsReplicationFactor.toShort);
     if (dfsFileInfo.isS3) {
       val uri = hadoopFs.getUri
       val bucketName = uri.getHost
@@ -654,6 +655,9 @@ class DfsTierWriter(
       hadoopFs.create(dfsFileInfo.getDfsWriterSuccessPath).close()
       if (dfsFileInfo.isReduceFileMeta) {
         val indexOutputStream = hadoopFs.create(dfsFileInfo.getDfsIndexPath)
+        hadoopFs.setReplication(
+          dfsFileInfo.getDfsIndexPath,
+          conf.workerDfsReplicationFactor.toShort)
         val byteStream: ByteArrayOutputStream = new ByteArrayOutputStream()
         val dataStream = new DataOutputStream(byteStream)
         try {

Reply via email to