RexXiong commented on code in PR #2300:
URL: https://github.com/apache/celeborn/pull/2300#discussion_r1559001005


##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -3021,11 +3025,20 @@ object CelebornConf extends Logging {
   val WORKER_DIRECT_MEMORY_RATIO_RESUME: ConfigEntry[Double] =
     buildConf("celeborn.worker.directMemoryRatioToResume")
       .categories("worker")
-      .doc("If direct memory usage is less than this limit, worker will 
resume.")
+      .doc("If direct memory usage minus memory storage files usage is less 
than this limit, worker will resume.")

Review Comment:
   This should only consider direct memory usage.



##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -3021,11 +3025,20 @@ object CelebornConf extends Logging {
   val WORKER_DIRECT_MEMORY_RATIO_RESUME: ConfigEntry[Double] =
     buildConf("celeborn.worker.directMemoryRatioToResume")
       .categories("worker")
-      .doc("If direct memory usage is less than this limit, worker will 
resume.")
+      .doc("If direct memory usage minus memory storage files usage is less 
than this limit, worker will resume.")
       .version("0.2.0")
       .doubleConf
       .createWithDefault(0.7)
 
+  val WORKER_MEMORY_FILE_STORAGE_MAX_FILE_SIZE: ConfigEntry[Long] =
+    buildConf("celeborn.worker.memoryFileStorage.maxFileSize")
+      .categories("worker")
+      .doc("Max size for a memory storage file. It must be lesser than 2GB.")
+      .version("0.4.1")

Review Comment:
   0.4.1 -> 0.5.0



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java:
##########
@@ -393,13 +586,20 @@ protected void addTask(FlushTask task) throws IOException 
{
 
   protected void returnBuffer() {
     synchronized (flushLock) {
-      if (flushBuffer != null) {
-        flusher.returnBuffer(flushBuffer);
+      if (flushBuffer != null && flusher != null) {
+        flusher.returnBuffer(flushBuffer, true);
         flushBuffer = null;
       }
     }
   }
 
+  public void cleanPartitionWriter() {

Review Comment:
   IMO, cleanPartitionWriter can be moved into the destroy method.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java:
##########
@@ -208,19 +223,95 @@ private MemoryManager(CelebornConf conf) {
           TimeUnit.MILLISECONDS);
     }
 
+    this.storageManager = storageManager;
+    if (memoryFileStorageThreshold > 0
+        && storageManager != null
+        && storageManager.localOrHdfsStorageAvailable()) {
+      ScheduledExecutorService memoryFileStorageService =
+          
ThreadUtils.newDaemonSingleThreadScheduledExecutor("memory-file-storage-checker");
+      memoryFileStorageService.scheduleWithFixedDelay(
+          () -> {
+            try {
+              if ((memoryFileStorageCounter.get() >= 
memoryFileStorageThreshold)
+                  || currentServingState() != ServingState.NONE_PAUSED) {
+                List<PartitionDataWriter> committedWriters = new ArrayList<>();
+                List<PartitionDataWriter> unCommittedWriters = new 
ArrayList<>();
+                synchronized (storageManager.memoryFileInfos()) {
+                  for (PartitionDataWriter writer : 
storageManager.memoryWriters().values()) {
+                    if (writer.isClosed() && 
writer.getMemoryFileInfo().isFullyRead()) {
+                      committedWriters.add(writer);
+                    } else {
+                      unCommittedWriters.add(writer);
+                    }
+                  }
+                  if (committedWriters.isEmpty() && 
unCommittedWriters.isEmpty()) {
+                    return;
+                  }
+                  logger.info(
+                      "Start evicting memory fileinfo committed {} uncommitted 
{}",
+                      committedWriters.size(),
+                      unCommittedWriters.size());
+                  committedWriters.sort(
+                      (o1, o2) ->
+                          o1.getMemoryFileInfo().getFileLength()
+                                  > o2.getMemoryFileInfo().getFileLength()
+                              ? 1
+                              : 0);
+                  unCommittedWriters.sort(
+                      (o1, o2) ->
+                          o1.getMemoryFileInfo().getFileLength()
+                                  > o2.getMemoryFileInfo().getFileLength()
+                              ? 1
+                              : 0);
+                  while ((memoryFileStorageCounter.get() >= 
memoryFileStorageThreshold)
+                      || currentServingState() != ServingState.NONE_PAUSED) {
+                    try {
+                      if (!committedWriters.isEmpty()) {
+                        PartitionDataWriter writer = 
committedWriters.remove(0);
+                        synchronized (writer.getMemoryFileInfo()) {
+                          if (!writer.getMemoryFileInfo().hasReader()) {
+                            writer.evict();

Review Comment:
   Should the hasReader check be performed inside writer.evict instead?



##########
common/src/main/java/org/apache/celeborn/common/meta/MemoryFileInfo.java:
##########
@@ -39,11 +59,50 @@ public void setBuffer(CompositeByteBuf buffer) {
   }
 
   public void setBufferSize(int bufferSize) {
-    this.length = bufferSize;
+    this.bytesFlushed = bufferSize;
+  }
+
+  public CompositeByteBuf getSortedBuffer() {
+    return sortedBuffer;
+  }
+
+  public void setSortedBuffer(CompositeByteBuf sortedBuffer) {
+    this.sortedBuffer = sortedBuffer;
+  }
+
+  public Map<Integer, List<ShuffleBlockInfo>> getSortedIndexes() {
+    return sortedIndexes;
+  }
+
+  public void setSortedIndexes(Map<Integer, List<ShuffleBlockInfo>> 
sortedIndexes) {
+    this.sortedIndexes = sortedIndexes;
+  }
+
+  public int expireMemoryBuffers() {
+    int bufferSize = 0;
+    if (buffer != null) {
+      bufferSize = buffer.writerIndex();
+      buffer.release();
+    }
+    logger.info("Memory File Info {} expire, removed {}", this, bufferSize);
+    return bufferSize;
+  }
+
+  public void incrementReaderCount() {
+    readerCount.incrementAndGet();
+  }
+
+  public void decrementReaderCount() {
+    if (readerCount.get() > 0) {
+      readerCount.decrementAndGet();
+    }
+  }
+
+  public boolean hasReader() {
+    return readerCount.get() > 0;
   }
 
-  @Override
-  public long getFileLength() {
-    return length;
+  public AtomicBoolean getEvicted() {

Review Comment:
   Use setEvicted and isEvicted instead of exposing the evicted field directly.



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala:
##########
@@ -782,15 +856,75 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
     committedFileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName, 
fileInfo)
   }
 
-  def getActiveShuffleSize(): Long = {
-    
diskFileInfos.values().asScala.map(_.values().asScala.map(_.getBytesFlushed).sum).sum
+  def getActiveShuffleSize: Long = {
+    
diskFileInfos.values().asScala.map(_.values().asScala.map(_.getFileLength).sum).sum
   }
 
-  def getActiveShuffleFileCount(): Long = {
+  def getActiveShuffleFileCount: Long = {
     diskFileInfos.asScala.values.map(_.size()).sum
   }
 
   def createFile(
+      partitionDataWriterContext: PartitionDataWriterContext,
+      inMem: Boolean): (MemoryFileInfo, Flusher, DiskFileInfo, File) = {

Review Comment:
   Why not put the inMem parameter into partitionDataWriterContext?



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala:
##########
@@ -443,9 +504,14 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
   }
 
   def cleanFile(shuffleKey: String, fileName: String): Unit = {
-    val fileInfo = getDiskFileInfo(shuffleKey, fileName)
+    val fileInfo = getFileInfo(shuffleKey, fileName)
     if (fileInfo != null) {
-      cleanFileInternal(shuffleKey, fileInfo)
+      fileInfo match {
+        case info: DiskFileInfo =>
+          cleanFileInternal(shuffleKey, info)

Review Comment:
   Why not call destroy, as is done for the other writer types?



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java:
##########
@@ -85,11 +88,17 @@ public class MemoryManager {
           "worker-memory-manager-read-buffer-target-updater");
   private CreditStreamManager creditStreamManager = null;
 
-  private long memoryShuffleStorageThreshold = 0;
+  private long memoryFileStorageThreshold;
+  private final AtomicLong memoryFileStorageCounter = new AtomicLong();
+  private final StorageManager storageManager;
 
   public static MemoryManager initialize(CelebornConf conf) {
+    return initialize(conf, null);

Review Comment:
   Mark this initialize method @VisibleForTesting, or remove it.



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala:
##########
@@ -313,7 +313,22 @@ private[deploy] class Controller(
                   } else {
                     committedStorageInfos.put(uniqueId, 
fileWriter.getStorageInfo)
                     if (fileWriter.getMapIdBitMap != null) {
-                      committedMapIdBitMap.put(uniqueId, 
fileWriter.getMapIdBitMap)
+                      val mapIdBitMap = fileWriter.getMapIdBitMap
+                      if (fileWriter.getDiskFileInfo != null) {
+                        fileWriter.getDiskFileInfo.getFileMeta match {
+                          case meta: ReduceFileMeta =>
+                            meta.setMapIds(mapIdBitMap)
+                          case _ =>
+                        }
+                      }
+                      committedMapIdBitMap.put(uniqueId, mapIdBitMap)
+                      // resue mapid bitmap if this is memory storage shuffle 
file
+                      val memoryFileInfo = fileWriter.getMemoryFileInfo
+                      if (fileWriter.getMemoryFileInfo != null && 
memoryFileInfo.getFileMeta.isInstanceOf[

Review Comment:
   Use the local variable here: replace `fileWriter.getMemoryFileInfo != null` with `memoryFileInfo != null`.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java:
##########
@@ -194,77 +192,185 @@ public long getSortedSize() {
   // 2. If the FileSorter task is already in the sorting queue but the sorted 
file has not been
   //    generated, it awaits until a timeout occurs (default 220 seconds).
   // 3. If the sorted file is generated, it returns the sorted FileInfo.
-  public DiskFileInfo getSortedFileInfo(
-      String shuffleKey, String fileName, DiskFileInfo fileInfo, int 
startMapIndex, int endMapIndex)
+  // This method will generate temporary file info for this shuffle read
+  public FileInfo getSortedFileInfo(
+      String shuffleKey, String fileName, FileInfo fileInfo, int 
startMapIndex, int endMapIndex)
       throws IOException {
-    String fileId = shuffleKey + "-" + fileName;
-    UserIdentifier userIdentifier = fileInfo.getUserIdentifier();
-    Set<String> sorted =
-        sortedShuffleFiles.computeIfAbsent(shuffleKey, v -> 
ConcurrentHashMap.newKeySet());
-    Set<String> sorting =
-        sortingShuffleFiles.computeIfAbsent(shuffleKey, v -> 
ConcurrentHashMap.newKeySet());
-
-    String sortedFilePath = Utils.getSortedFilePath(fileInfo.getFilePath());
-    String indexFilePath = Utils.getIndexFilePath(fileInfo.getFilePath());
-    synchronized (sorting) {
-      if (sorted.contains(fileId)) {
-        return resolve(
-            shuffleKey,
-            fileId,
-            userIdentifier,
-            sortedFilePath,
-            indexFilePath,
-            startMapIndex,
-            endMapIndex);
+    if (fileInfo instanceof MemoryFileInfo) {
+      ReduceFileMeta meta = ((ReduceFileMeta) fileInfo.getFileMeta());
+      MemoryFileInfo memoryFileInfo = ((MemoryFileInfo) fileInfo);
+      Map<Integer, List<ShuffleBlockInfo>> indexesMap;
+      synchronized (meta.getSorted()) {
+        if (!meta.getSorted().get()) {
+          sortMemoryShuffleFile(memoryFileInfo);
+          meta.setSorted();
+        }
       }
-      if (!sorting.contains(fileId)) {
-        try {
-          FileSorter fileSorter = new FileSorter(fileInfo, fileId, shuffleKey);
-          sorting.add(fileId);
-          logger.debug(
-              "Adding sorter to sort queue shuffle key {}, file name {}", 
shuffleKey, fileName);
-          shuffleSortTaskDeque.put(fileSorter);
-        } catch (InterruptedException e) {
-          logger.error("Sorter scheduler thread is interrupted means worker is 
shutting down.", e);
-          throw new IOException(
-              "Sort scheduler thread is interrupted means worker is shutting 
down.", e);
-        } catch (IOException e) {
-          logger.error("File sorter access HDFS failed.", e);
-          throw new IOException("File sorter access HDFS failed.", e);
+      indexesMap = memoryFileInfo.getSortedIndexes();
+
+      ReduceFileMeta tMeta =
+          new ReduceFileMeta(
+              ShuffleBlockInfoUtils.getChunkOffsetsFromShuffleBlockInfos(
+                  startMapIndex, endMapIndex, shuffleChunkSize, indexesMap, 
true),
+              shuffleChunkSize);
+      CompositeByteBuf targetBuffer =
+          MemoryManager.instance()
+              .getStoragePooledByteBufAllocator()
+              .compositeBuffer(Integer.MAX_VALUE);
+      ShuffleBlockInfoUtils.sortBufferByRange(
+          startMapIndex, endMapIndex, indexesMap, 
memoryFileInfo.getSortedBuffer(), targetBuffer);
+      return new MemoryFileInfo(
+          memoryFileInfo.getUserIdentifier(),
+          memoryFileInfo.isPartitionSplitEnabled(),
+          tMeta,
+          targetBuffer);
+    } else {
+      DiskFileInfo diskFileInfo = ((DiskFileInfo) fileInfo);
+      String fileId = shuffleKey + "-" + fileName;
+      UserIdentifier userIdentifier = diskFileInfo.getUserIdentifier();
+      Set<String> sorted =
+          sortedShuffleFiles.computeIfAbsent(shuffleKey, v -> 
ConcurrentHashMap.newKeySet());
+      Set<String> sorting =
+          sortingShuffleFiles.computeIfAbsent(shuffleKey, v -> 
ConcurrentHashMap.newKeySet());
+
+      String sortedFilePath = 
Utils.getSortedFilePath(diskFileInfo.getFilePath());
+      String indexFilePath = 
Utils.getIndexFilePath(diskFileInfo.getFilePath());
+      synchronized (sorting) {
+        if (sorted.contains(fileId)) {
+          return resolve(
+              shuffleKey,
+              fileId,
+              userIdentifier,
+              sortedFilePath,
+              indexFilePath,
+              startMapIndex,
+              endMapIndex);
+        }
+        if (!sorting.contains(fileId)) {
+          try {
+            FileSorter fileSorter = new FileSorter(diskFileInfo, fileId, 
shuffleKey);
+            sorting.add(fileId);
+            logger.debug(
+                "Adding sorter to sort queue shuffle key {}, file name {}", 
shuffleKey, fileName);
+            shuffleSortTaskDeque.put(fileSorter);
+          } catch (InterruptedException e) {
+            logger.error(
+                "Sorter scheduler thread is interrupted means worker is 
shutting down.", e);
+            throw new IOException(
+                "Sort scheduler thread is interrupted means worker is shutting 
down.", e);
+          } catch (IOException e) {
+            logger.error("File sorter access HDFS failed.", e);
+            throw new IOException("File sorter access HDFS failed.", e);
+          }
         }
       }
-    }
 
-    long sortStartTime = System.currentTimeMillis();
-    while (!sorted.contains(fileId)) {
-      if (sorting.contains(fileId)) {
-        try {
-          Thread.sleep(50);
-          if (System.currentTimeMillis() - sortStartTime > sortTimeout) {
-            logger.error("Sorting file {} timeout after {}ms", fileId, 
sortTimeout);
+      long sortStartTime = System.currentTimeMillis();
+      while (!sorted.contains(fileId)) {
+        if (sorting.contains(fileId)) {
+          try {
+            Thread.sleep(50);
+            if (System.currentTimeMillis() - sortStartTime > sortTimeout) {
+              logger.error("Sorting file {} timeout after {}ms", fileId, 
sortTimeout);
+              throw new IOException(
+                  "Sort file " + diskFileInfo.getFilePath() + " timeout after 
" + sortTimeout);
+            }
+          } catch (InterruptedException e) {
+            logger.error(
+                "Sorter scheduler thread is interrupted means worker is 
shutting down.", e);
             throw new IOException(
-                "Sort file " + fileInfo.getFilePath() + " timeout after " + 
sortTimeout);
+                "Sorter scheduler thread is interrupted means worker is 
shutting down.", e);
           }
-        } catch (InterruptedException e) {
-          logger.error("Sorter scheduler thread is interrupted means worker is 
shutting down.", e);
+        } else {
+          logger.debug(
+              "Sorting shuffle file for {} {} failed.", shuffleKey, 
diskFileInfo.getFilePath());
           throw new IOException(
-              "Sorter scheduler thread is interrupted means worker is shutting 
down.", e);
+              "Sorting shuffle file for "
+                  + shuffleKey
+                  + " "
+                  + diskFileInfo.getFilePath()
+                  + " failed.");
         }
-      } else {
-        logger.debug("Sorting shuffle file for {} {} failed.", shuffleKey, 
fileInfo.getFilePath());
-        throw new IOException(
-            "Sorting shuffle file for " + shuffleKey + " " + 
fileInfo.getFilePath() + " failed.");
       }
+
+      return resolve(
+          shuffleKey,
+          fileId,
+          userIdentifier,
+          sortedFilePath,
+          indexFilePath,
+          startMapIndex,
+          endMapIndex);
     }
+  }
 
-    return resolve(
-        shuffleKey,
-        fileId,
-        userIdentifier,
-        sortedFilePath,
-        indexFilePath,
-        startMapIndex,
-        endMapIndex);
+  public static void sortMemoryShuffleFile(MemoryFileInfo memoryFileInfo) {
+    ReduceFileMeta reduceFileMeta = ((ReduceFileMeta) 
memoryFileInfo.getFileMeta());

Review Comment:
   Check whether the file is already sorted before sorting it.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java:
##########
@@ -203,49 +298,103 @@ public void write(ByteBuf data) throws IOException {
     }
 
     final int numBytes = data.readableBytes();
-    MemoryManager.instance().incrementDiskBuffer(numBytes);
-
-    Optional.ofNullable(CongestionController.instance())
-        .ifPresent(
-            congestionController ->
-                
congestionController.produceBytes(diskFileInfo.getUserIdentifier(), numBytes));
+    if (isMemoryShuffleFile.get()) {
+      MemoryManager.instance().increaseMemoryFileStorage(numBytes);
+    } else {
+      MemoryManager.instance().incrementDiskBuffer(numBytes);
+      Optional.ofNullable(CongestionController.instance())
+          .ifPresent(
+              congestionController ->
+                  
congestionController.produceBytes(diskFileInfo.getUserIdentifier(), numBytes));
+    }
 
     synchronized (flushLock) {
       if (closed) {
-        String msg = "FileWriter has already closed!, fileName " + 
diskFileInfo.getFilePath();
+        String msg = "PartitionDataWriter has already closed! Filename: ";
+        if (isMemoryShuffleFile.get()) {
+          msg += filename;
+        } else {
+          msg += diskFileInfo.getFilePath();
+        }
         logger.warn(msg);
         throw new AlreadyClosedException(msg);
       }
       if (rangeReadFilter) {
         mapIdBitMap.add(mapId);
       }
-      if (flushBuffer.readableBytes() != 0
-          && flushBuffer.readableBytes() + numBytes >= flusherBufferSize) {
-        flush(false);
+      int flushBufferReadableBytes = flushBuffer.readableBytes();
+      if (!isMemoryShuffleFile.get()) {
+        if (flushBufferReadableBytes != 0
+            && flushBufferReadableBytes + numBytes >= flusherBufferSize) {
+          flush(false);
+        }
+      } else {
+        if (flushBufferReadableBytes > memoryFileStorageMaxFileSize
+            && storageManager.localOrHdfsStorageAvailable()) {
+          logger.debug(
+              "{} Evict, memory buffer is  {}",
+              writerContext.getPartitionLocation().getFileName(),
+              flushBufferReadableBytes);
+          evict();
+        }
       }
 
       data.retain();
       flushBuffer.addComponent(true, data);
+      if (isMemoryShuffleFile.get()) {
+        memoryFileInfo.updateBytesFlushed(numBytes);
+      }
     }
 
     numPendingWrites.decrementAndGet();
   }
 
+  public void evict() throws IOException {
+    synchronized (flushLock) {
+      if (exception != null) {
+        return;
+      }
+      Tuple4<MemoryFileInfo, Flusher, DiskFileInfo, File> createFileResult =
+          storageManager.createFile(writerContext, false);
+      if (createFileResult._4() != null) {
+        this.diskFileInfo = createFileResult._3();
+        this.flusher = createFileResult._2();
+
+        isMemoryShuffleFile.set(false);
+        initFileChannelsForDiskFile();
+        flushInternal(closed, true);
+
+        memoryFileInfo.getEvicted().set(true);

Review Comment:
   memoryFileInfo.setEvict()



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala:
##########
@@ -440,10 +462,19 @@ class FetchHandler(
       streamType: StreamType): Unit = {
     streamType match {
       case StreamType.ChunkStream =>
-        val (shuffleKey, fileName) = 
chunkStreamManager.getShuffleKeyAndFileName(streamId)
+        val streamState = chunkStreamManager.getStreamState(streamId)
+        val (shuffleKey, fileName, startIndex, endIndex) = (
+          streamState.shuffleKey,
+          streamState.fileName,
+          streamState.startIndex,
+          streamState.endIndex)
         workerSource.recordAppActiveConnection(client, shuffleKey)
-        getRawDiskFileInfo(shuffleKey, fileName).closeStream(
-          streamId)
+        val fileinfo = getRawFileInfo(shuffleKey, fileName)
+        fileinfo.closeStream(streamId)
+        if (fileinfo.isInstanceOf[MemoryFileInfo]) {
+          
fileinfo.getFileMeta.asInstanceOf[ReduceFileMeta].removeMapIds(startIndex, 
endIndex)
+          fileinfo.asInstanceOf[MemoryFileInfo].decrementReaderCount()

Review Comment:
   Call decrementReaderCount in MemoryFileInfo.closeStream instead.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java:
##########
@@ -203,49 +298,103 @@ public void write(ByteBuf data) throws IOException {
     }
 
     final int numBytes = data.readableBytes();
-    MemoryManager.instance().incrementDiskBuffer(numBytes);
-
-    Optional.ofNullable(CongestionController.instance())
-        .ifPresent(
-            congestionController ->
-                
congestionController.produceBytes(diskFileInfo.getUserIdentifier(), numBytes));
+    if (isMemoryShuffleFile.get()) {
+      MemoryManager.instance().increaseMemoryFileStorage(numBytes);
+    } else {
+      MemoryManager.instance().incrementDiskBuffer(numBytes);
+      Optional.ofNullable(CongestionController.instance())
+          .ifPresent(
+              congestionController ->
+                  
congestionController.produceBytes(diskFileInfo.getUserIdentifier(), numBytes));
+    }
 
     synchronized (flushLock) {
       if (closed) {
-        String msg = "FileWriter has already closed!, fileName " + 
diskFileInfo.getFilePath();
+        String msg = "PartitionDataWriter has already closed! Filename: ";
+        if (isMemoryShuffleFile.get()) {
+          msg += filename;
+        } else {
+          msg += diskFileInfo.getFilePath();
+        }
         logger.warn(msg);
         throw new AlreadyClosedException(msg);
       }
       if (rangeReadFilter) {
         mapIdBitMap.add(mapId);
       }
-      if (flushBuffer.readableBytes() != 0
-          && flushBuffer.readableBytes() + numBytes >= flusherBufferSize) {
-        flush(false);
+      int flushBufferReadableBytes = flushBuffer.readableBytes();
+      if (!isMemoryShuffleFile.get()) {
+        if (flushBufferReadableBytes != 0
+            && flushBufferReadableBytes + numBytes >= flusherBufferSize) {
+          flush(false);
+        }
+      } else {
+        if (flushBufferReadableBytes > memoryFileStorageMaxFileSize
+            && storageManager.localOrHdfsStorageAvailable()) {
+          logger.debug(
+              "{} Evict, memory buffer is  {}",
+              writerContext.getPartitionLocation().getFileName(),
+              flushBufferReadableBytes);
+          evict();
+        }
       }
 
       data.retain();
       flushBuffer.addComponent(true, data);
+      if (isMemoryShuffleFile.get()) {
+        memoryFileInfo.updateBytesFlushed(numBytes);
+      }
     }
 
     numPendingWrites.decrementAndGet();
   }
 
+  public void evict() throws IOException {
+    synchronized (flushLock) {
+      if (exception != null) {
+        return;
+      }
+      Tuple4<MemoryFileInfo, Flusher, DiskFileInfo, File> createFileResult =
+          storageManager.createFile(writerContext, false);
+      if (createFileResult._4() != null) {
+        this.diskFileInfo = createFileResult._3();
+        this.flusher = createFileResult._2();
+
+        isMemoryShuffleFile.set(false);
+        initFileChannelsForDiskFile();
+        flushInternal(closed, true);
+
+        memoryFileInfo.getEvicted().set(true);

Review Comment:
   It would be better to use a callback method to unregister the MemoryPartitionWriter and increment the evict count when the memory file is evicted.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java:
##########
@@ -297,15 +459,30 @@ protected synchronized long close(
       finalClose.run();
 
       // unregister from DeviceMonitor
-      if (!diskFileInfo.isHdfs()) {
+      if (diskFileInfo != null && !diskFileInfo.isHdfs()) {
         logger.debug("file info {} unregister from device monitor", 
diskFileInfo);
         deviceMonitor.unregisterFileWriter(this);
       }
     }
     if (workerGracefulShutdown) {
+      if (diskFileInfo != null) {
+        storageManager.notifyFileInfoCommitted(shuffleKey, 
getFile().getName(), diskFileInfo);
+      }
+    }
+    if (diskFileInfo != null) {
+      return diskFileInfo.getFileLength();
+    } else {
+      return memoryFileInfo.getFileLength();
+    }
+  }
+
+  public synchronized void saveMemoryShuffleFileOnGracefulShutdown() throws 
IOException {
+    if (memoryFileInfo != null) {
+      PartitionFilesSorter.sortMemoryShuffleFile(memoryFileInfo);
+      evict();
       storageManager.notifyFileInfoCommitted(shuffleKey, getFile().getName(), 
diskFileInfo);

Review Comment:
   Call notifyFileInfoCommitted only after waitOnNoPending has waited for the pending flushes.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java:
##########
@@ -297,15 +459,30 @@ protected synchronized long close(
       finalClose.run();
 
       // unregister from DeviceMonitor
-      if (!diskFileInfo.isHdfs()) {
+      if (diskFileInfo != null && !diskFileInfo.isHdfs()) {
         logger.debug("file info {} unregister from device monitor", 
diskFileInfo);
         deviceMonitor.unregisterFileWriter(this);
       }
     }
     if (workerGracefulShutdown) {
+      if (diskFileInfo != null) {
+        storageManager.notifyFileInfoCommitted(shuffleKey, 
getFile().getName(), diskFileInfo);
+      }
+    }
+    if (diskFileInfo != null) {
+      return diskFileInfo.getFileLength();
+    } else {
+      return memoryFileInfo.getFileLength();
+    }
+  }
+
+  public synchronized void saveMemoryShuffleFileOnGracefulShutdown() throws 
IOException {
+    if (memoryFileInfo != null) {
+      PartitionFilesSorter.sortMemoryShuffleFile(memoryFileInfo);
+      evict();
       storageManager.notifyFileInfoCommitted(shuffleKey, getFile().getName(), 
diskFileInfo);
+      waitOnNoPending(notifier.numPendingFlushes);

Review Comment:
   move notifyFileInfoCommitted and waitOnNoPending to evict



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java:
##########
@@ -184,7 +235,9 @@ public void cleanupExpiredShuffleKey(Set<String> 
expiredShuffleKeys) {
 
       // normally expiredStreamIds set will be empty as streamId will be 
removed when be fully read
       if (expiredStreamIds != null && !expiredStreamIds.isEmpty()) {
-        expiredStreamIds.forEach(streams::remove);
+        for (Long streamId : expiredStreamIds) {
+          streams.remove(streamId).tryDecrementReaderCount();

Review Comment:
   It seems we don't have to do this, as the shuffle has already expired.



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala:
##########
@@ -313,7 +313,22 @@ private[deploy] class Controller(
                   } else {
                     committedStorageInfos.put(uniqueId, 
fileWriter.getStorageInfo)
                     if (fileWriter.getMapIdBitMap != null) {
-                      committedMapIdBitMap.put(uniqueId, 
fileWriter.getMapIdBitMap)
+                      val mapIdBitMap = fileWriter.getMapIdBitMap
+                      if (fileWriter.getDiskFileInfo != null) {
+                        fileWriter.getDiskFileInfo.getFileMeta match {

Review Comment:
   move this to fileWriter



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to