This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new 91a3a1a98 [CELEBORN-2119] DfsTierWriter should close
s3MultipartUploadHandler and ossMultipartUploadHandler for close resource
91a3a1a98 is described below
commit 91a3a1a98c25e5fa611aad9ab928261f4f489335
Author: SteNicholas <[email protected]>
AuthorDate: Tue Aug 19 14:57:16 2025 +0800
[CELEBORN-2119] DfsTierWriter should close s3MultipartUploadHandler and
ossMultipartUploadHandler for close resource
### What changes were proposed in this pull request?
`DfsTierWriter` should close `s3MultipartUploadHandler` and
`ossMultipartUploadHandler` in `closeResource` to avoid a resource leak
when the file writer is destroyed.
### Why are the changes needed?
`closeResource` in `DfsTierWriter` is currently a no-op, so
`s3MultipartUploadHandler` and `ossMultipartUploadHandler` are never
closed there.
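The null-guarded close pattern can be illustrated with a minimal, self-contained sketch. `FakeUploadHandler`, `SketchWriter`, and `CloseResourceSketch` are hypothetical stand-ins, not Celeborn classes, and the sketch assumes a handler is `null` when its storage type is not in use, as the null checks in the patch suggest.

```scala
// Hypothetical stand-in for a multipart upload handler; not Celeborn's real type.
final class FakeUploadHandler(val name: String) extends AutoCloseable {
  var closed: Boolean = false

  override def close(): Unit = {
    closed = true
  }
}

// Simplified writer. Assumption: either handler may be null when that
// storage type is not configured for this writer.
final class SketchWriter(
    s3Handler: FakeUploadHandler,
    ossHandler: FakeUploadHandler) {

  // Same shape as the patched closeResource(): close each handler only if present.
  def closeResource(): Unit = {
    if (s3Handler != null) {
      s3Handler.close()
    }
    if (ossHandler != null) {
      ossHandler.close()
    }
  }
}

object CloseResourceSketch {
  def main(args: Array[String]): Unit = {
    val s3 = new FakeUploadHandler("s3")
    // Only the S3 handler exists here; the OSS handler is absent (null).
    val writer = new SketchWriter(s3, null)
    writer.closeResource()
    println(s"s3 closed: ${s3.closed}")
  }
}
```

With both handlers absent, `closeResource()` in this sketch does nothing, which matches the behavior before the patch.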
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes #3433 from SteNicholas/CELEBORN-2119.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
(cherry picked from commit adfc563828c1ab1459ad9d93d6f6743e6e0ebba5)
Signed-off-by: SteNicholas <[email protected]>
---
.../celeborn/service/deploy/worker/storage/TierWriter.scala | 9 ++++++++-
1 file changed, 8 insertions(+), 1 deletion(-)
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
index 29865ba39..4092336f3 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
@@ -666,7 +666,14 @@ class DfsTierWriter(
   override def notifyFileCommitted(): Unit =
     storageManager.notifyFileInfoCommitted(shuffleKey, filename, hdfsFileInfo)

-  override def closeResource(): Unit = {}
+  override def closeResource(): Unit = {
+    if (s3MultipartUploadHandler != null) {
+      s3MultipartUploadHandler.close()
+    }
+    if (ossMultipartUploadHandler != null) {
+      ossMultipartUploadHandler.close()
+    }
+  }

   override def cleanLocalOrDfsFiles(): Unit = {
     hdfsFileInfo.deleteAllFiles(hadoopFs)