waitinfuture commented on code in PR #2300:
URL: https://github.com/apache/celeborn/pull/2300#discussion_r1609638932


##########
client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java:
##########
@@ -444,7 +444,10 @@ private PartitionReader createReader(
       switch (storageInfo.getType()) {
         case HDD:
         case SSD:
-          if (enabledReadLocalShuffle && 
location.getHost().equals(localHostAddress)) {
+        case MEMORY:
+          if (enabledReadLocalShuffle
+              && location.getHost().equals(localHostAddress)
+              && location.getStorageInfo().getType() != 
StorageInfo.Type.MEMORY) {

Review Comment:
   Just use `storageInfo.getType()`



##########
common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java:
##########
@@ -53,4 +74,40 @@ public boolean isPartitionSplitEnabled() {
   public void setPartitionSplitEnabled(boolean partitionSplitEnabled) {
     this.partitionSplitEnabled = partitionSplitEnabled;
   }
+
+  boolean isReduceFileMeta() {

Review Comment:
   no need for this method, just use the variable



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala:
##########
@@ -22,12 +22,13 @@ import java.nio.charset.StandardCharsets
 import java.nio.file.{FileAlreadyExistsException, Files, Paths}
 import java.util
 import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, 
ThreadPoolExecutor, TimeUnit}
-import java.util.concurrent.atomic.AtomicInteger
-import java.util.function.{BiConsumer, IntUnaryOperator}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+import java.util.function.{BiConsumer, Consumer, IntUnaryOperator}

Review Comment:
   `Consumer` unused



##########
common/src/main/java/org/apache/celeborn/common/util/ShuffleBlockInfoUtils.java:
##########
@@ -43,24 +47,49 @@ public static List<Long> 
getChunkOffsetsFromShuffleBlockInfos(
       maxMapIndex = indexMap.keySet().stream().max(Integer::compareTo).get() + 
1;
     }
 
-    for (int i = startMapIndex; i < maxMapIndex; i++) {
-      List<ShuffleBlockInfo> blockInfos = indexMap.get(i);
-      if (blockInfos != null) {
-        for (ShuffleBlockInfo info : blockInfos) {
-          if (sortedChunkOffset.size() == 0) {
-            sortedChunkOffset.add(info.offset);
+    if (isInMemory) {
+      long currentChunkOffset = 0;
+      long lastChunkOffset = 0;
+      // This sorted chunk offsets are used for fetch handler.
+      // Sorted byte buf is a new composite byte buf filled with small buffers.

Review Comment:
   `filled with small buffers` => `containing the required data`



##########
common/src/main/java/org/apache/celeborn/common/meta/MemoryFileInfo.java:
##########
@@ -39,11 +56,32 @@ public void setBuffer(CompositeByteBuf buffer) {
   }
 
   public void setBufferSize(int bufferSize) {

Review Comment:
   unused method



##########
common/src/main/java/org/apache/celeborn/common/meta/ReduceFileMeta.java:
##########
@@ -39,7 +52,17 @@ public synchronized List<Long> getChunkOffsets() {
   }
 
   public synchronized void addChunkOffset(long offset) {
-    chunkOffsets.add(offset);
+    nextBoundary = offset + chunkSize;
+    // keep compatible with reduce partition data writer's force update chunk 
offset logic

Review Comment:
   This comment can be deleted



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java:
##########
@@ -146,18 +144,25 @@ public long registerStream(long streamId, String 
shuffleKey, String fileName) {
    * stream is not properly closed, it will eventually be cleaned up by 
`cleanupExpiredShuffleKey`.
    */
   public long registerStream(
-      String shuffleKey, FileManagedBuffers buffers, String fileName, 
TimeWindow fetchTimeMetric) {
+      String shuffleKey, ChunkBuffers buffers, String fileName, TimeWindow 
fetchTimeMetric) {
     long myStreamId = nextStreamId.getAndIncrement();
-    return registerStream(myStreamId, shuffleKey, buffers, fileName, 
fetchTimeMetric);
+    return registerStream(myStreamId, shuffleKey, buffers, fileName, 
fetchTimeMetric, null);
   }
 
   public long registerStream(
       long streamId,
       String shuffleKey,
-      FileManagedBuffers buffers,
+      ChunkBuffers buffers,
       String fileName,
-      TimeWindow fetchTimeMetric) {
-    streams.put(streamId, new StreamState(shuffleKey, buffers, fileName, 
fetchTimeMetric));
+      TimeWindow fetchTimeMetric,
+      FileInfo fileInfo) {
+    StreamState streamState = null;
+    if (fileInfo != null && fileInfo instanceof MemoryFileInfo) {

Review Comment:
   identical if-else branches. No need for `fileInfo`



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java:
##########
@@ -156,23 +194,54 @@ public void decrementPendingWrites() {
     numPendingWrites.decrementAndGet();
   }
 
-  protected void flush(boolean finalFlush) throws IOException {
-    synchronized (flushLock) {
-      // flushBuffer == null here means writer already closed
-      if (flushBuffer != null) {
-        int numBytes = flushBuffer.readableBytes();
-        if (numBytes != 0) {
-          notifier.checkException();
+  @VisibleForTesting
+  public void flush(boolean finalFlush, boolean fromEvict) throws IOException {
+    // flushBuffer == null here means this writer is already closed
+    if (flushBuffer != null) {
+      int numBytes = flushBuffer.readableBytes();
+      if (numBytes != 0) {
+        notifier.checkException();
+        FlushTask task = null;
+        if (fromEvict) {
           notifier.numPendingFlushes.incrementAndGet();
-          FlushTask task = null;
+          // duplicate buffer before its released
+          ByteBuf dupBuf = flushBuffer.retainedDuplicate();
+          // flush task will release the buffer of memory shuffle file
           if (channel != null) {
-            task = new LocalFlushTask(flushBuffer, channel, notifier);
+            task = new LocalFlushTask(flushBuffer, channel, notifier, false);
           } else if (diskFileInfo.isHdfs()) {
-            task = new HdfsFlushTask(flushBuffer, diskFileInfo.getHdfsPath(), 
notifier);
+            task = new HdfsFlushTask(flushBuffer, diskFileInfo.getHdfsPath(), 
notifier, false);
           }
+          MemoryManager.instance().releaseMemoryFileStorage(numBytes);
+          MemoryManager.instance().incrementDiskBuffer(numBytes);
+          // read flush buffer to generate correct chunk offsets
+          // data header layout (mapId, attemptId, nextBatchId, length)
+          ByteBuffer headerBuf = ByteBuffer.allocate(16);
+          while (dupBuf.isReadable()) {
+            headerBuf.rewind();
+            dupBuf.readBytes(headerBuf);
+            byte[] batchHeader = headerBuf.array();
+            int compressedSize = Platform.getInt(batchHeader, 
Platform.BYTE_ARRAY_OFFSET + 12);
+            dupBuf.skipBytes(compressedSize);
+            diskFileInfo.updateBytesFlushed(compressedSize + 16);
+          }
+          dupBuf.release();
+        } else {
+          if (!isMemoryShuffleFile.get()) {
+            notifier.numPendingFlushes.incrementAndGet();
+            if (channel != null) {
+              task = new LocalFlushTask(flushBuffer, channel, notifier, true);
+            } else if (diskFileInfo.isHdfs()) {
+              task = new HdfsFlushTask(flushBuffer, 
diskFileInfo.getHdfsPath(), notifier, true);
+            }
+          }
+        }
+        if (task != null) {

Review Comment:
   Use Precondition.check to make sure task is not null



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to