This is an automated email from the ASF dual-hosted git repository. nicholasjiang pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push: new 7fc4f241f [CELEBORN-2146] Setting the DFS replication factor for balanced fault tolerance and storage efficiency 7fc4f241f is described below commit 7fc4f241f792774190c7724ae716345603c65d35 Author: xxx <953396...@qq.com> AuthorDate: Sat Sep 20 15:20:50 2025 +0800 [CELEBORN-2146] Setting the DFS replication factor for balanced fault tolerance and storage efficiency ### What changes were proposed in this pull request? Add the worker configuration `celeborn.worker.hdfs.replication.factor` (default `2`) and apply it, via `FileSystem#setReplication`, to the DFS data file and its index file created by `DfsTierWriter`, balancing fault tolerance against storage efficiency. ### Why are the changes needed? The replication factor determines how many redundant copies of each file are kept across nodes; making it configurable lets operators trade storage cost for fault tolerance and data availability when nodes fail. ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? CI Closes #3472 from xy2953396112/CELEBORN-2146. Authored-by: xxx <953396...@qq.com> Signed-off-by: SteNicholas <programg...@163.com> --- .../main/scala/org/apache/celeborn/common/CelebornConf.scala | 11 +++++++++++ docs/configuration/worker.md | 1 + .../celeborn/service/deploy/worker/storage/TierWriter.scala | 4 ++++ 3 files changed, 16 insertions(+) diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 19cc15982..9b2d044aa 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -690,6 +690,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se get(MASTER_RESOURCE_CONSUMPTION_METRICS_ENABLED) def workerFlushReuseCopyBufferEnabled: Boolean = get(WORKER_FLUSH_REUSE_COPY_BUFFER_ENABLED) + def workerDfsReplicationFactor: Int = + get(WORKER_DFS_REPLICATION_FACTOR) + def clusterName: String = get(CLUSTER_NAME) // 
////////////////////////////////////////////////////// @@ -6687,4 +6690,12 @@ object CelebornConf extends Logging { .booleanConf .createWithDefaultString("true") + val WORKER_DFS_REPLICATION_FACTOR: ConfigEntry[Int] = + buildConf("celeborn.worker.hdfs.replication.factor") + .categories("worker") + .version("0.7.0") + .doc("HDFS replication factor for shuffle files.") + .intConf + .createWithDefault(2) + } diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md index 72343249e..4c688fff3 100644 --- a/docs/configuration/worker.md +++ b/docs/configuration/worker.md @@ -106,6 +106,7 @@ license: | | celeborn.worker.graceful.shutdown.saveCommittedFileInfo.interval | 5s | false | Interval for a Celeborn worker to flush committed file infos into Level DB. | 0.3.1 | | | celeborn.worker.graceful.shutdown.saveCommittedFileInfo.sync | false | false | Whether to call sync method to save committed file infos into Level DB to handle OS crash. | 0.3.1 | | | celeborn.worker.graceful.shutdown.timeout | 600s | false | The worker's graceful shutdown timeout time. | 0.2.0 | | +| celeborn.worker.hdfs.replication.factor | 2 | false | HDFS replication factor for shuffle files. | 0.7.0 | | | celeborn.worker.http.auth.administers | | false | A comma-separated list of users who have admin privileges, Note, when celeborn.worker.http.auth.supportedSchemes is not set, everyone is treated as administrator. 
| 0.6.0 | | | celeborn.worker.http.auth.basic.provider | org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl | false | User-defined password authentication implementation of org.apache.celeborn.common.authentication.PasswdAuthenticationProvider | 0.6.0 | | | celeborn.worker.http.auth.bearer.provider | org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl | false | User-defined token authentication implementation of org.apache.celeborn.common.authentication.TokenAuthenticationProvider | 0.6.0 | | diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala index f4b5e19f9..93a6c2989 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala @@ -536,6 +536,7 @@ class DfsTierWriter( try { hadoopFs.create(dfsFileInfo.getDfsPath, true).close() + hadoopFs.setReplication(dfsFileInfo.getDfsPath, conf.workerDfsReplicationFactor.toShort); if (dfsFileInfo.isS3) { val uri = hadoopFs.getUri val bucketName = uri.getHost @@ -654,6 +655,9 @@ class DfsTierWriter( hadoopFs.create(dfsFileInfo.getDfsWriterSuccessPath).close() if (dfsFileInfo.isReduceFileMeta) { val indexOutputStream = hadoopFs.create(dfsFileInfo.getDfsIndexPath) + hadoopFs.setReplication( + dfsFileInfo.getDfsIndexPath, + conf.workerDfsReplicationFactor.toShort) val byteStream: ByteArrayOutputStream = new ByteArrayOutputStream() val dataStream = new DataOutputStream(byteStream) try {