This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new da323ef69 [CELEBORN-2047][FOLLOWUP] MapPartitionData should close
dataChannel, indexChannel, dataInputStream and indexInputStream
da323ef69 is described below
commit da323ef69b0fc0ade5dea234a3831679ae354e57
Author: SteNicholas <[email protected]>
AuthorDate: Wed Nov 5 14:31:17 2025 +0800
[CELEBORN-2047][FOLLOWUP] MapPartitionData should close dataChannel,
indexChannel, dataInputStream and indexInputStream
### What changes were proposed in this pull request?
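`MapPartitionData` now closes the `dataChannel`, `indexChannel`,
`dataInputStream` and `indexInputStream` that it opens. The `close()`
methods are removed from `PartitionDataReader`, `DfsPartitionDataReader`,
`LocalPartitionDataReader` and `MapPartitionDataReader`. The
`MapPartitionData` fields `dataFileChanel`, `hdfsDataInputStream` and
`hdfsIndexInputStream` are renamed to `dataChannel`, `dataInputStream` and
`indexInputStream`.
Follow-up to #3445.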
### Why are the changes needed?
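`dataChannel`, `indexChannel`, `dataInputStream` and `indexInputStream`
are opened by `MapPartitionData` and passed to each reader through
`openReader`, so they should be closed by `MapPartitionData`, which opened
them, instead of by `PartitionDataReader`.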
### Does this PR resolve a correctness bug?
No.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes #3521 from SteNicholas/CELEBORN-2047.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: 子懿 <[email protected]>
---
.../worker/storage/DfsPartitionDataReader.java | 7 -------
.../worker/storage/LocalPartitionDataReader.java | 7 -------
.../deploy/worker/storage/MapPartitionData.java | 22 ++++++++++++----------
.../worker/storage/MapPartitionDataReader.java | 4 ----
.../deploy/worker/storage/PartitionDataReader.java | 2 --
5 files changed, 12 insertions(+), 30 deletions(-)
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/DfsPartitionDataReader.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/DfsPartitionDataReader.java
index 0e426270d..cc9716a7a 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/DfsPartitionDataReader.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/DfsPartitionDataReader.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import io.netty.buffer.ByteBuf;
-import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -93,12 +92,6 @@ public class DfsPartitionDataReader extends
PartitionDataReader {
return dataInputStream.getPos();
}
- @Override
- public void close() {
- IOUtils.closeQuietly(dataInputStream);
- IOUtils.closeQuietly(indexInputStream);
- }
-
private void readHeaderOrIndexBuffer(
FSDataInputStream inputStream, ByteBuffer buffer, long fileSize, int
length, String filePath)
throws IOException {
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/LocalPartitionDataReader.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/LocalPartitionDataReader.java
index 8cc356e4b..6777c525f 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/LocalPartitionDataReader.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/LocalPartitionDataReader.java
@@ -22,7 +22,6 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import io.netty.buffer.ByteBuf;
-import org.apache.commons.io.IOUtils;
import org.apache.celeborn.common.meta.DiskFileInfo;
import org.apache.celeborn.common.util.Utils;
@@ -85,12 +84,6 @@ public class LocalPartitionDataReader extends
PartitionDataReader {
return dataFileChanel.position();
}
- @Override
- public void close() {
- IOUtils.closeQuietly(dataFileChanel);
- IOUtils.closeQuietly(indexFileChannel);
- }
-
private void readHeaderOrIndexBuffer(
FileChannel channel, ByteBuffer buffer, long fileSize, int length,
String filePath)
throws IOException {
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
index fc0522fe5..4eff011ab 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
@@ -28,6 +28,7 @@ import java.util.function.Consumer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
@@ -50,10 +51,10 @@ public class MapPartitionData implements
MemoryManager.ReadBufferTargetChangeLis
protected final ExecutorService readExecutor;
protected final ConcurrentHashMap<Long, MapPartitionDataReader> readers =
JavaUtils.newConcurrentHashMap();
- private FileChannel dataFileChanel;
+ private FileChannel dataChannel;
private FileChannel indexChannel;
- private FSDataInputStream hdfsDataInputStream;
- private FSDataInputStream hdfsIndexInputStream;
+ private FSDataInputStream dataInputStream;
+ private FSDataInputStream indexInputStream;
private volatile boolean isReleased = false;
private final BufferQueue bufferQueue = new BufferQueue();
private AtomicBoolean bufferQueueInitialized = new AtomicBoolean(false);
@@ -99,17 +100,16 @@ public class MapPartitionData implements
MemoryManager.ReadBufferTargetChangeLis
false));
if (diskFileInfo.isDFS()) {
- this.hdfsDataInputStream =
+ this.dataInputStream =
StorageManager.hadoopFs()
.get(diskFileInfo.getStorageType())
.open(new Path(diskFileInfo.getFilePath()));
- this.hdfsIndexInputStream =
+ this.indexInputStream =
StorageManager.hadoopFs()
.get(diskFileInfo.getStorageType())
.open(new Path(diskFileInfo.getIndexPath()));
-
} else {
- this.dataFileChanel = FileChannelUtils.openReadableFileChannel(diskFileInfo.getFilePath());
+ this.dataChannel = FileChannelUtils.openReadableFileChannel(diskFileInfo.getFilePath());
this.indexChannel = FileChannelUtils.openReadableFileChannel(diskFileInfo.getIndexPath());
}
MemoryManager.instance().addReadBufferTargetChangeListener(this);
@@ -187,7 +187,7 @@ public class MapPartitionData implements
MemoryManager.ReadBufferTargetChangeLis
}
protected void openReader(MapPartitionDataReader reader) throws IOException {
- reader.open(dataFileChanel, indexChannel, hdfsDataInputStream, hdfsIndexInputStream);
+ reader.open(dataChannel, indexChannel, dataInputStream, indexInputStream);
}
public synchronized void readBuffers() {
@@ -265,8 +265,10 @@ public class MapPartitionData implements
MemoryManager.ReadBufferTargetChangeLis
bufferQueue.release();
isReleased = true;
- readers.values().forEach(MapPartitionDataReader::close);
- readers.clear();
+ IOUtils.closeQuietly(dataChannel);
+ IOUtils.closeQuietly(indexChannel);
+ IOUtils.closeQuietly(dataInputStream);
+ IOUtils.closeQuietly(indexInputStream);
MemoryManager.instance().removeReadBufferTargetChangeListener(this);
}
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java
index 0f3874027..1db2f81b4 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java
@@ -516,8 +516,4 @@ public class MapPartitionDataReader implements
Comparable<MapPartitionDataReader
return !isReleased && !readFinished;
}
}
-
- public void close() {
- partitionDataReader.close();
- }
}
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataReader.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataReader.java
index 48b08d741..f1c0276a1 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataReader.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataReader.java
@@ -56,8 +56,6 @@ public abstract class PartitionDataReader {
public abstract long position() throws IOException;
- public abstract void close();
-
public int readBuffer(ByteBuf buffer, long dataConsumingOffset) throws
IOException {
position(dataConsumingOffset);
int headerSize = headerBuffer.capacity();