This is an automated email from the ASF dual-hosted git repository. ethanfeng pushed a commit to branch CELEBORN-228 in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
commit 0aca63a67cb0231ab47e9398bdbf9849c14660c8 Author: Ethan Feng <[email protected]> AuthorDate: Mon Jan 16 17:25:59 2023 +0800 [CELEBORN-228]Refactor PartitionFileSorter to avoid specific JDK dependency. --- .../org/apache/celeborn/common/CelebornConf.scala | 2 +- docs/configuration/worker.md | 2 +- .../worker/storage/PartitionFilesSorter.java | 84 +++++++++++++++++----- 3 files changed, 67 insertions(+), 21 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 4e45b8f3..9d59f65c 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -1901,7 +1901,7 @@ object CelebornConf extends Logging { buildConf("celeborn.worker.partitionSorter.reservedMemoryPerPartition") .withAlternative("rss.worker.initialReserveSingleSortMemory") .categories("worker") - .doc("Initial reserve memory when sorting a shuffle file off-heap.") + .doc("Reserved memory when sorting a shuffle file off-heap.") .version("0.2.0") .bytesConf(ByteUnit.BYTE) .createWithDefaultString("1mb") diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md index 8548c44b..d5cb68cc 100644 --- a/docs/configuration/worker.md +++ b/docs/configuration/worker.md @@ -69,7 +69,7 @@ license: | | celeborn.worker.monitor.disk.sys.block.dir | /sys/block | The directory where linux file block information is stored. | 0.2.0 | | celeborn.worker.noneEmptyDirExpireDuration | 1d | If a non-empty application shuffle data dir have not been operated during le duration time, will mark this application as expired. | 0.2.0 | | celeborn.worker.partitionSorter.directMemoryRatioThreshold | 0.1 | Max ratio of partition sorter's memory for sorting, when reserved memory is higher than max partition sorter memory, partition sorter will stop sorting. | 0.2.0 | -| celeborn.worker.partitionSorter.reservedMemoryPerPartition | 1mb | Initial reserve memory when sorting a shuffle file off-heap. | 0.2.0 | +| celeborn.worker.partitionSorter.reservedMemoryPerPartition | 1mb | Reserved memory when sorting a shuffle file off-heap. | 0.2.0 | | celeborn.worker.partitionSorter.sort.timeout | 220s | Timeout for a shuffle file to sort. | 0.2.0 | | celeborn.worker.push.io.threads | 16 | Netty IO thread number of worker to handle client push data. The default threads number is 16. | 0.2.0 | | celeborn.worker.push.port | 0 | Server port for Worker to receive push data request from ShuffleClient. | 0.2.0 | diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java index 6d3501d2..691f3c9d 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java @@ -47,7 +47,6 @@ import org.iq80.leveldb.DB; import org.iq80.leveldb.DBIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import sun.nio.ch.DirectBuffer; import org.apache.celeborn.common.CelebornConf; import org.apache.celeborn.common.identity.UserIdentifier; @@ -353,7 +352,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper { indexSize += entry.getValue().size() * 16; } - ByteBuffer indexBuf = ByteBuffer.allocateDirect(indexSize); + ByteBuffer indexBuf = ByteBuffer.allocate(indexSize); for (Map.Entry<Integer, List<ShuffleBlockInfo>> entry : indexMap.entrySet()) { int mapId = entry.getKey(); List<ShuffleBlockInfo> list = entry.getValue(); @@ -379,7 +378,6 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper { } indexFileChannel.close(); } - ((DirectBuffer) indexBuf).cleaner().clean(); } protected void readStreamFully(FSDataInputStream stream, ByteBuffer buffer, String path) @@ -397,6 +395,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper { } } + protected void readChannelFully(FileChannel channel, ByteBuffer buffer, String path) throws IOException { while (buffer.hasRemaining()) { @@ -563,14 +562,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper { index += batchHeaderLen + compressedSize; paddingBuf.clear(); - if (compressedSize > reserveMemory) { - ((DirectBuffer) paddingBuf).cleaner().clean(); - paddingBuf = expandBufferAndUpdateMemoryTracker(reserveMemory, compressedSize); - reserveMemory = compressedSize; - } - paddingBuf.limit(compressedSize); - // TODO: compare skip or read performance differential - readBufferFully(paddingBuf); + readBufferBySize(paddingBuf, compressedSize); } long fileIndex = 0; @@ -591,7 +583,6 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper { sortedBlockInfoMap.put(mapId, sortedShuffleBlocks); } - ((DirectBuffer) paddingBuf).cleaner().clean(); memoryManager.releaseSortMemory(reserveMemory); writeIndex(sortedBlockInfoMap, indexFilePath, isHdfs); @@ -656,14 +647,69 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper { } } - private ByteBuffer expandBufferAndUpdateMemoryTracker(int oldCapacity, int newCapacity) - throws InterruptedException { - memoryManager.releaseSortMemory(oldCapacity); - memoryManager.reserveSortMemory(newCapacity); - while (!memoryManager.sortMemoryReady()) { - Thread.sleep(20); + + protected void readStreamBySize( + FSDataInputStream stream, ByteBuffer buffer, String path, int toRead) throws IOException { + int read = 0; + if (toRead < buffer.capacity()) { + buffer.limit(toRead); + } + while (read != toRead) { + int tmpRead = stream.read(buffer); + if (-1 == tmpRead) { + throw new IOException( + "Unexpected EOF, file name : " + + path + + " position :" + + stream.getPos() + + " read size :" + + read); + } else { + read += tmpRead; + if (!buffer.hasRemaining()) { + buffer.clear(); + if (toRead - read < buffer.capacity()) { + buffer.limit(toRead - read); + } + } + } } - return ByteBuffer.allocateDirect(newCapacity); } + protected void readChannelBySize(FileChannel channel, ByteBuffer buffer, String path, int toRead) + throws IOException { + int read = 0; + if (toRead < buffer.capacity()) { + buffer.limit(toRead); + } + while (read != toRead) { + int tmpRead = channel.read(buffer); + if (-1 == tmpRead) { + throw new IOException( + "Unexpected EOF, file name : " + + path + + " position :" + + channel.position() + + " read size :" + + read); + } else { + read += tmpRead; + if (!buffer.hasRemaining()) { + buffer.clear(); + if (toRead - read < buffer.capacity()) { + buffer.limit(toRead - read); + } + } + } + } + } + + private void readBufferBySize(ByteBuffer buffer, int toRead) throws IOException { + if (isHdfs) { + readStreamBySize(hdfsOriginInput, buffer, originFilePath, toRead); + } else { + readChannelBySize(originFileChannel, buffer, originFilePath, toRead); + } + } + } }
