This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch CELEBORN-228
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git

commit 0aca63a67cb0231ab47e9398bdbf9849c14660c8
Author: Ethan Feng <[email protected]>
AuthorDate: Mon Jan 16 17:25:59 2023 +0800

    [CELEBORN-228] Refactor PartitionFilesSorter to avoid specific JDK dependency.
---
 .../org/apache/celeborn/common/CelebornConf.scala  |  2 +-
 docs/configuration/worker.md                       |  2 +-
 .../worker/storage/PartitionFilesSorter.java       | 84 +++++++++++++++++-----
 3 files changed, 67 insertions(+), 21 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 4e45b8f3..9d59f65c 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1901,7 +1901,7 @@ object CelebornConf extends Logging {
     buildConf("celeborn.worker.partitionSorter.reservedMemoryPerPartition")
       .withAlternative("rss.worker.initialReserveSingleSortMemory")
       .categories("worker")
-      .doc("Initial reserve memory when sorting a shuffle file off-heap.")
+      .doc("Reserved memory when sorting a shuffle file off-heap.")
       .version("0.2.0")
       .bytesConf(ByteUnit.BYTE)
       .createWithDefaultString("1mb")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 8548c44b..d5cb68cc 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -69,7 +69,7 @@ license: |
 | celeborn.worker.monitor.disk.sys.block.dir | /sys/block | The directory 
where linux file block information is stored. | 0.2.0 | 
 | celeborn.worker.noneEmptyDirExpireDuration | 1d | If a non-empty application 
shuffle data dir have not been operated during le duration time, will mark this 
application as expired. | 0.2.0 | 
 | celeborn.worker.partitionSorter.directMemoryRatioThreshold | 0.1 | Max ratio 
of partition sorter's memory for sorting, when reserved memory is higher than 
max partition sorter memory, partition sorter will stop sorting. | 0.2.0 | 
-| celeborn.worker.partitionSorter.reservedMemoryPerPartition | 1mb | Initial 
reserve memory when sorting a shuffle file off-heap. | 0.2.0 | 
+| celeborn.worker.partitionSorter.reservedMemoryPerPartition | 1mb | Reserved 
memory when sorting a shuffle file off-heap. | 0.2.0 | 
 | celeborn.worker.partitionSorter.sort.timeout | 220s | Timeout for a shuffle 
file to sort. | 0.2.0 | 
 | celeborn.worker.push.io.threads | 16 | Netty IO thread number of worker to 
handle client push data. The default threads number is 16. | 0.2.0 | 
 | celeborn.worker.push.port | 0 | Server port for Worker to receive push data 
request from ShuffleClient. | 0.2.0 | 
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
index 6d3501d2..691f3c9d 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
@@ -47,7 +47,6 @@ import org.iq80.leveldb.DB;
 import org.iq80.leveldb.DBIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import sun.nio.ch.DirectBuffer;
 
 import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.identity.UserIdentifier;
@@ -353,7 +352,7 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
       indexSize += entry.getValue().size() * 16;
     }
 
-    ByteBuffer indexBuf = ByteBuffer.allocateDirect(indexSize);
+    ByteBuffer indexBuf = ByteBuffer.allocate(indexSize);
     for (Map.Entry<Integer, List<ShuffleBlockInfo>> entry : 
indexMap.entrySet()) {
       int mapId = entry.getKey();
       List<ShuffleBlockInfo> list = entry.getValue();
@@ -379,7 +378,6 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
       }
       indexFileChannel.close();
     }
-    ((DirectBuffer) indexBuf).cleaner().clean();
   }
 
   protected void readStreamFully(FSDataInputStream stream, ByteBuffer buffer, 
String path)
@@ -397,6 +395,7 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
     }
   }
 
+
   protected void readChannelFully(FileChannel channel, ByteBuffer buffer, 
String path)
       throws IOException {
     while (buffer.hasRemaining()) {
@@ -563,14 +562,7 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
 
           index += batchHeaderLen + compressedSize;
           paddingBuf.clear();
-          if (compressedSize > reserveMemory) {
-            ((DirectBuffer) paddingBuf).cleaner().clean();
-            paddingBuf = expandBufferAndUpdateMemoryTracker(reserveMemory, 
compressedSize);
-            reserveMemory = compressedSize;
-          }
-          paddingBuf.limit(compressedSize);
-          // TODO: compare skip or read performance differential
-          readBufferFully(paddingBuf);
+          readBufferBySize(paddingBuf, compressedSize);
         }
 
         long fileIndex = 0;
@@ -591,7 +583,6 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
           sortedBlockInfoMap.put(mapId, sortedShuffleBlocks);
         }
 
