This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.2
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.2 by this push:
     new d0957ecd [CELEBORN-228]Refactor PartitionFileSorter to avoid specific 
JDK dependency. (#1168)
d0957ecd is described below

commit d0957ecd961a0b5c03e111c25fd6e34efe54b0b2
Author: Ethan Feng <[email protected]>
AuthorDate: Mon Jan 16 20:06:47 2023 +0800

    [CELEBORN-228]Refactor PartitionFileSorter to avoid specific JDK 
dependency. (#1168)
---
 .../org/apache/celeborn/common/CelebornConf.scala  |  2 +-
 docs/configuration/worker.md                       |  2 +-
 .../worker/storage/PartitionFilesSorter.java       | 55 ++++++++++++++--------
 3 files changed, 38 insertions(+), 21 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index ece2fe82..64b28106 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1870,7 +1870,7 @@ object CelebornConf extends Logging {
     buildConf("celeborn.worker.partitionSorter.reservedMemoryPerPartition")
       .withAlternative("rss.worker.initialReserveSingleSortMemory")
       .categories("worker")
-      .doc("Initial reserve memory when sorting a shuffle file off-heap.")
+      .doc("Reserved memory when sorting a shuffle file off-heap.")
       .version("0.2.0")
       .bytesConf(ByteUnit.BYTE)
       .createWithDefaultString("1mb")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 4e0ed09d..8fe87978 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -68,7 +68,7 @@ license: |
 | celeborn.worker.monitor.disk.sys.block.dir | /sys/block | The directory 
where linux file block information is stored. | 0.2.0 | 
 | celeborn.worker.noneEmptyDirExpireDuration | 1d | If a non-empty application 
shuffle data dir has not been operated during the duration time, will mark this 
application as expired. | 0.2.0 | 
 | celeborn.worker.partitionSorter.directMemoryRatioThreshold | 0.1 | Max ratio 
of partition sorter's memory for sorting, when reserved memory is higher than 
max partition sorter memory, partition sorter will stop sorting. | 0.2.0 | 
-| celeborn.worker.partitionSorter.reservedMemoryPerPartition | 1mb | Initial 
reserve memory when sorting a shuffle file off-heap. | 0.2.0 | 
+| celeborn.worker.partitionSorter.reservedMemoryPerPartition | 1mb | Reserved 
memory when sorting a shuffle file off-heap. | 0.2.0 | 
 | celeborn.worker.partitionSorter.sort.timeout | 220s | Timeout for a shuffle 
file to sort. | 0.2.0 | 
 | celeborn.worker.push.io.threads | 16 | Netty IO thread number of worker to 
handle client push data. The default threads number is 16. | 0.2.0 | 
 | celeborn.worker.push.port | 0 | Server port for Worker to receive push data 
request from ShuffleClient. | 0.2.0 | 
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
index 7af454e2..30b2d09f 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
@@ -47,7 +47,6 @@ import org.iq80.leveldb.DB;
 import org.iq80.leveldb.DBIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import sun.nio.ch.DirectBuffer;
 
 import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.identity.UserIdentifier;
@@ -353,7 +352,7 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
       indexSize += entry.getValue().size() * 16;
     }
 
-    ByteBuffer indexBuf = ByteBuffer.allocateDirect(indexSize);
+    ByteBuffer indexBuf = ByteBuffer.allocate(indexSize);
     for (Map.Entry<Integer, List<ShuffleBlockInfo>> entry : 
indexMap.entrySet()) {
       int mapId = entry.getKey();
       List<ShuffleBlockInfo> list = entry.getValue();
@@ -379,7 +378,6 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
       }
       indexFileChannel.close();
     }
-    ((DirectBuffer) indexBuf).cleaner().clean();
   }
 
   protected void readStreamFully(FSDataInputStream stream, ByteBuffer buffer, 
String path)
@@ -562,14 +560,7 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
 
           index += batchHeaderLen + compressedSize;
           paddingBuf.clear();
-          if (compressedSize > reserveMemory) {
-            ((DirectBuffer) paddingBuf).cleaner().clean();
-            paddingBuf = expandBufferAndUpdateMemoryTracker(reserveMemory, 
compressedSize);
-            reserveMemory = compressedSize;
-          }
-          paddingBuf.limit(compressedSize);
-          // TODO: compare skip or read performance differential
-          readBufferFully(paddingBuf);
+          readBufferBySize(paddingBuf, compressedSize);
         }
 
         long fileIndex = 0;
@@ -590,7 +581,6 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
           sortedBlockInfoMap.put(mapId, sortedShuffleBlocks);
         }
 
-        ((DirectBuffer) paddingBuf).cleaner().clean();
         memoryManager.releaseSortMemory(reserveMemory);
 
         writeIndex(sortedBlockInfoMap, indexFilePath, isHdfs);
@@ -655,14 +645,41 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
       }
     }
 
-    private ByteBuffer expandBufferAndUpdateMemoryTracker(int oldCapacity, int 
newCapacity)
-        throws InterruptedException {
-      memoryManager.releaseSortMemory(oldCapacity);
-      memoryManager.reserveSortMemory(newCapacity);
-      while (!memoryManager.sortMemoryReady()) {
-        Thread.sleep(20);
+    protected void readChannelBySize(
+        FileChannel channel, ByteBuffer buffer, String path, int toRead) 
throws IOException {
+      int read = 0;
+      if (toRead < buffer.capacity()) {
+        buffer.limit(toRead);
+      }
+      while (read != toRead) {
+        int tmpRead = channel.read(buffer);
+        if (-1 == tmpRead) {
+          throw new IOException(
+              "Unexpected EOF, file name : "
+                  + path
+                  + " position :"
+                  + channel.position()
+                  + " read size :"
+                  + read);
+        } else {
+          read += tmpRead;
+          if (!buffer.hasRemaining()) {
+            buffer.clear();
+            if (toRead - read < buffer.capacity()) {
+              buffer.limit(toRead - read);
+            }
+          }
+        }
+      }
+    }
+
+    private void readBufferBySize(ByteBuffer buffer, int toRead) throws 
IOException {
+      if (isHdfs) {
+        // HDFS don't need warmup
+        hdfsOriginInput.seek(toRead + hdfsOriginInput.getPos());
+      } else {
+        readChannelBySize(originFileChannel, buffer, originFilePath, toRead);
       }
-      return ByteBuffer.allocateDirect(newCapacity);
     }
   }
 }

Reply via email to