This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new da323ef69 [CELEBORN-2047][FOLLOWUP] MapPartitionData should close 
dataChannel, indexChannel, dataInputStream and indexInputStream
da323ef69 is described below

commit da323ef69b0fc0ade5dea234a3831679ae354e57
Author: SteNicholas <[email protected]>
AuthorDate: Wed Nov 5 14:31:17 2025 +0800

    [CELEBORN-2047][FOLLOWUP] MapPartitionData should close dataChannel, 
indexChannel, dataInputStream and indexInputStream
    
    ### What changes were proposed in this pull request?
    
    `MapPartitionData` should close `dataChannel`, `indexChannel`, 
`dataInputStream` and `indexInputStream`.
    
    Follow up #3445.
    
    ### Why are the changes needed?
    
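    `dataChannel`, `indexChannel`, `dataInputStream` and `indexInputStream` 
are opened by `MapPartitionData` and handed to every `MapPartitionDataReader` 
through `openReader`, so they should be closed by `MapPartitionData`, which 
owns them, instead of by each `PartitionDataReader`.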
    
    ### Does this PR resolve a correctness bug?
    
    No.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    CI.
    
    Closes #3521 from SteNicholas/CELEBORN-2047.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: 子懿 <[email protected]>
---
 .../worker/storage/DfsPartitionDataReader.java     |  7 -------
 .../worker/storage/LocalPartitionDataReader.java   |  7 -------
 .../deploy/worker/storage/MapPartitionData.java    | 22 ++++++++++++----------
 .../worker/storage/MapPartitionDataReader.java     |  4 ----
 .../deploy/worker/storage/PartitionDataReader.java |  2 --
 5 files changed, 12 insertions(+), 30 deletions(-)

diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/DfsPartitionDataReader.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/DfsPartitionDataReader.java
index 0e426270d..cc9716a7a 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/DfsPartitionDataReader.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/DfsPartitionDataReader.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import io.netty.buffer.ByteBuf;
-import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 
@@ -93,12 +92,6 @@ public class DfsPartitionDataReader extends 
PartitionDataReader {
     return dataInputStream.getPos();
   }
 
-  @Override
-  public void close() {
-    IOUtils.closeQuietly(dataInputStream);
-    IOUtils.closeQuietly(indexInputStream);
-  }
-
   private void readHeaderOrIndexBuffer(
       FSDataInputStream inputStream, ByteBuffer buffer, long fileSize, int 
length, String filePath)
       throws IOException {
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/LocalPartitionDataReader.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/LocalPartitionDataReader.java
index 8cc356e4b..6777c525f 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/LocalPartitionDataReader.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/LocalPartitionDataReader.java
@@ -22,7 +22,6 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 
 import io.netty.buffer.ByteBuf;
-import org.apache.commons.io.IOUtils;
 
 import org.apache.celeborn.common.meta.DiskFileInfo;
 import org.apache.celeborn.common.util.Utils;
@@ -85,12 +84,6 @@ public class LocalPartitionDataReader extends 
PartitionDataReader {
     return dataFileChanel.position();
   }
 
-  @Override
-  public void close() {
-    IOUtils.closeQuietly(dataFileChanel);
-    IOUtils.closeQuietly(indexFileChannel);
-  }
-
   private void readHeaderOrIndexBuffer(
       FileChannel channel, ByteBuffer buffer, long fileSize, int length, 
String filePath)
       throws IOException {
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
index fc0522fe5..4eff011ab 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
@@ -28,6 +28,7 @@ import java.util.function.Consumer;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
@@ -50,10 +51,10 @@ public class MapPartitionData implements 
MemoryManager.ReadBufferTargetChangeLis
   protected final ExecutorService readExecutor;
   protected final ConcurrentHashMap<Long, MapPartitionDataReader> readers =
       JavaUtils.newConcurrentHashMap();
-  private FileChannel dataFileChanel;
+  private FileChannel dataChannel;
   private FileChannel indexChannel;
-  private FSDataInputStream hdfsDataInputStream;
-  private FSDataInputStream hdfsIndexInputStream;
+  private FSDataInputStream dataInputStream;
+  private FSDataInputStream indexInputStream;
   private volatile boolean isReleased = false;
   private final BufferQueue bufferQueue = new BufferQueue();
   private AtomicBoolean bufferQueueInitialized = new AtomicBoolean(false);
@@ -99,17 +100,16 @@ public class MapPartitionData implements 
MemoryManager.ReadBufferTargetChangeLis
                     false));
 
     if (diskFileInfo.isDFS()) {
-      this.hdfsDataInputStream =
+      this.dataInputStream =
           StorageManager.hadoopFs()
               .get(diskFileInfo.getStorageType())
               .open(new Path(diskFileInfo.getFilePath()));
-      this.hdfsIndexInputStream =
+      this.indexInputStream =
           StorageManager.hadoopFs()
               .get(diskFileInfo.getStorageType())
               .open(new Path(diskFileInfo.getIndexPath()));
-
     } else {
-      this.dataFileChanel = 
FileChannelUtils.openReadableFileChannel(diskFileInfo.getFilePath());
+      this.dataChannel = 
FileChannelUtils.openReadableFileChannel(diskFileInfo.getFilePath());
       this.indexChannel = 
FileChannelUtils.openReadableFileChannel(diskFileInfo.getIndexPath());
     }
     MemoryManager.instance().addReadBufferTargetChangeListener(this);
@@ -187,7 +187,7 @@ public class MapPartitionData implements 
MemoryManager.ReadBufferTargetChangeLis
   }
 
   protected void openReader(MapPartitionDataReader reader) throws IOException {
-    reader.open(dataFileChanel, indexChannel, hdfsDataInputStream, 
hdfsIndexInputStream);
+    reader.open(dataChannel, indexChannel, dataInputStream, indexInputStream);
   }
 
   public synchronized void readBuffers() {
@@ -265,8 +265,10 @@ public class MapPartitionData implements 
MemoryManager.ReadBufferTargetChangeLis
     bufferQueue.release();
     isReleased = true;
 
-    readers.values().forEach(MapPartitionDataReader::close);
-    readers.clear();
+    IOUtils.closeQuietly(dataChannel);
+    IOUtils.closeQuietly(indexChannel);
+    IOUtils.closeQuietly(dataInputStream);
+    IOUtils.closeQuietly(indexInputStream);
 
     MemoryManager.instance().removeReadBufferTargetChangeListener(this);
   }
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java
index 0f3874027..1db2f81b4 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java
@@ -516,8 +516,4 @@ public class MapPartitionDataReader implements 
Comparable<MapPartitionDataReader
       return !isReleased && !readFinished;
     }
   }
-
-  public void close() {
-    partitionDataReader.close();
-  }
 }
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataReader.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataReader.java
index 48b08d741..f1c0276a1 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataReader.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataReader.java
@@ -56,8 +56,6 @@ public abstract class PartitionDataReader {
 
   public abstract long position() throws IOException;
 
-  public abstract void close();
-
   public int readBuffer(ByteBuf buffer, long dataConsumingOffset) throws 
IOException {
     position(dataConsumingOffset);
     int headerSize = headerBuffer.capacity();

Reply via email to