-        ((DirectBuffer) paddingBuf).cleaner().clean();
         memoryManager.releaseSortMemory(reserveMemory);
 
         writeIndex(sortedBlockInfoMap, indexFilePath, isHdfs);
@@ -656,14 +647,69 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
       }
     }
 
-    private ByteBuffer expandBufferAndUpdateMemoryTracker(int oldCapacity, int 
newCapacity)
-        throws InterruptedException {
-      memoryManager.releaseSortMemory(oldCapacity);
-      memoryManager.reserveSortMemory(newCapacity);
-      while (!memoryManager.sortMemoryReady()) {
-        Thread.sleep(20);
+
+    protected void readStreamBySize(
+            FSDataInputStream stream, ByteBuffer buffer, String path, int 
toRead) throws IOException { // consumes exactly toRead bytes from stream, reusing buffer as scratch space; path is only used in the EOF message
+      int read = 0; // running total of bytes consumed so far
+      if (toRead < buffer.capacity()) { // request fits in one buffer fill
+        buffer.limit(toRead); // cap the first fill; assumes position is 0 (the visible caller clears the buffer first)
+      }
+      while (read != toRead) { // keep reading until the requested byte count is reached
+        int tmpRead = stream.read(buffer);
+        if (-1 == tmpRead) { // stream ended before toRead bytes were available
+          throw new IOException(
+                  "Unexpected EOF, file name : "
+                          + path
+                          + " position :"
+                          + stream.getPos()
+                          + " read size :"
+                          + read);
+        } else {
+          read += tmpRead;
+          if (!buffer.hasRemaining()) { // buffer is full up to its limit
+            buffer.clear(); // rewind so the next read overwrites the bytes already consumed
+            if (toRead - read < buffer.capacity()) { // fewer bytes left than one full buffer
+              buffer.limit(toRead - read); // cap the next fill at the bytes still owed
+            }
+          }
+        }
       }
-      return ByteBuffer.allocateDirect(newCapacity);
     }
+    protected void readChannelBySize(FileChannel channel, ByteBuffer buffer, 
String path, int toRead)
+            throws IOException { // consumes exactly toRead bytes from channel, reusing buffer as scratch space; path is only used in the EOF message
+      int read = 0; // running total of bytes consumed so far
+      if (toRead < buffer.capacity()) { // request fits in one buffer fill
+        buffer.limit(toRead); // cap the first fill; assumes position is 0 (the visible caller clears the buffer first)
+      }
+      while (read != toRead) { // keep reading until the requested byte count is reached
+        int tmpRead = channel.read(buffer);
+        if (-1 == tmpRead) { // channel hit end-of-file before toRead bytes were available
+          throw new IOException(
+                  "Unexpected EOF, file name : "
+                          + path
+                          + " position :"
+                          + channel.position()
+                          + " read size :"
+                          + read);
+        } else {
+          read += tmpRead;
+          if (!buffer.hasRemaining()) { // buffer is full up to its limit
+            buffer.clear(); // rewind so the next read overwrites the bytes already consumed
+            if (toRead - read < buffer.capacity()) { // fewer bytes left than one full buffer
+              buffer.limit(toRead - read); // cap the next fill at the bytes still owed
+            }
+          }
+        }
+      }
+    }
+
+    private void readBufferBySize(ByteBuffer buffer, int toRead) throws 
IOException { // reads toRead bytes of the origin file through buffer, choosing the input source by isHdfs
+      if (isHdfs) { // HDFS-backed file: go through the FSDataInputStream variant
+        readStreamBySize(hdfsOriginInput, buffer, originFilePath, toRead);
+      } else { // otherwise: go through the FileChannel variant
+        readChannelBySize(originFileChannel, buffer, originFilePath, toRead);
+      }
+    }
+
   }
 }

Reply via email to