This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch branch-0.2
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.2 by this push:
new d0957ecd [CELEBORN-228]Refactor PartitionFileSorter to avoid specific JDK dependency. (#1168)
d0957ecd is described below
commit d0957ecd961a0b5c03e111c25fd6e34efe54b0b2
Author: Ethan Feng <[email protected]>
AuthorDate: Mon Jan 16 20:06:47 2023 +0800
[CELEBORN-228]Refactor PartitionFileSorter to avoid specific JDK dependency. (#1168)
---
.../org/apache/celeborn/common/CelebornConf.scala | 2 +-
docs/configuration/worker.md | 2 +-
.../worker/storage/PartitionFilesSorter.java | 55 ++++++++++++++--------
3 files changed, 38 insertions(+), 21 deletions(-)
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index ece2fe82..64b28106 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1870,7 +1870,7 @@ object CelebornConf extends Logging {
buildConf("celeborn.worker.partitionSorter.reservedMemoryPerPartition")
.withAlternative("rss.worker.initialReserveSingleSortMemory")
.categories("worker")
- .doc("Initial reserve memory when sorting a shuffle file off-heap.")
+ .doc("Reserved memory when sorting a shuffle file off-heap.")
.version("0.2.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("1mb")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 4e0ed09d..8fe87978 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -68,7 +68,7 @@ license: |
| celeborn.worker.monitor.disk.sys.block.dir | /sys/block | The directory where linux file block information is stored. | 0.2.0 |
| celeborn.worker.noneEmptyDirExpireDuration | 1d | If a non-empty application shuffle data dir have not been operated during le duration time, will mark this application as expired. | 0.2.0 |
| celeborn.worker.partitionSorter.directMemoryRatioThreshold | 0.1 | Max ratio of partition sorter's memory for sorting, when reserved memory is higher than max partition sorter memory, partition sorter will stop sorting. | 0.2.0 |
-| celeborn.worker.partitionSorter.reservedMemoryPerPartition | 1mb | Initial reserve memory when sorting a shuffle file off-heap. | 0.2.0 |
+| celeborn.worker.partitionSorter.reservedMemoryPerPartition | 1mb | Reserved memory when sorting a shuffle file off-heap. | 0.2.0 |
| celeborn.worker.partitionSorter.sort.timeout | 220s | Timeout for a shuffle file to sort. | 0.2.0 |
| celeborn.worker.push.io.threads | 16 | Netty IO thread number of worker to handle client push data. The default threads number is 16. | 0.2.0 |
| celeborn.worker.push.port | 0 | Server port for Worker to receive push data request from ShuffleClient. | 0.2.0 |
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
index 7af454e2..30b2d09f 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
@@ -47,7 +47,6 @@ import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import sun.nio.ch.DirectBuffer;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.identity.UserIdentifier;
@@ -353,7 +352,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
indexSize += entry.getValue().size() * 16;
}
- ByteBuffer indexBuf = ByteBuffer.allocateDirect(indexSize);
+ ByteBuffer indexBuf = ByteBuffer.allocate(indexSize);
for (Map.Entry<Integer, List<ShuffleBlockInfo>> entry : indexMap.entrySet()) {
int mapId = entry.getKey();
List<ShuffleBlockInfo> list = entry.getValue();
@@ -379,7 +378,6 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
}
indexFileChannel.close();
}
- ((DirectBuffer) indexBuf).cleaner().clean();
}
protected void readStreamFully(FSDataInputStream stream, ByteBuffer buffer, String path)
@@ -562,14 +560,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
index += batchHeaderLen + compressedSize;
paddingBuf.clear();
- if (compressedSize > reserveMemory) {
- ((DirectBuffer) paddingBuf).cleaner().clean();
- paddingBuf = expandBufferAndUpdateMemoryTracker(reserveMemory, compressedSize);
- reserveMemory = compressedSize;
- }
- paddingBuf.limit(compressedSize);
- // TODO: compare skip or read performance differential
- readBufferFully(paddingBuf);
+ readBufferBySize(paddingBuf, compressedSize);
}
long fileIndex = 0;
@@ -590,7 +581,6 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
sortedBlockInfoMap.put(mapId, sortedShuffleBlocks);
}
- ((DirectBuffer) paddingBuf).cleaner().clean();
memoryManager.releaseSortMemory(reserveMemory);
writeIndex(sortedBlockInfoMap, indexFilePath, isHdfs);
@@ -655,14 +645,41 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
}
}
- private ByteBuffer expandBufferAndUpdateMemoryTracker(int oldCapacity, int newCapacity)
- throws InterruptedException {
- memoryManager.releaseSortMemory(oldCapacity);
- memoryManager.reserveSortMemory(newCapacity);
- while (!memoryManager.sortMemoryReady()) {
- Thread.sleep(20);
+ protected void readChannelBySize(
+ FileChannel channel, ByteBuffer buffer, String path, int toRead) throws IOException {
+ int read = 0;
+ if (toRead < buffer.capacity()) {
+ buffer.limit(toRead);
+ }
+ while (read != toRead) {
+ int tmpRead = channel.read(buffer);
+ if (-1 == tmpRead) {
+ throw new IOException(
+ "Unexpected EOF, file name : "
+ + path
+ + " position :"
+ + channel.position()
+ + " read size :"
+ + read);
+ } else {
+ read += tmpRead;
+ if (!buffer.hasRemaining()) {
+ buffer.clear();
+ if (toRead - read < buffer.capacity()) {
+ buffer.limit(toRead - read);
+ }
+ }
+ }
+ }
+ }
+
+ private void readBufferBySize(ByteBuffer buffer, int toRead) throws IOException {
+ if (isHdfs) {
+ // HDFS don't need warmup
+ hdfsOriginInput.seek(toRead + hdfsOriginInput.getPos());
+ } else {
+ readChannelBySize(originFileChannel, buffer, originFilePath, toRead);
}
- return ByteBuffer.allocateDirect(newCapacity);
}
}
}