This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 61211edce [CELEBORN-2058] Add retry to avoid committing failed with
HDFS storage
61211edce is described below
commit 61211edcefc874012046f7a8a273239759b4fb32
Author: taowenjun <[email protected]>
AuthorDate: Sun Sep 28 16:28:55 2025 +0800
[CELEBORN-2058] Add retry to avoid committing failed with HDFS storage
### What changes were proposed in this pull request?
Retry creating the index file and the success file after a failure in HDFS tier
storage.
### Why are the changes needed?
When creating an index file with HDFS storage, the HDFS NameNode may respond
that all datanodes are bad or that the replica count is insufficient.
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
end to end test.
Closes #3360 from taowenjun/CELEBORN-2058.
Authored-by: taowenjun <[email protected]>
Signed-off-by: Ethan Feng <[email protected]>
---
.../org/apache/celeborn/common/CelebornConf.scala | 21 +++++++++++
docs/configuration/worker.md | 2 ++
.../service/deploy/worker/storage/TierWriter.scala | 42 ++++++++++++++--------
3 files changed, 50 insertions(+), 15 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index be1d6fb5a..b3afc6179 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1372,6 +1372,10 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
def workerS3FlusherThreads: Int = get(WORKER_FLUSHER_S3_THREADS)
def workerOssFlusherThreads: Int = get(WORKER_FLUSHER_OSS_THREADS)
def workerCreateWriterMaxAttempts: Int =
get(WORKER_WRITER_CREATE_MAX_ATTEMPTS)
+ def workerWriterHdfsCreateAuxiliaryFileMaxRetries: Int =
+ get(WORKER_WRITER_HDFS_CREATE_AUXILIARY_FILE_MAX_RETRIES)
+ def workerWriterHdfsCreateAuxiliaryFileRetryWait: Long =
+ get(WORKER_WRITER_HDFS_CREATE_AUXILIARY_FILE_RETRY_WAIT)
def workerFlusherLocalGatherAPIEnabled: Boolean =
get(WORKER_FLUSHER_LOCAL_GATHER_API_ENABLED)
// //////////////////////////////////////////////////////
@@ -4086,6 +4090,23 @@ object CelebornConf extends Logging {
.intConf
.createWithDefault(3)
+ val WORKER_WRITER_HDFS_CREATE_AUXILIARY_FILE_MAX_RETRIES: ConfigEntry[Int] =
+ buildConf("celeborn.worker.writer.hdfs.createAuxiliaryFile.maxRetries")
+ .categories("worker")
+ .version("0.7.0")
+ .doc("Retry count for a auxiliary file including index file and success
file with HDFS storage to create" +
+ " if its creation was failed.")
+ .intConf
+ .createWithDefault(5)
+
+ val WORKER_WRITER_HDFS_CREATE_AUXILIARY_FILE_RETRY_WAIT: ConfigEntry[Long] =
+ buildConf("celeborn.worker.writer.hdfs.createAuxiliaryFile.retryWait")
+ .categories("worker")
+ .version("0.7.0")
+ .doc("Wait interval after failure to create a auxiliary file with HDFS
storage and then retry it.")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefaultString("200ms")
+
val WORKER_FLUSHER_LOCAL_GATHER_API_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.flusher.local.gatherAPI.enabled")
.internal
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index b506c2728..b2343b55b 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -195,5 +195,7 @@ license: |
| celeborn.worker.storage.workingDir | celeborn-worker/shuffle_data | false |
Worker's working dir path name. | 0.3.0 | celeborn.worker.workingDir |
| celeborn.worker.writer.close.timeout | 120s | false | Timeout for a file
writer to close | 0.2.0 | |
| celeborn.worker.writer.create.maxAttempts | 3 | false | Retry count for a
file writer to create if its creation was failed. | 0.2.0 | |
+| celeborn.worker.writer.hdfs.createAuxiliaryFile.maxRetries | 5 | false |
Retry count for a auxiliary file including index file and success file with
HDFS storage to create if its creation was failed. | 0.7.0 | |
+| celeborn.worker.writer.hdfs.createAuxiliaryFile.retryWait | 200ms | false |
Wait interval after failure to create a auxiliary file with HDFS storage and
then retry it. | 0.7.0 | |
| worker.flush.reuseCopyBuffer.enabled | true | false | Whether to enable
reuse copy buffer for flush. Note that this copy buffer must not be referenced
again after flushing. This means that, for example, the Hdfs(Oss or S3) client
will not asynchronously access this buffer after the flush method returns,
otherwise data modification problems will occur. | 0.6.1 | |
<!--end-include-->
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
index 93a6c2989..530b345bf 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
@@ -36,6 +36,7 @@ import
org.apache.celeborn.common.metrics.source.AbstractSource
import org.apache.celeborn.common.protocol.StorageInfo
import org.apache.celeborn.common.unsafe.Platform
import org.apache.celeborn.common.util.FileChannelUtils
+import org.apache.celeborn.common.util.Utils
import org.apache.celeborn.server.common.service.mpu.MultipartUploadHandler
import org.apache.celeborn.service.deploy.worker.WorkerSource
import
org.apache.celeborn.service.deploy.worker.congestcontrol.{CongestionController,
UserCongestionControlContext}
@@ -652,24 +653,35 @@ class DfsTierWriter(
hadoopFs.delete(dfsFileInfo.getDfsPath, false)
deleted = true
} else {
- hadoopFs.create(dfsFileInfo.getDfsWriterSuccessPath).close()
+ val retryCount = conf.workerWriterHdfsCreateAuxiliaryFileMaxRetries
+ val retryWait = conf.workerWriterHdfsCreateAuxiliaryFileRetryWait
+ Utils.withRetryOnTimeoutOrIOException(retryCount, retryWait) {
+ hadoopFs.create(dfsFileInfo.getDfsWriterSuccessPath).close()
+ }
if (dfsFileInfo.isReduceFileMeta) {
- val indexOutputStream = hadoopFs.create(dfsFileInfo.getDfsIndexPath)
- hadoopFs.setReplication(
- dfsFileInfo.getDfsIndexPath,
- conf.workerDfsReplicationFactor.toShort)
- val byteStream: ByteArrayOutputStream = new ByteArrayOutputStream()
- val dataStream = new DataOutputStream(byteStream)
- try {
-
dataStream.writeInt(dfsFileInfo.getReduceFileMeta.getChunkOffsets.size)
- for (offset <-
dfsFileInfo.getReduceFileMeta.getChunkOffsets.asScala) {
- dataStream.writeLong(offset)
+ Utils.withRetryOnTimeoutOrIOException(retryCount, retryWait) {
+ if (hadoopFs.exists(dfsFileInfo.getDfsIndexPath)) {
+ hadoopFs.delete(dfsFileInfo.getDfsIndexPath, true)
+ }
+ val indexOutputStream = hadoopFs.create(dfsFileInfo.getDfsIndexPath)
+ hadoopFs.setReplication(
+ dfsFileInfo.getDfsIndexPath,
+ conf.workerDfsReplicationFactor.toShort)
+ val byteStream: ByteArrayOutputStream = new ByteArrayOutputStream()
+ val dataStream = new DataOutputStream(byteStream)
+ try {
+
dataStream.writeInt(dfsFileInfo.getReduceFileMeta.getChunkOffsets.size)
+
dfsFileInfo.getReduceFileMeta.getChunkOffsets.asScala.foreach(dataStream.writeLong(_))
+ indexOutputStream.write(byteStream.toByteArray)
+ } finally {
+ if (dataStream != null) {
+ dataStream.close()
+ }
+ if (indexOutputStream != null) {
+ indexOutputStream.close()
+ }
}
- indexOutputStream.write(byteStream.toByteArray)
- } finally if (dataStream != null) {
- dataStream.close()
}
- indexOutputStream.close()
}
}
if (s3MultipartUploadHandler != null) {