waitinfuture commented on code in PR #2300:
URL: https://github.com/apache/celeborn/pull/2300#discussion_r1590491442


##########
common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java:
##########
@@ -53,4 +73,56 @@ public boolean isPartitionSplitEnabled() {
   public void setPartitionSplitEnabled(boolean partitionSplitEnabled) {
     this.partitionSplitEnabled = partitionSplitEnabled;
   }
+
+  private boolean isReduceFileMeta() {
+    return fileMeta instanceof ReduceFileMeta;
+  }
+
+  public boolean addStream(long streamId) {
+    if (!isReduceFileMeta()) {
+      throw new IllegalStateException("In addStream, filemeta cannot be 
MapFileMeta");
+    }
+    ReduceFileMeta reduceFileMeta = (ReduceFileMeta) fileMeta;
+    synchronized (reduceFileMeta.getSorted()) {
+      if (reduceFileMeta.getSorted().get()) {
+        return false;
+      } else {
+        streams.add(streamId);
+        return true;
+      }
+    }
+  }
+
+  public void closeStream(long streamId, int startIndex, int endIndex) {
+    if (!isReduceFileMeta()) {
+      throw new IllegalStateException("In closeStream, filemeta cannot be 
MapFileMeta");
+    }
+    ReduceFileMeta reduceFileMeta = (ReduceFileMeta) fileMeta;
+    synchronized (reduceFileMeta.getSorted()) {
+      streams.remove(streamId);
+    }
+  }
+
+  public boolean isStreamsEmpty() {
+    if (!isReduceFileMeta()) {
+      throw new IllegalStateException("In isStreamsEmpty, filemeta cannot be 
MapFileMeta");
+    }
+    ReduceFileMeta reduceFileMeta = (ReduceFileMeta) fileMeta;
+    synchronized (reduceFileMeta.getSorted()) {
+      return streams.isEmpty();
+    }
+  }
+
+  public boolean isFullyRead() {

Review Comment:
   What if the partition is read again for failover reasons?



##########
common/src/main/java/org/apache/celeborn/common/meta/MemoryFileInfo.java:
##########
@@ -39,11 +59,62 @@ public void setBuffer(CompositeByteBuf buffer) {
   }
 
   public void setBufferSize(int bufferSize) {
-    this.length = bufferSize;
+    this.bytesFlushed = bufferSize;
+  }
+
+  public CompositeByteBuf getSortedBuffer() {
+    return sortedBuffer;
+  }
+
+  public void setSortedBuffer(CompositeByteBuf sortedBuffer) {
+    this.sortedBuffer = sortedBuffer;
+  }
+
+  public Map<Integer, List<ShuffleBlockInfo>> getSortedIndexes() {
+    return sortedIndexes;
+  }
+
+  public void setSortedIndexes(Map<Integer, List<ShuffleBlockInfo>> 
sortedIndexes) {
+    this.sortedIndexes = sortedIndexes;
+  }
+
+  public int expireMemoryBuffers() {
+    int bufferSize = 0;
+    if (buffer != null) {
+      bufferSize = buffer.writerIndex();
+      buffer.release();
+    }
+    logger.info("Memory File Info {} expire, removed {}", this, bufferSize);
+    return bufferSize;
+  }
+
+  public void incrementReaderCount() {
+    readerCount.incrementAndGet();
+  }
+
+  public void decrementReaderCount() {
+    if (readerCount.get() > 0) {

Review Comment:
   Is it guaranteed that there are no concurrent invocations? The check won't
help if there are concurrent invocations.



##########
common/src/main/java/org/apache/celeborn/common/meta/ReduceFileMeta.java:
##########
@@ -63,4 +87,25 @@ public void setSorted() {
   public AtomicBoolean getSorted() {
     return sorted;
   }
+
+  public void setMapIds(RoaringBitmap mapIds) {
+    this.mapIds = mapIds;
+  }
+
+  public void removeMapIds(int startIndex, int endIndex) {
+    if (mapIds == null) {
+      return;
+    }
+    for (int i = startIndex; i < endIndex; i++) {
+      mapIds.remove(i);
+    }
+  }
+
+  public RoaringBitmap getMapIds() {
+    return mapIds;
+  }
+
+  public long getChunkSize() {

Review Comment:
   unused method



##########
common/src/main/java/org/apache/celeborn/common/meta/ReduceFileMeta.java:
##########
@@ -21,13 +21,29 @@
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.roaringbitmap.RoaringBitmap;
+
+import org.apache.celeborn.common.CelebornConf;
+
 public class ReduceFileMeta implements FileMeta {
-  private final List<Long> chunkOffsets;
   private final AtomicBoolean sorted = new AtomicBoolean(false);
+  private final List<Long> chunkOffsets;
+  private RoaringBitmap mapIds;
+  private transient CelebornConf conf = new CelebornConf();
+  private long chunkSize = conf.shuffleChunkSize();
+  private long nextBoundary;
 
-  public ReduceFileMeta() {
+  public ReduceFileMeta(long chunkSize) {
     this.chunkOffsets = new ArrayList<>();
     chunkOffsets.add(0L);
+    this.chunkSize = chunkSize;
+    nextBoundary = chunkSize;
+  }
+
+  public ReduceFileMeta(List<Long> chunkOffsets, long chunkSize) {
+    this.chunkOffsets = chunkOffsets;
+    nextBoundary = chunkSize;

Review Comment:
   It seems `nextBoundary` should be the last offset plus `chunkSize`?



##########
worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java:
##########
@@ -0,0 +1,870 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage.memory;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+import scala.Function0;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.identity.UserIdentifier;
+import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.meta.ReduceFileMeta;
+import org.apache.celeborn.common.network.TransportContext;
+import org.apache.celeborn.common.network.buffer.ManagedBuffer;
+import org.apache.celeborn.common.network.client.ChunkReceivedCallback;
+import org.apache.celeborn.common.network.client.TransportClient;
+import org.apache.celeborn.common.network.client.TransportClientFactory;
+import org.apache.celeborn.common.network.protocol.TransportMessage;
+import org.apache.celeborn.common.network.server.TransportServer;
+import org.apache.celeborn.common.network.util.NettyUtils;
+import org.apache.celeborn.common.network.util.TransportConf;
+import org.apache.celeborn.common.protocol.*;
+import org.apache.celeborn.common.unsafe.Platform;
+import org.apache.celeborn.common.util.ThreadUtils;
+import org.apache.celeborn.service.deploy.worker.FetchHandler;
+import org.apache.celeborn.service.deploy.worker.WorkerSource;
+import org.apache.celeborn.service.deploy.worker.memory.MemoryManager;
+import org.apache.celeborn.service.deploy.worker.storage.*;
+
+public class MemoryReducePartitionDataWriterSuiteJ {

Review Comment:
   I think it's also very necessary to add integration tests for this feature,
especially skewed tests (map range read) and an exchange reuse test.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java:
##########
@@ -104,20 +114,20 @@ public static MemoryManager instance() {
     return _INSTANCE;
   }
 
-  private MemoryManager(CelebornConf conf) {
+  private MemoryManager(CelebornConf conf, StorageManager storageManager) {
     double pausePushDataRatio = conf.workerDirectMemoryRatioToPauseReceive();
     double pauseReplicateRatio = 
conf.workerDirectMemoryRatioToPauseReplicate();
     double resumeRatio = conf.workerDirectMemoryRatioToResume();
     double maxSortMemRatio = 
conf.workerPartitionSorterDirectMemoryRatioThreshold();
     double readBufferRatio = conf.workerDirectMemoryRatioForReadBuffer();
-    double shuffleStorageRatio = 
conf.workerDirectMemoryRatioForShuffleStorage();
+    double memoryFileStorageRatio = 
conf.workerDirectMemoryRatioForMemoryFilesStorage();

Review Comment:
   Just `this.resumeRatio = xxx`



##########
common/src/main/java/org/apache/celeborn/common/meta/MemoryFileInfo.java:
##########
@@ -39,11 +59,62 @@ public void setBuffer(CompositeByteBuf buffer) {
   }
 
   public void setBufferSize(int bufferSize) {
-    this.length = bufferSize;
+    this.bytesFlushed = bufferSize;
+  }
+
+  public CompositeByteBuf getSortedBuffer() {
+    return sortedBuffer;
+  }
+
+  public void setSortedBuffer(CompositeByteBuf sortedBuffer) {
+    this.sortedBuffer = sortedBuffer;
+  }
+
+  public Map<Integer, List<ShuffleBlockInfo>> getSortedIndexes() {
+    return sortedIndexes;
+  }
+
+  public void setSortedIndexes(Map<Integer, List<ShuffleBlockInfo>> 
sortedIndexes) {
+    this.sortedIndexes = sortedIndexes;
+  }
+
+  public int expireMemoryBuffers() {
+    int bufferSize = 0;
+    if (buffer != null) {
+      bufferSize = buffer.writerIndex();
+      buffer.release();
+    }
+    logger.info("Memory File Info {} expire, removed {}", this, bufferSize);
+    return bufferSize;
+  }
+
+  public void incrementReaderCount() {
+    readerCount.incrementAndGet();
+  }
+
+  public void decrementReaderCount() {
+    if (readerCount.get() > 0) {
+      readerCount.decrementAndGet();
+    }
+  }
+
+  public boolean hasReader() {
+    return readerCount.get() > 0;
+  }
+
+  public boolean getEvicted() {

Review Comment:
   nit: `isEvicted` is better



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java:
##########
@@ -208,19 +224,91 @@ private MemoryManager(CelebornConf conf) {
           TimeUnit.MILLISECONDS);
     }
 
+    this.storageManager = storageManager;
+    if (memoryFileStorageThreshold > 0
+        && storageManager != null
+        && storageManager.localOrHdfsStorageAvailable()) {
+      ScheduledExecutorService memoryFileStorageService =
+          
ThreadUtils.newDaemonSingleThreadScheduledExecutor("memory-file-storage-checker");
+      memoryFileStorageService.scheduleWithFixedDelay(
+          () -> {
+            try {
+              if ((memoryFileStorageCounter.get() >= 
memoryFileStorageThreshold)
+                  || currentServingState() != ServingState.NONE_PAUSED) {
+                List<PartitionDataWriter> committedWriters = new ArrayList<>();
+                List<PartitionDataWriter> unCommittedWriters = new 
ArrayList<>();
+                synchronized (storageManager.memoryFileInfos()) {

Review Comment:
   What does this synchronized block protect against? It seems `memoryFileInfos` is not locked
elsewhere, and `memoryFileStorageService` is single-threaded.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java:
##########
@@ -54,69 +62,110 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
   private static final Logger logger = 
LoggerFactory.getLogger(PartitionDataWriter.class);
   private static final long WAIT_INTERVAL_MS = 5;
 
-  protected final DiskFileInfo diskFileInfo;
+  // After commit file, there will be only 1 fileinfo left.
+  protected DiskFileInfo diskFileInfo = null;
+  protected MemoryFileInfo memoryFileInfo = null;
   private FileChannel channel;
   private volatile boolean closed;
   private volatile boolean destroyed;
 
   protected final AtomicInteger numPendingWrites = new AtomicInteger();
 
-  public final Flusher flusher;
-  private final int flushWorkerIndex;
+  public Flusher flusher;
+  private int flushWorkerIndex;
 
   @GuardedBy("flushLock")
-  private CompositeByteBuf flushBuffer;
+  protected CompositeByteBuf flushBuffer;
 
-  private final Object flushLock = new Object();
+  protected final Object flushLock = new Object();
   private final long writerCloseTimeoutMs;
 
-  protected final long flusherBufferSize;
+  protected long flusherBufferSize;
 
   protected final DeviceMonitor deviceMonitor;
   protected final AbstractSource source; // metrics
 
-  private long splitThreshold = 0;
+  private final long splitThreshold;
   private final PartitionSplitMode splitMode;
-  private final PartitionType partitionType;
   private final boolean rangeReadFilter;
   protected boolean deleted = false;
   private RoaringBitmap mapIdBitMap = null;
   protected final FlushNotifier notifier = new FlushNotifier();
   // It's only needed when graceful shutdown is enabled
-  private String shuffleKey;
-  private final StorageManager storageManager;
+  private final String shuffleKey;
+  protected final StorageManager storageManager;
   private final boolean workerGracefulShutdown;
+  protected final long memoryFileStorageMaxFileSize;
+  protected AtomicBoolean isMemoryShuffleFile = new AtomicBoolean();
+  protected final String filename;
+  protected PooledByteBufAllocator pooledByteBufAllocator;
+  private final int workerPushMaxComponents;
+  private final PartitionDataWriterContext writerContext;
+  private final long localFlusherBufferSize;
+  private final long hdfsFlusherBufferSize;
+  private Exception exception = null;
+  private boolean metricsCollectCriticalEnabled;
 
   public PartitionDataWriter(
       StorageManager storageManager,
-      DiskFileInfo diskFileInfo,
-      Flusher flusher,
       AbstractSource workerSource,
       CelebornConf conf,
       DeviceMonitor deviceMonitor,
-      long splitThreshold,
-      PartitionSplitMode splitMode,
-      PartitionType partitionType,
-      boolean rangeReadFilter,
-      String shuffleKey)
+      PartitionDataWriterContext writerContext,
+      boolean supportInMemory)
       throws IOException {
     this.storageManager = storageManager;
-    this.diskFileInfo = diskFileInfo;
-    this.flusher = flusher;
-    this.flushWorkerIndex = flusher.getWorkerIndex();
     this.writerCloseTimeoutMs = conf.workerWriterCloseTimeoutMs();
     this.workerGracefulShutdown = conf.workerGracefulShutdown();
-    this.splitThreshold = splitThreshold;
+    this.splitThreshold = writerContext.getSplitThreshold();
     this.deviceMonitor = deviceMonitor;
-    this.splitMode = splitMode;
-    this.partitionType = partitionType;
-    this.rangeReadFilter = rangeReadFilter;
-    this.shuffleKey = shuffleKey;
+    this.splitMode = writerContext.getPartitionSplitMode();
+    this.rangeReadFilter = writerContext.isRangeReadFilter();
+    this.shuffleKey = writerContext.getShuffleKey();
+    this.memoryFileStorageMaxFileSize = 
conf.workerMemoryFileStorageMaxFileSize();
+    this.filename = writerContext.getPartitionLocation().getFileName();
+    this.workerPushMaxComponents = conf.workerPushMaxComponents();
+    this.writerContext = writerContext;
+    this.localFlusherBufferSize = conf.workerFlusherBufferSize();
+    this.hdfsFlusherBufferSize = conf.workerHdfsFlusherBufferSize();
+    this.metricsCollectCriticalEnabled = conf.metricsCollectCriticalEnabled();
+
+    Tuple4<MemoryFileInfo, Flusher, DiskFileInfo, File> createFileResult =

Review Comment:
   This method can be deleted then.



##########
docs/configuration/worker.md:
##########
@@ -98,6 +98,7 @@ license: |
 | celeborn.worker.jvmQuake.exitCode | 502 | false | The exit code of system 
kill for the maximum GC 'deficit' during worker jvm quake. | 0.4.0 |  | 
 | celeborn.worker.jvmQuake.kill.threshold | 60s | false | The threshold of 
system kill for the maximum GC 'deficit' which can be accumulated before 
jvmquake takes action. | 0.4.0 |  | 
 | celeborn.worker.jvmQuake.runtimeWeight | 5.0 | false | The factor by which 
to multiply running JVM time, when weighing it against GCing time. 'Deficit' is 
accumulated as `gc_time - runtime * runtime_weight`, and is compared against 
threshold to determine whether to take action. | 0.4.0 |  | 
+| celeborn.worker.memoryFileStorage.maxFileSize | 8MB | false | Max size for a 
memory storage file. It must be lesser than 2GB. | 0.5.0 |  | 

Review Comment:
   Shall we check to make sure this config does not exceed 2G?



##########
common/src/main/java/org/apache/celeborn/common/meta/MemoryFileInfo.java:
##########
@@ -39,11 +59,62 @@ public void setBuffer(CompositeByteBuf buffer) {
   }
 
   public void setBufferSize(int bufferSize) {
-    this.length = bufferSize;
+    this.bytesFlushed = bufferSize;
+  }
+
+  public CompositeByteBuf getSortedBuffer() {
+    return sortedBuffer;
+  }
+
+  public void setSortedBuffer(CompositeByteBuf sortedBuffer) {
+    this.sortedBuffer = sortedBuffer;
+  }
+
+  public Map<Integer, List<ShuffleBlockInfo>> getSortedIndexes() {
+    return sortedIndexes;
+  }
+
+  public void setSortedIndexes(Map<Integer, List<ShuffleBlockInfo>> 
sortedIndexes) {
+    this.sortedIndexes = sortedIndexes;
+  }
+
+  public int expireMemoryBuffers() {

Review Comment:
   nit: better to use `releaseMemoryBuffers`



##########
common/src/main/java/org/apache/celeborn/common/meta/ReduceFileMeta.java:
##########
@@ -39,9 +55,17 @@ public synchronized List<Long> getChunkOffsets() {
   }
 
   public synchronized void addChunkOffset(long offset) {
+    nextBoundary = offset + chunkSize;
     chunkOffsets.add(offset);
   }
 
+  public void updateChunkOffset(long bytesFlushed, boolean force) {
+    if (bytesFlushed >= nextBoundary || force) {
+      addChunkOffset(bytesFlushed);
+      nextBoundary = bytesFlushed + chunkSize;

Review Comment:
   Seems this line is unnecessary because `addChunkOffset` already updates
`nextBoundary`



##########
docs/configuration/worker.md:
##########
@@ -98,6 +98,7 @@ license: |
 | celeborn.worker.jvmQuake.exitCode | 502 | false | The exit code of system 
kill for the maximum GC 'deficit' during worker jvm quake. | 0.4.0 |  | 
 | celeborn.worker.jvmQuake.kill.threshold | 60s | false | The threshold of 
system kill for the maximum GC 'deficit' which can be accumulated before 
jvmquake takes action. | 0.4.0 |  | 
 | celeborn.worker.jvmQuake.runtimeWeight | 5.0 | false | The factor by which 
to multiply running JVM time, when weighing it against GCing time. 'Deficit' is 
accumulated as `gc_time - runtime * runtime_weight`, and is compared against 
threshold to determine whether to take action. | 0.4.0 |  | 
+| celeborn.worker.memoryFileStorage.maxFileSize | 8MB | false | Max size for a 
memory storage file. It must be lesser than 2GB. | 0.5.0 |  | 

Review Comment:
   nit: less than



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java:
##########
@@ -203,49 +301,103 @@ public void write(ByteBuf data) throws IOException {
     }
 
     final int numBytes = data.readableBytes();
-    MemoryManager.instance().incrementDiskBuffer(numBytes);
-
-    Optional.ofNullable(CongestionController.instance())
-        .ifPresent(
-            congestionController ->
-                
congestionController.produceBytes(diskFileInfo.getUserIdentifier(), numBytes));
+    if (isMemoryShuffleFile.get()) {
+      MemoryManager.instance().increaseMemoryFileStorage(numBytes);
+    } else {
+      MemoryManager.instance().incrementDiskBuffer(numBytes);
+      Optional.ofNullable(CongestionController.instance())
+          .ifPresent(
+              congestionController ->
+                  
congestionController.produceBytes(diskFileInfo.getUserIdentifier(), numBytes));
+    }
 
     synchronized (flushLock) {
       if (closed) {
-        String msg = "FileWriter has already closed!, fileName " + 
diskFileInfo.getFilePath();
+        String msg = "PartitionDataWriter has already closed! Filename: ";
+        if (isMemoryShuffleFile.get()) {
+          msg += filename;
+        } else {
+          msg += diskFileInfo.getFilePath();
+        }
         logger.warn(msg);
         throw new AlreadyClosedException(msg);
       }
       if (rangeReadFilter) {
         mapIdBitMap.add(mapId);
       }
-      if (flushBuffer.readableBytes() != 0
-          && flushBuffer.readableBytes() + numBytes >= flusherBufferSize) {
-        flush(false);
+      int flushBufferReadableBytes = flushBuffer.readableBytes();
+      if (!isMemoryShuffleFile.get()) {
+        if (flushBufferReadableBytes != 0
+            && flushBufferReadableBytes + numBytes >= flusherBufferSize) {
+          flush(false);
+        }
+      } else {
+        if (flushBufferReadableBytes > memoryFileStorageMaxFileSize
+            && storageManager.localOrHdfsStorageAvailable()) {
+          logger.debug(
+              "{} Evict, memory buffer is  {}",
+              writerContext.getPartitionLocation().getFileName(),
+              flushBufferReadableBytes);
+          evict();
+        }
       }
 
       data.retain();
       flushBuffer.addComponent(true, data);
+      if (isMemoryShuffleFile.get()) {
+        memoryFileInfo.updateBytesFlushed(numBytes);
+      }
     }
 
     numPendingWrites.decrementAndGet();
   }
 
+  public void evictInternal() throws IOException {
+    if (exception != null) {
+      return;
+    }
+    Tuple4<MemoryFileInfo, Flusher, DiskFileInfo, File> createFileResult =
+        storageManager.createFile(writerContext);
+    if (createFileResult._4() != null) {
+      this.diskFileInfo = createFileResult._3();
+      this.flusher = createFileResult._2();
+
+      isMemoryShuffleFile.set(false);
+      initFileChannelsForDiskFile();
+      flushInternal(closed, true);
+
+      memoryFileInfo.setEvicted(

Review Comment:
   I think it's unnecessary to pass a function to `setEvicted`. Just call the 
functions after calling `setEvicted`.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java:
##########
@@ -47,24 +49,53 @@ public class ChunkStreamManager {
   protected final ConcurrentHashMap<String, Set<Long>> shuffleStreamIds;
 
   /** State of a single stream. */
-  protected static class StreamState {
-    final FileManagedBuffers buffers;
-    final String shuffleKey;
-    final String fileName;
-    final TimeWindow fetchTimeMetric;
-
+  public static class StreamState {
+    public final ChunkBuffers buffers;
+    public final String shuffleKey;
+    public final String fileName;
+    public final TimeWindow fetchTimeMetric;
+    public final int startIndex;
+    public final int endIndex;
     // Used to keep track of the number of chunks being transferred and not 
finished yet.
     volatile long chunksBeingTransferred = 0L;
+    public MemoryFileInfo memoryFileInfo;
+
+    StreamState(
+        String shuffleKey,
+        ChunkBuffers buffers,
+        String fileName,
+        TimeWindow fetchTimeMetric,
+        int startIndex,
+        int endIndex) {
+      this.buffers = buffers;
+      this.shuffleKey = shuffleKey;
+      this.fileName = fileName;
+      this.fetchTimeMetric = fetchTimeMetric;
+      this.startIndex = startIndex;
+      this.endIndex = endIndex;
+    }
 
     StreamState(
         String shuffleKey,
-        FileManagedBuffers buffers,
+        ChunkBuffers buffers,
         String fileName,
-        TimeWindow fetchTimeMetric) {
+        TimeWindow fetchTimeMetric,
+        int startIndex,
+        int endIndex,
+        MemoryFileInfo memoryFileInfo) {
       this.buffers = buffers;
       this.shuffleKey = shuffleKey;
       this.fileName = fileName;
       this.fetchTimeMetric = fetchTimeMetric;
+      this.startIndex = startIndex;
+      this.endIndex = endIndex;
+      this.memoryFileInfo = memoryFileInfo;
+    }
+
+    public void tryDecrementReaderCount() {

Review Comment:
   Unused method



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java:
##########
@@ -156,35 +199,90 @@ public void decrementPendingWrites() {
     numPendingWrites.decrementAndGet();
   }
 
-  protected void flush(boolean finalFlush) throws IOException {
+  protected void flushInternal(boolean finalFlush, boolean evict) throws 
IOException {
     synchronized (flushLock) {
       // flushBuffer == null here means writer already closed
       if (flushBuffer != null) {
         int numBytes = flushBuffer.readableBytes();
         if (numBytes != 0) {
           notifier.checkException();
-          notifier.numPendingFlushes.incrementAndGet();
           FlushTask task = null;
-          if (channel != null) {
-            task = new LocalFlushTask(flushBuffer, channel, notifier);
-          } else if (diskFileInfo.isHdfs()) {
-            task = new HdfsFlushTask(flushBuffer, diskFileInfo.getHdfsPath(), 
notifier);
+          if (evict) {
+            notifier.numPendingFlushes.incrementAndGet();
+            // flush task will release the buffer of memory shuffle file
+            if (channel != null) {
+              task = new LocalFlushTask(flushBuffer, channel, notifier, false);
+            } else if (diskFileInfo.isHdfs()) {
+              task = new HdfsFlushTask(flushBuffer, 
diskFileInfo.getHdfsPath(), notifier, false);
+            }
+            MemoryManager.instance().releaseMemoryFileStorage(numBytes);
+            MemoryManager.instance().incrementDiskBuffer(numBytes);
+            // read flush buffer to generate correct chunk offsets
+            ByteBuf dupBuf = null;
+            if (memoryFileInfo.getSortedBuffer() != null) {
+              dupBuf = memoryFileInfo.getSortedBuffer();
+            } else {
+              dupBuf = flushBuffer.duplicate();
+            }
+            ByteBuffer headerBuf = ByteBuffer.allocate(16);

Review Comment:
   Better to add a comment describing the content of this 16-byte header.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java:
##########
@@ -194,77 +192,183 @@ public long getSortedSize() {
   // 2. If the FileSorter task is already in the sorting queue but the sorted 
file has not been
   //    generated, it awaits until a timeout occurs (default 220 seconds).
   // 3. If the sorted file is generated, it returns the sorted FileInfo.
-  public DiskFileInfo getSortedFileInfo(
-      String shuffleKey, String fileName, DiskFileInfo fileInfo, int 
startMapIndex, int endMapIndex)
+  // This method will generate temporary file info for this shuffle read
+  public FileInfo getSortedFileInfo(
+      String shuffleKey, String fileName, FileInfo fileInfo, int 
startMapIndex, int endMapIndex)
       throws IOException {
-    String fileId = shuffleKey + "-" + fileName;
-    UserIdentifier userIdentifier = fileInfo.getUserIdentifier();
-    Set<String> sorted =
-        sortedShuffleFiles.computeIfAbsent(shuffleKey, v -> 
ConcurrentHashMap.newKeySet());
-    Set<String> sorting =
-        sortingShuffleFiles.computeIfAbsent(shuffleKey, v -> 
ConcurrentHashMap.newKeySet());
-
-    String sortedFilePath = Utils.getSortedFilePath(fileInfo.getFilePath());
-    String indexFilePath = Utils.getIndexFilePath(fileInfo.getFilePath());
-    synchronized (sorting) {
-      if (sorted.contains(fileId)) {
-        return resolve(
-            shuffleKey,
-            fileId,
-            userIdentifier,
-            sortedFilePath,
-            indexFilePath,
-            startMapIndex,
-            endMapIndex);
+    if (fileInfo instanceof MemoryFileInfo) {
+      MemoryFileInfo memoryFileInfo = ((MemoryFileInfo) fileInfo);
+      Map<Integer, List<ShuffleBlockInfo>> indexesMap;
+      sortMemoryShuffleFile(memoryFileInfo);
+      indexesMap = memoryFileInfo.getSortedIndexes();
+
+      ReduceFileMeta tMeta =
+          new ReduceFileMeta(
+              ShuffleBlockInfoUtils.getChunkOffsetsFromShuffleBlockInfos(
+                  startMapIndex, endMapIndex, shuffleChunkSize, indexesMap, 
true),
+              shuffleChunkSize);
+      CompositeByteBuf targetBuffer =
+          MemoryManager.instance()
+              .getStoragePooledByteBufAllocator()
+              .compositeBuffer(Integer.MAX_VALUE);
+      ShuffleBlockInfoUtils.sortBufferByRange(

Review Comment:
   I don't think we should extract `sortBufferByRange` here. If we must, please
rename the method to `sliceSortedBufferByMapRange`.



##########
common/src/main/java/org/apache/celeborn/common/util/ShuffleBlockInfoUtils.java:
##########
@@ -90,4 +116,23 @@ public static Map<Integer, List<ShuffleBlockInfo>> 
parseShuffleBlockInfosFromByt
     }
     return indexMap;
   }
+
+  public static void sortBufferByRange(
+      int startMapIndex,
+      int endMapIndex,
+      Map<Integer, List<ShuffleBlockInfo>> indexMap,
+      CompositeByteBuf sortedByteBuf,
+      CompositeByteBuf targetByteBuf) {
+    for (int i = startMapIndex; i < endMapIndex; i++) {
+      List<ShuffleBlockInfo> blockInfos = indexMap.get(i);
+      if (blockInfos != null) {
+        for (ShuffleBlockInfo blockInfo : blockInfos) {
+          ByteBuf slice = sortedByteBuf.slice((int) blockInfo.offset, (int) 
blockInfo.length);

Review Comment:
   Since the required map range buffer is contiguous, we can calculate the 
overall start and end, then slice once, instead of creating so many slices.



##########
common/src/main/java/org/apache/celeborn/common/util/ShuffleBlockInfoUtils.java:
##########
@@ -43,24 +47,46 @@ public static List<Long> 
getChunkOffsetsFromShuffleBlockInfos(
       maxMapIndex = indexMap.keySet().stream().max(Integer::compareTo).get() + 
1;
     }
 
-    for (int i = startMapIndex; i < maxMapIndex; i++) {
-      List<ShuffleBlockInfo> blockInfos = indexMap.get(i);
-      if (blockInfos != null) {
-        for (ShuffleBlockInfo info : blockInfos) {
-          if (sortedChunkOffset.size() == 0) {
-            sortedChunkOffset.add(info.offset);
+    if (isInMemory) {
+      long currentChunkOffset = 0;
+      long lastChunkOffset = 0;
+      sortedChunkOffset.add(0l);

Review Comment:
   Please add a comment explaining why we start with `0` here: `MemoryFileInfo`
contains the exact buffer needed instead of a range.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java:
##########
@@ -194,77 +192,183 @@ public long getSortedSize() {
   // 2. If the FileSorter task is already in the sorting queue but the sorted 
file has not been
   //    generated, it awaits until a timeout occurs (default 220 seconds).
   // 3. If the sorted file is generated, it returns the sorted FileInfo.
-  public DiskFileInfo getSortedFileInfo(
-      String shuffleKey, String fileName, DiskFileInfo fileInfo, int 
startMapIndex, int endMapIndex)
+  // This method will generate temporary file info for this shuffle read
+  public FileInfo getSortedFileInfo(
+      String shuffleKey, String fileName, FileInfo fileInfo, int 
startMapIndex, int endMapIndex)
       throws IOException {
-    String fileId = shuffleKey + "-" + fileName;
-    UserIdentifier userIdentifier = fileInfo.getUserIdentifier();
-    Set<String> sorted =
-        sortedShuffleFiles.computeIfAbsent(shuffleKey, v -> 
ConcurrentHashMap.newKeySet());
-    Set<String> sorting =
-        sortingShuffleFiles.computeIfAbsent(shuffleKey, v -> 
ConcurrentHashMap.newKeySet());
-
-    String sortedFilePath = Utils.getSortedFilePath(fileInfo.getFilePath());
-    String indexFilePath = Utils.getIndexFilePath(fileInfo.getFilePath());
-    synchronized (sorting) {
-      if (sorted.contains(fileId)) {
-        return resolve(
-            shuffleKey,
-            fileId,
-            userIdentifier,
-            sortedFilePath,
-            indexFilePath,
-            startMapIndex,
-            endMapIndex);
+    if (fileInfo instanceof MemoryFileInfo) {
+      MemoryFileInfo memoryFileInfo = ((MemoryFileInfo) fileInfo);
+      Map<Integer, List<ShuffleBlockInfo>> indexesMap;
+      sortMemoryShuffleFile(memoryFileInfo);
+      indexesMap = memoryFileInfo.getSortedIndexes();
+
+      ReduceFileMeta tMeta =

Review Comment:
   `tMeta` -> `reduceFileMeta`. Please follow the naming convention.



##########
common/src/main/java/org/apache/celeborn/common/util/ShuffleBlockInfoUtils.java:
##########
@@ -90,4 +116,23 @@ public static Map<Integer, List<ShuffleBlockInfo>> 
parseShuffleBlockInfosFromByt
     }
     return indexMap;
   }
+
+  public static void sortBufferByRange(
+      int startMapIndex,
+      int endMapIndex,
+      Map<Integer, List<ShuffleBlockInfo>> indexMap,
+      CompositeByteBuf sortedByteBuf,
+      CompositeByteBuf targetByteBuf) {
+    for (int i = startMapIndex; i < endMapIndex; i++) {
+      List<ShuffleBlockInfo> blockInfos = indexMap.get(i);
+      if (blockInfos != null) {
+        for (ShuffleBlockInfo blockInfo : blockInfos) {
+          ByteBuf slice = sortedByteBuf.slice((int) blockInfo.offset, (int) 
blockInfo.length);
+          // Do not retain this buffer because this buffer will be release 
when the fileinfo is

Review Comment:
   release -> released. Please mind the grammar.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java:
##########
@@ -39,14 +40,16 @@
 import org.apache.celeborn.common.util.Utils;
 import org.apache.celeborn.reflect.DynMethods;
 import org.apache.celeborn.service.deploy.worker.storage.CreditStreamManager;
+import org.apache.celeborn.service.deploy.worker.storage.PartitionDataWriter;
+import org.apache.celeborn.service.deploy.worker.storage.StorageManager;
 
 public class MemoryManager {
   private static final Logger logger = 
LoggerFactory.getLogger(MemoryManager.class);
   private static volatile MemoryManager _INSTANCE = null;
-  @VisibleForTesting public long maxDirectorMemory = 0;
+  @VisibleForTesting public long maxDirectorMemory;

Review Comment:
   maxDirectMemory



##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -2993,13 +2996,14 @@ object CelebornConf extends Logging {
       .doubleConf
       .createWithDefault(0.1)
 
-  val WORKER_DIRECT_MEMORY_RATIO_FOR_SHUFFLE_STORAGE: ConfigEntry[Double] =
-    buildConf("celeborn.worker.directMemoryRatioForMemoryShuffleStorage")
+  val WORKER_DIRECT_MEMORY_RATIO_FOR_MEMORY_FILE_STORAGE: ConfigEntry[Double] =

Review Comment:
   It's better to mark this config as experimental for now (i.e., add a note in the 
doc stating so)



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java:
##########
@@ -194,77 +192,183 @@ public long getSortedSize() {
   // 2. If the FileSorter task is already in the sorting queue but the sorted 
file has not been
   //    generated, it awaits until a timeout occurs (default 220 seconds).
   // 3. If the sorted file is generated, it returns the sorted FileInfo.
-  public DiskFileInfo getSortedFileInfo(
-      String shuffleKey, String fileName, DiskFileInfo fileInfo, int 
startMapIndex, int endMapIndex)
+  // This method will generate temporary file info for this shuffle read
+  public FileInfo getSortedFileInfo(
+      String shuffleKey, String fileName, FileInfo fileInfo, int 
startMapIndex, int endMapIndex)
       throws IOException {
-    String fileId = shuffleKey + "-" + fileName;
-    UserIdentifier userIdentifier = fileInfo.getUserIdentifier();
-    Set<String> sorted =
-        sortedShuffleFiles.computeIfAbsent(shuffleKey, v -> 
ConcurrentHashMap.newKeySet());
-    Set<String> sorting =
-        sortingShuffleFiles.computeIfAbsent(shuffleKey, v -> 
ConcurrentHashMap.newKeySet());
-
-    String sortedFilePath = Utils.getSortedFilePath(fileInfo.getFilePath());
-    String indexFilePath = Utils.getIndexFilePath(fileInfo.getFilePath());
-    synchronized (sorting) {
-      if (sorted.contains(fileId)) {
-        return resolve(
-            shuffleKey,
-            fileId,
-            userIdentifier,
-            sortedFilePath,
-            indexFilePath,
-            startMapIndex,
-            endMapIndex);
+    if (fileInfo instanceof MemoryFileInfo) {
+      MemoryFileInfo memoryFileInfo = ((MemoryFileInfo) fileInfo);
+      Map<Integer, List<ShuffleBlockInfo>> indexesMap;
+      sortMemoryShuffleFile(memoryFileInfo);
+      indexesMap = memoryFileInfo.getSortedIndexes();
+
+      ReduceFileMeta tMeta =
+          new ReduceFileMeta(
+              ShuffleBlockInfoUtils.getChunkOffsetsFromShuffleBlockInfos(
+                  startMapIndex, endMapIndex, shuffleChunkSize, indexesMap, 
true),
+              shuffleChunkSize);
+      CompositeByteBuf targetBuffer =
+          MemoryManager.instance()
+              .getStoragePooledByteBufAllocator()
+              .compositeBuffer(Integer.MAX_VALUE);
+      ShuffleBlockInfoUtils.sortBufferByRange(
+          startMapIndex, endMapIndex, indexesMap, 
memoryFileInfo.getSortedBuffer(), targetBuffer);
+      return new MemoryFileInfo(
+          memoryFileInfo.getUserIdentifier(),
+          memoryFileInfo.isPartitionSplitEnabled(),
+          tMeta,
+          targetBuffer);
+    } else {
+      DiskFileInfo diskFileInfo = ((DiskFileInfo) fileInfo);
+      String fileId = shuffleKey + "-" + fileName;
+      UserIdentifier userIdentifier = diskFileInfo.getUserIdentifier();
+      Set<String> sorted =
+          sortedShuffleFiles.computeIfAbsent(shuffleKey, v -> 
ConcurrentHashMap.newKeySet());
+      Set<String> sorting =
+          sortingShuffleFiles.computeIfAbsent(shuffleKey, v -> 
ConcurrentHashMap.newKeySet());
+
+      String sortedFilePath = 
Utils.getSortedFilePath(diskFileInfo.getFilePath());
+      String indexFilePath = 
Utils.getIndexFilePath(diskFileInfo.getFilePath());
+      synchronized (sorting) {
+        if (sorted.contains(fileId)) {
+          return resolve(
+              shuffleKey,
+              fileId,
+              userIdentifier,
+              sortedFilePath,
+              indexFilePath,
+              startMapIndex,
+              endMapIndex);
+        }
+        if (!sorting.contains(fileId)) {
+          try {
+            FileSorter fileSorter = new FileSorter(diskFileInfo, fileId, 
shuffleKey);
+            sorting.add(fileId);
+            logger.debug(
+                "Adding sorter to sort queue shuffle key {}, file name {}", 
shuffleKey, fileName);
+            shuffleSortTaskDeque.put(fileSorter);
+          } catch (InterruptedException e) {
+            logger.error(
+                "Sorter scheduler thread is interrupted means worker is 
shutting down.", e);
+            throw new IOException(
+                "Sort scheduler thread is interrupted means worker is shutting 
down.", e);
+          } catch (IOException e) {
+            logger.error("File sorter access HDFS failed.", e);
+            throw new IOException("File sorter access HDFS failed.", e);
+          }
+        }
       }
-      if (!sorting.contains(fileId)) {
-        try {
-          FileSorter fileSorter = new FileSorter(fileInfo, fileId, shuffleKey);
-          sorting.add(fileId);
+
+      long sortStartTime = System.currentTimeMillis();
+      while (!sorted.contains(fileId)) {
+        if (sorting.contains(fileId)) {
+          try {
+            Thread.sleep(50);
+            if (System.currentTimeMillis() - sortStartTime > sortTimeout) {
+              logger.error("Sorting file {} timeout after {}ms", fileId, 
sortTimeout);
+              throw new IOException(
+                  "Sort file " + diskFileInfo.getFilePath() + " timeout after 
" + sortTimeout);
+            }
+          } catch (InterruptedException e) {
+            logger.error(
+                "Sorter scheduler thread is interrupted means worker is 
shutting down.", e);
+            throw new IOException(
+                "Sorter scheduler thread is interrupted means worker is 
shutting down.", e);
+          }
+        } else {
           logger.debug(
-              "Adding sorter to sort queue shuffle key {}, file name {}", 
shuffleKey, fileName);
-          shuffleSortTaskDeque.put(fileSorter);
-        } catch (InterruptedException e) {
-          logger.error("Sorter scheduler thread is interrupted means worker is 
shutting down.", e);
+              "Sorting shuffle file for {} {} failed.", shuffleKey, 
diskFileInfo.getFilePath());
           throw new IOException(
-              "Sort scheduler thread is interrupted means worker is shutting 
down.", e);
-        } catch (IOException e) {
-          logger.error("File sorter access HDFS failed.", e);
-          throw new IOException("File sorter access HDFS failed.", e);
+              "Sorting shuffle file for "
+                  + shuffleKey
+                  + " "
+                  + diskFileInfo.getFilePath()
+                  + " failed.");
         }
       }
+
+      return resolve(
+          shuffleKey,
+          fileId,
+          userIdentifier,
+          sortedFilePath,
+          indexFilePath,
+          startMapIndex,
+          endMapIndex);
     }
+  }
 
-    long sortStartTime = System.currentTimeMillis();
-    while (!sorted.contains(fileId)) {
-      if (sorting.contains(fileId)) {
-        try {
-          Thread.sleep(50);
-          if (System.currentTimeMillis() - sortStartTime > sortTimeout) {
-            logger.error("Sorting file {} timeout after {}ms", fileId, 
sortTimeout);
-            throw new IOException(
-                "Sort file " + fileInfo.getFilePath() + " timeout after " + 
sortTimeout);
+  public static void sortMemoryShuffleFile(MemoryFileInfo memoryFileInfo) {
+    ReduceFileMeta reduceFileMeta = ((ReduceFileMeta) 
memoryFileInfo.getFileMeta());
+    synchronized (reduceFileMeta.getSorted()) {
+      if (!reduceFileMeta.getSorted().get()) {
+        CompositeByteBuf originBuffer = memoryFileInfo.getBuffer();
+        Map<Integer, List<ShuffleBlockInfo>> blocksMap = new TreeMap<>();
+        int originReaderIndex = originBuffer.readerIndex();
+        int originWriterIndex = originBuffer.writerIndex();
+        int bufLength = originBuffer.readableBytes();
+        int index = 0;
+        ByteBuffer headerBuf = ByteBuffer.allocate(16);
+        boolean fillMapBitMap = false;
+        RoaringBitmap mapIdBitMap = reduceFileMeta.getMapIds();
+        if (mapIdBitMap == null) {
+          mapIdBitMap = new RoaringBitmap();
+          fillMapBitMap = true;
+        }
+
+        while (index != bufLength) {
+          headerBuf.rewind();
+          originBuffer.readerIndex(index);
+          originBuffer.readBytes(headerBuf);
+          byte[] batchHeader = headerBuf.array();
+          int mapId = Platform.getInt(batchHeader, Platform.BYTE_ARRAY_OFFSET);
+          if (fillMapBitMap) {
+            mapIdBitMap.add(mapId);
           }
-        } catch (InterruptedException e) {
-          logger.error("Sorter scheduler thread is interrupted means worker is 
shutting down.", e);
-          throw new IOException(
-              "Sorter scheduler thread is interrupted means worker is shutting 
down.", e);
+          int compressedSize = Platform.getInt(batchHeader, 
Platform.BYTE_ARRAY_OFFSET + 12);
+          ShuffleBlockInfo shuffleBlockInfo = new ShuffleBlockInfo();
+          shuffleBlockInfo.offset = index;
+          shuffleBlockInfo.length = 16 + compressedSize;
+          List<ShuffleBlockInfo> singleMapIdShuffleBlockList =
+              blocksMap.computeIfAbsent(mapId, v -> new ArrayList<>());
+          singleMapIdShuffleBlockList.add(shuffleBlockInfo);
+          index += 16 + compressedSize;
         }
-      } else {
-        logger.debug("Sorting shuffle file for {} {} failed.", shuffleKey, 
fileInfo.getFilePath());
-        throw new IOException(
-            "Sorting shuffle file for " + shuffleKey + " " + 
fileInfo.getFilePath() + " failed.");
+        originBuffer.setIndex(originReaderIndex, originWriterIndex);
+
+        // sorted buffer should not consolidate
+        // because this will affect origin buffer's reference count
+        CompositeByteBuf sortedBuffer =
+            MemoryManager.instance()
+                .getStoragePooledByteBufAllocator()
+                .compositeBuffer(Integer.MAX_VALUE - 1);
+        Map<Integer, List<ShuffleBlockInfo>> sortedBlocks = new TreeMap<>();
+        int sortedBufferIndex = 0;
+        for (Map.Entry<Integer, List<ShuffleBlockInfo>> entry : 
blocksMap.entrySet()) {
+          int mapId = entry.getKey();
+          List<ShuffleBlockInfo> blockInfos = entry.getValue();
+          List<ShuffleBlockInfo> sortedMapBlocks =
+              sortedBlocks.computeIfAbsent(mapId, v -> new ArrayList<>());

Review Comment:
   `sortedBlocks` is guaranteed not to contain `mapId` here, so just use `put`. 
And pass the `initialCapacity` as `blockInfos.size()`



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java:
##########
@@ -194,77 +192,183 @@ public long getSortedSize() {
   // 2. If the FileSorter task is already in the sorting queue but the sorted 
file has not been
   //    generated, it awaits until a timeout occurs (default 220 seconds).
   // 3. If the sorted file is generated, it returns the sorted FileInfo.
-  public DiskFileInfo getSortedFileInfo(
-      String shuffleKey, String fileName, DiskFileInfo fileInfo, int 
startMapIndex, int endMapIndex)
+  // This method will generate temporary file info for this shuffle read
+  public FileInfo getSortedFileInfo(
+      String shuffleKey, String fileName, FileInfo fileInfo, int 
startMapIndex, int endMapIndex)
       throws IOException {
-    String fileId = shuffleKey + "-" + fileName;
-    UserIdentifier userIdentifier = fileInfo.getUserIdentifier();
-    Set<String> sorted =
-        sortedShuffleFiles.computeIfAbsent(shuffleKey, v -> 
ConcurrentHashMap.newKeySet());
-    Set<String> sorting =
-        sortingShuffleFiles.computeIfAbsent(shuffleKey, v -> 
ConcurrentHashMap.newKeySet());
-
-    String sortedFilePath = Utils.getSortedFilePath(fileInfo.getFilePath());
-    String indexFilePath = Utils.getIndexFilePath(fileInfo.getFilePath());
-    synchronized (sorting) {
-      if (sorted.contains(fileId)) {
-        return resolve(
-            shuffleKey,
-            fileId,
-            userIdentifier,
-            sortedFilePath,
-            indexFilePath,
-            startMapIndex,
-            endMapIndex);
+    if (fileInfo instanceof MemoryFileInfo) {
+      MemoryFileInfo memoryFileInfo = ((MemoryFileInfo) fileInfo);
+      Map<Integer, List<ShuffleBlockInfo>> indexesMap;
+      sortMemoryShuffleFile(memoryFileInfo);
+      indexesMap = memoryFileInfo.getSortedIndexes();
+
+      ReduceFileMeta tMeta =
+          new ReduceFileMeta(
+              ShuffleBlockInfoUtils.getChunkOffsetsFromShuffleBlockInfos(
+                  startMapIndex, endMapIndex, shuffleChunkSize, indexesMap, 
true),
+              shuffleChunkSize);
+      CompositeByteBuf targetBuffer =
+          MemoryManager.instance()
+              .getStoragePooledByteBufAllocator()
+              .compositeBuffer(Integer.MAX_VALUE);
+      ShuffleBlockInfoUtils.sortBufferByRange(
+          startMapIndex, endMapIndex, indexesMap, 
memoryFileInfo.getSortedBuffer(), targetBuffer);
+      return new MemoryFileInfo(
+          memoryFileInfo.getUserIdentifier(),
+          memoryFileInfo.isPartitionSplitEnabled(),
+          tMeta,
+          targetBuffer);
+    } else {
+      DiskFileInfo diskFileInfo = ((DiskFileInfo) fileInfo);
+      String fileId = shuffleKey + "-" + fileName;
+      UserIdentifier userIdentifier = diskFileInfo.getUserIdentifier();
+      Set<String> sorted =
+          sortedShuffleFiles.computeIfAbsent(shuffleKey, v -> 
ConcurrentHashMap.newKeySet());
+      Set<String> sorting =
+          sortingShuffleFiles.computeIfAbsent(shuffleKey, v -> 
ConcurrentHashMap.newKeySet());
+
+      String sortedFilePath = 
Utils.getSortedFilePath(diskFileInfo.getFilePath());
+      String indexFilePath = 
Utils.getIndexFilePath(diskFileInfo.getFilePath());
+      synchronized (sorting) {
+        if (sorted.contains(fileId)) {
+          return resolve(
+              shuffleKey,
+              fileId,
+              userIdentifier,
+              sortedFilePath,
+              indexFilePath,
+              startMapIndex,
+              endMapIndex);
+        }
+        if (!sorting.contains(fileId)) {
+          try {
+            FileSorter fileSorter = new FileSorter(diskFileInfo, fileId, 
shuffleKey);
+            sorting.add(fileId);
+            logger.debug(
+                "Adding sorter to sort queue shuffle key {}, file name {}", 
shuffleKey, fileName);
+            shuffleSortTaskDeque.put(fileSorter);
+          } catch (InterruptedException e) {
+            logger.error(
+                "Sorter scheduler thread is interrupted means worker is 
shutting down.", e);
+            throw new IOException(
+                "Sort scheduler thread is interrupted means worker is shutting 
down.", e);
+          } catch (IOException e) {
+            logger.error("File sorter access HDFS failed.", e);
+            throw new IOException("File sorter access HDFS failed.", e);
+          }
+        }
       }
-      if (!sorting.contains(fileId)) {
-        try {
-          FileSorter fileSorter = new FileSorter(fileInfo, fileId, shuffleKey);
-          sorting.add(fileId);
+
+      long sortStartTime = System.currentTimeMillis();
+      while (!sorted.contains(fileId)) {
+        if (sorting.contains(fileId)) {
+          try {
+            Thread.sleep(50);
+            if (System.currentTimeMillis() - sortStartTime > sortTimeout) {
+              logger.error("Sorting file {} timeout after {}ms", fileId, 
sortTimeout);
+              throw new IOException(
+                  "Sort file " + diskFileInfo.getFilePath() + " timeout after 
" + sortTimeout);
+            }
+          } catch (InterruptedException e) {
+            logger.error(
+                "Sorter scheduler thread is interrupted means worker is 
shutting down.", e);
+            throw new IOException(
+                "Sorter scheduler thread is interrupted means worker is 
shutting down.", e);
+          }
+        } else {
           logger.debug(
-              "Adding sorter to sort queue shuffle key {}, file name {}", 
shuffleKey, fileName);
-          shuffleSortTaskDeque.put(fileSorter);
-        } catch (InterruptedException e) {
-          logger.error("Sorter scheduler thread is interrupted means worker is 
shutting down.", e);
+              "Sorting shuffle file for {} {} failed.", shuffleKey, 
diskFileInfo.getFilePath());
           throw new IOException(
-              "Sort scheduler thread is interrupted means worker is shutting 
down.", e);
-        } catch (IOException e) {
-          logger.error("File sorter access HDFS failed.", e);
-          throw new IOException("File sorter access HDFS failed.", e);
+              "Sorting shuffle file for "
+                  + shuffleKey
+                  + " "
+                  + diskFileInfo.getFilePath()
+                  + " failed.");
         }
       }
+
+      return resolve(
+          shuffleKey,
+          fileId,
+          userIdentifier,
+          sortedFilePath,
+          indexFilePath,
+          startMapIndex,
+          endMapIndex);
     }
+  }
 
-    long sortStartTime = System.currentTimeMillis();
-    while (!sorted.contains(fileId)) {
-      if (sorting.contains(fileId)) {
-        try {
-          Thread.sleep(50);
-          if (System.currentTimeMillis() - sortStartTime > sortTimeout) {
-            logger.error("Sorting file {} timeout after {}ms", fileId, 
sortTimeout);
-            throw new IOException(
-                "Sort file " + fileInfo.getFilePath() + " timeout after " + 
sortTimeout);
+  public static void sortMemoryShuffleFile(MemoryFileInfo memoryFileInfo) {
+    ReduceFileMeta reduceFileMeta = ((ReduceFileMeta) 
memoryFileInfo.getFileMeta());
+    synchronized (reduceFileMeta.getSorted()) {
+      if (!reduceFileMeta.getSorted().get()) {
+        CompositeByteBuf originBuffer = memoryFileInfo.getBuffer();
+        Map<Integer, List<ShuffleBlockInfo>> blocksMap = new TreeMap<>();
+        int originReaderIndex = originBuffer.readerIndex();
+        int originWriterIndex = originBuffer.writerIndex();
+        int bufLength = originBuffer.readableBytes();
+        int index = 0;
+        ByteBuffer headerBuf = ByteBuffer.allocate(16);
+        boolean fillMapBitMap = false;
+        RoaringBitmap mapIdBitMap = reduceFileMeta.getMapIds();
+        if (mapIdBitMap == null) {
+          mapIdBitMap = new RoaringBitmap();
+          fillMapBitMap = true;

Review Comment:
   If a map-range request is retried, the first reader fails, sends 
`BUFFER_STREAM_END` and removes the map ids, the retried request will not set 
the map ids because `mapIdBitMap` is not empty.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java:
##########
@@ -208,19 +224,91 @@ private MemoryManager(CelebornConf conf) {
           TimeUnit.MILLISECONDS);
     }
 
+    this.storageManager = storageManager;
+    if (memoryFileStorageThreshold > 0
+        && storageManager != null
+        && storageManager.localOrHdfsStorageAvailable()) {
+      ScheduledExecutorService memoryFileStorageService =
+          
ThreadUtils.newDaemonSingleThreadScheduledExecutor("memory-file-storage-checker");
+      memoryFileStorageService.scheduleWithFixedDelay(
+          () -> {
+            try {
+              if ((memoryFileStorageCounter.get() >= 
memoryFileStorageThreshold)
+                  || currentServingState() != ServingState.NONE_PAUSED) {
+                List<PartitionDataWriter> committedWriters = new ArrayList<>();
+                List<PartitionDataWriter> unCommittedWriters = new 
ArrayList<>();
+                synchronized (storageManager.memoryFileInfos()) {
+                  for (PartitionDataWriter writer : 
storageManager.memoryWriters().values()) {
+                    if (writer.isClosed() && 
writer.getMemoryFileInfo().isFullyRead()) {
+                      committedWriters.add(writer);
+                    } else {
+                      unCommittedWriters.add(writer);
+                    }
+                  }
+                  if (committedWriters.isEmpty() && 
unCommittedWriters.isEmpty()) {
+                    return;
+                  }
+                  logger.info(
+                      "Start evicting memory fileinfo committed {} uncommitted 
{}",
+                      committedWriters.size(),
+                      unCommittedWriters.size());
+                  committedWriters.sort(
+                      (o1, o2) ->
+                          o1.getMemoryFileInfo().getFileLength()
+                                  > o2.getMemoryFileInfo().getFileLength()
+                              ? 1
+                              : 0);
+                  unCommittedWriters.sort(
+                      (o1, o2) ->
+                          o1.getMemoryFileInfo().getFileLength()
+                                  > o2.getMemoryFileInfo().getFileLength()
+                              ? 1
+                              : 0);
+                  while ((memoryFileStorageCounter.get() >= 
memoryFileStorageThreshold)
+                      || currentServingState() != ServingState.NONE_PAUSED) {
+                    try {
+                      if (!committedWriters.isEmpty()) {
+                        PartitionDataWriter writer = 
committedWriters.remove(0);

Review Comment:
   `remove` is expensive for ArrayList. Better to use variables to track indices



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala:
##########
@@ -782,15 +857,75 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
     committedFileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName, 
fileInfo)
   }
 
-  def getActiveShuffleSize(): Long = {
-    
diskFileInfos.values().asScala.map(_.values().asScala.map(_.getBytesFlushed).sum).sum
+  def getActiveShuffleSize: Long = {
+    
diskFileInfos.values().asScala.map(_.values().asScala.map(_.getFileLength).sum).sum
   }
 
-  def getActiveShuffleFileCount(): Long = {
+  def getActiveShuffleFileCount: Long = {
     diskFileInfos.asScala.values.map(_.size()).sum
   }
 
   def createFile(
+      partitionDataWriterContext: PartitionDataWriterContext)

Review Comment:
   Just add a parameter like `canUseMemory` instead of switching 
partitionDataWriterContext.isCanUseMemory in so many places.



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala:
##########
@@ -782,15 +857,75 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
     committedFileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName, 
fileInfo)
   }
 
-  def getActiveShuffleSize(): Long = {
-    
diskFileInfos.values().asScala.map(_.values().asScala.map(_.getBytesFlushed).sum).sum
+  def getActiveShuffleSize: Long = {
+    
diskFileInfos.values().asScala.map(_.values().asScala.map(_.getFileLength).sum).sum
   }
 
-  def getActiveShuffleFileCount(): Long = {
+  def getActiveShuffleFileCount: Long = {
     diskFileInfos.asScala.values.map(_.size()).sum
   }
 
   def createFile(
+      partitionDataWriterContext: PartitionDataWriterContext)
+      : (MemoryFileInfo, Flusher, DiskFileInfo, File) = {
+    val location = partitionDataWriterContext.getPartitionLocation
+    if (partitionDataWriterContext.isCanUseMemory && 
location.getStorageInfo.memoryAvailable() && 
MemoryManager.instance().memoryFileStorageAvailable()) {

Review Comment:
   Please avoid long lines.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java:
##########
@@ -156,35 +199,90 @@ public void decrementPendingWrites() {
     numPendingWrites.decrementAndGet();
   }
 
-  protected void flush(boolean finalFlush) throws IOException {
+  protected void flushInternal(boolean finalFlush, boolean evict) throws 
IOException {

Review Comment:
   I think it's unnecessary to define this new method, and it's unnecessary to 
write `synchronized (flushLock)` because it's already inside `flushLock`. Just 
rename the original `flush` to `flushLocked`, and comment that this method 
should be always invoked inside `flushLock`.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java:
##########
@@ -156,35 +199,90 @@ public void decrementPendingWrites() {
     numPendingWrites.decrementAndGet();
   }
 
-  protected void flush(boolean finalFlush) throws IOException {
+  protected void flushInternal(boolean finalFlush, boolean evict) throws 
IOException {
     synchronized (flushLock) {
       // flushBuffer == null here means writer already closed
       if (flushBuffer != null) {
         int numBytes = flushBuffer.readableBytes();
         if (numBytes != 0) {
           notifier.checkException();
-          notifier.numPendingFlushes.incrementAndGet();
           FlushTask task = null;
-          if (channel != null) {
-            task = new LocalFlushTask(flushBuffer, channel, notifier);
-          } else if (diskFileInfo.isHdfs()) {
-            task = new HdfsFlushTask(flushBuffer, diskFileInfo.getHdfsPath(), 
notifier);
+          if (evict) {
+            notifier.numPendingFlushes.incrementAndGet();
+            // flush task will release the buffer of memory shuffle file
+            if (channel != null) {
+              task = new LocalFlushTask(flushBuffer, channel, notifier, false);
+            } else if (diskFileInfo.isHdfs()) {
+              task = new HdfsFlushTask(flushBuffer, 
diskFileInfo.getHdfsPath(), notifier, false);
+            }
+            MemoryManager.instance().releaseMemoryFileStorage(numBytes);
+            MemoryManager.instance().incrementDiskBuffer(numBytes);
+            // read flush buffer to generate correct chunk offsets
+            ByteBuf dupBuf = null;
+            if (memoryFileInfo.getSortedBuffer() != null) {
+              dupBuf = memoryFileInfo.getSortedBuffer();
+            } else {
+              dupBuf = flushBuffer.duplicate();
+            }
+            ByteBuffer headerBuf = ByteBuffer.allocate(16);
+            while (dupBuf.isReadable()) {
+              headerBuf.rewind();
+              dupBuf.readBytes(headerBuf);
+              byte[] batchHeader = headerBuf.array();
+              int compressedSize = Platform.getInt(batchHeader, 
Platform.BYTE_ARRAY_OFFSET + 12);
+              dupBuf.skipBytes(compressedSize);
+              diskFileInfo.updateBytesFlushed(compressedSize + 16);
+            }
+          } else {
+            if (!isMemoryShuffleFile.get()) {

Review Comment:
   I think `isMemoryShuffleFile` should never be `true` here. So, instead of 
using `if`, just use `Preconditions.check`



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterContext.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage;
+
+import org.apache.celeborn.common.identity.UserIdentifier;
+import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.common.protocol.PartitionSplitMode;
+import org.apache.celeborn.common.protocol.PartitionType;
+import org.apache.celeborn.common.util.Utils;
+
+public class PartitionDataWriterContext {
+  // Split threshold for this partition (presumably bytes written — confirm with caller).
+  private final long splitThreshold;
+  // How the split is carried out when the threshold is reached (e.g. soft vs. hard).
+  private final PartitionSplitMode partitionSplitMode;
+  // Whether range read filtering is enabled for this writer.
+  private final boolean rangeReadFilter;
+  // Location of the partition this writer serves.
+  private final PartitionLocation partitionLocation;
+  private final String appId;
+  private final int shuffleId;
+  // Identity of the user owning this shuffle.
+  private final UserIdentifier userIdentifier;
+  private final boolean partitionSplitEnabled;
+  // Derived from appId + shuffleId via Utils.makeShuffleKey in the constructor.
+  private final String shuffleKey;
+  private final PartitionType partitionType;
+  // The only mutable field: flipped after construction (setter not visible here — verify).
+  private boolean canUseMemory;
+
+  /**
+   * Captures the per-partition settings needed to create a partition data writer.
+   *
+   * @param splitThreshold threshold that triggers a partition split
+   * @param partitionSplitMode how the split is performed when triggered
+   * @param rangeReadFilter whether range read filtering is enabled
+   * @param partitionLocation location of the partition this writer serves
+   * @param appId application id; combined with shuffleId to build the shuffle key
+   * @param shuffleId shuffle id; combined with appId to build the shuffle key
+   * @param userIdentifier identity of the user owning this shuffle
+   * @param partitionType type of the partition
+   * @param partitionSplitEnabled whether splitting is enabled for this partition
+   */
+  public PartitionDataWriterContext(
+      long splitThreshold,
+      PartitionSplitMode partitionSplitMode,
+      boolean rangeReadFilter,
+      PartitionLocation partitionLocation,
+      String appId,
+      int shuffleId,
+      UserIdentifier userIdentifier,
+      PartitionType partitionType,
+      boolean partitionSplitEnabled) {
+    this.splitThreshold = splitThreshold;
+    this.partitionSplitMode = partitionSplitMode;
+    this.rangeReadFilter = rangeReadFilter;
+    this.partitionLocation = partitionLocation;
+    this.appId = appId;
+    this.shuffleId = shuffleId;
+    this.userIdentifier = userIdentifier;
+    this.partitionSplitEnabled = partitionSplitEnabled;
+    this.partitionType = partitionType;
+    // Derived once here so callers never have to rebuild the key themselves.
+    this.shuffleKey = Utils.makeShuffleKey(appId, shuffleId);
+  }
+
+  // --- Simple accessors: all return values captured at construction time. ---
+
+  public long getSplitThreshold() {
+    return splitThreshold;
+  }
+
+  public PartitionSplitMode getPartitionSplitMode() {
+    return partitionSplitMode;
+  }
+
+  public boolean isRangeReadFilter() {
+    return rangeReadFilter;
+  }
+
+  public PartitionLocation getPartitionLocation() {
+    return partitionLocation;
+  }
+
+  public String getAppId() {
+    return appId;
+  }
+
+  public int getShuffleId() {
+    return shuffleId;
+  }
+
+  public UserIdentifier getUserIdentifier() {
+    return userIdentifier;
+  }
+
+  public boolean isPartitionSplitEnabled() {
+    return partitionSplitEnabled;
+  }
+
+  // Precomputed composite key (see constructor), not a stored constructor argument.
+  public String getShuffleKey() {
+    return shuffleKey;
+  }
+
+  public PartitionType getPartitionType() {
+    return partitionType;
+  }
+
+  public boolean isCanUseMemory() {

Review Comment:
   Rename this getter to just `canUseMemory`, following the usual naming
   convention for boolean accessors.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to