waitinfuture commented on code in PR #2300:
URL: https://github.com/apache/celeborn/pull/2300#discussion_r1590812398


##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala:
##########
@@ -354,65 +381,99 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
     if (healthyWorkingDirs().size <= 0 && !hasHDFSStorage) {
       throw new IOException("No available working dirs!")
     }
-    val shuffleKey = Utils.makeShuffleKey(appId, shuffleId)
-    val (flusher, diskFileInfo, workingDir) = createFile(
+    val partitionDataWriterContext = new PartitionDataWriterContext(
+      splitThreshold,
+      splitMode,
+      rangeReadFilter,
       location,
       appId,
       shuffleId,
-      location.getFileName,
       userIdentifier,
       partitionType,
       partitionSplitEnabled)
+
     val writer =
       try {
         partitionType match {
           case PartitionType.MAP => new MapPartitionDataWriter(
               this,
-              diskFileInfo,
-              flusher,
               workerSource,
               conf,
               deviceMonitor,
-              splitThreshold,
-              splitMode,
-              rangeReadFilter,
-              shuffleKey)
+              partitionDataWriterContext)
           case PartitionType.REDUCE => new ReducePartitionDataWriter(
               this,
-              diskFileInfo,
-              flusher,
               workerSource,
               conf,
               deviceMonitor,
-              splitThreshold,
-              splitMode,
-              rangeReadFilter,
-              shuffleKey)
+              partitionDataWriterContext)
           case _ => throw new UnsupportedOperationException(s"Not support 
$partitionType yet")
         }
       } catch {
         case e: Exception =>
           logError("Create partition data writer failed", e)
           throw e
       }
-    if (!(writer.getDiskFileInfo.isHdfs)) {
-      deviceMonitor.registerFileWriter(writer)
-      workingDirWriters.computeIfAbsent(workingDir, 
workingDirWriterListFunc).put(
-        diskFileInfo.getFilePath,
-        writer)
-    } else {
-      hdfsWriters.put(diskFileInfo.getFilePath, writer)
-    }
     writer
   }
 
-  def getDiskFileInfo(shuffleKey: String, fileName: String): DiskFileInfo = {
-    val shuffleMap = diskFileInfos.get(shuffleKey)
-    if (shuffleMap ne null) {
-      shuffleMap.get(fileName)
-    } else {
-      null
+  def registerMemoryPartitionWriter(writer: PartitionDataWriter, fileInfo: 
MemoryFileInfo): Unit = {
+    memoryWriters.put(fileInfo, writer)
+  }
+
+  def unregisterMemoryPartitionWriterAndFileInfo(
+      fileInfo: MemoryFileInfo,
+      shuffleKey: String,
+      fileName: String): Unit = {
+    memoryWriters.remove(fileInfo)
+    val map = memoryFileInfos.get(shuffleKey)
+    if (map != null) {
+      map.remove(fileName)
+    }
+  }
+
+  def registerDiskFilePartitionWriter(
+      writer: PartitionDataWriter,
+      workingDir: File,
+      fileInfo: DiskFileInfo): Unit = {
+    if (writer.getDiskFileInfo.isHdfs) {
+      hdfsWriters.put(fileInfo.getFilePath, writer)
+      return
+    }
+    deviceMonitor.registerFileWriter(writer)
+    workingDirWriters.computeIfAbsent(workingDir, 
workingDirWriterListFunc).put(
+      fileInfo.getFilePath,
+      writer)
+  }
+
+  def getFileInfo(
+      shuffleKey: String,
+      fileName: String,
+      read: Boolean = false): FileInfo = {
+    val memoryShuffleMap = memoryFileInfos.get(shuffleKey)
+    if (memoryShuffleMap != null) {
+      val memoryFileInfo = memoryShuffleMap.get(fileName)
+      if (memoryFileInfo != null) {
+        memoryFileInfo.synchronized {
+          if (!memoryFileInfo.getEvicted) {

Review Comment:
   Consider this:
   1. At time 1, reader count is 0, and evict is triggered
   2. At time 2, inside `evict()`, `memoryFileInfo.hasReader()` returns false 
and goes to `evictInternal`, but `memoryFileInfo.setEvicted` is not invoked yet
   3. At time 3, `handleOpenStreamInternal` is invoked, inside `getFileInfo`, 
`memoryFileInfo.getEvicted` returns false, and returns `memoryFileInfo`
   
   So, `read` and `getEvicted` cannot protect against this data race.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java:
##########
@@ -54,69 +62,110 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
   private static final Logger logger = 
LoggerFactory.getLogger(PartitionDataWriter.class);
   private static final long WAIT_INTERVAL_MS = 5;
 
-  protected final DiskFileInfo diskFileInfo;
+  // After commit file, there will be only 1 fileinfo left.
+  protected DiskFileInfo diskFileInfo = null;
+  protected MemoryFileInfo memoryFileInfo = null;
   private FileChannel channel;
   private volatile boolean closed;
   private volatile boolean destroyed;
 
   protected final AtomicInteger numPendingWrites = new AtomicInteger();
 
-  public final Flusher flusher;
-  private final int flushWorkerIndex;
+  public Flusher flusher;
+  private int flushWorkerIndex;
 
   @GuardedBy("flushLock")
-  private CompositeByteBuf flushBuffer;
+  protected CompositeByteBuf flushBuffer;
 
-  private final Object flushLock = new Object();
+  protected final Object flushLock = new Object();
   private final long writerCloseTimeoutMs;
 
-  protected final long flusherBufferSize;
+  protected long flusherBufferSize;
 
   protected final DeviceMonitor deviceMonitor;
   protected final AbstractSource source; // metrics
 
-  private long splitThreshold = 0;
+  private final long splitThreshold;
   private final PartitionSplitMode splitMode;
-  private final PartitionType partitionType;
   private final boolean rangeReadFilter;
   protected boolean deleted = false;
   private RoaringBitmap mapIdBitMap = null;
   protected final FlushNotifier notifier = new FlushNotifier();
   // It's only needed when graceful shutdown is enabled
-  private String shuffleKey;
-  private final StorageManager storageManager;
+  private final String shuffleKey;
+  protected final StorageManager storageManager;
   private final boolean workerGracefulShutdown;
+  protected final long memoryFileStorageMaxFileSize;
+  protected AtomicBoolean isMemoryShuffleFile = new AtomicBoolean();
+  protected final String filename;
+  protected PooledByteBufAllocator pooledByteBufAllocator;
+  private final int workerPushMaxComponents;

Review Comment:
   unused variable



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java:
##########
@@ -297,15 +463,39 @@ protected synchronized long close(
       finalClose.run();
 
       // unregister from DeviceMonitor
-      if (!diskFileInfo.isHdfs()) {
+      if (diskFileInfo != null && !diskFileInfo.isHdfs()) {
         logger.debug("file info {} unregister from device monitor", 
diskFileInfo);
         deviceMonitor.unregisterFileWriter(this);
       }
     }
     if (workerGracefulShutdown) {
-      storageManager.notifyFileInfoCommitted(shuffleKey, getFile().getName(), 
diskFileInfo);
+      if (diskFileInfo != null) {
+        storageManager.notifyFileInfoCommitted(shuffleKey, 
getFile().getName(), diskFileInfo);
+      }
+    }
+    if (diskFileInfo != null) {
+      return diskFileInfo.getFileLength();
+    } else {
+      return memoryFileInfo.getFileLength();
+    }
+  }
+
+  public synchronized void evict() throws IOException {
+    if (memoryFileInfo != null) {
+      if (memoryFileInfo.hasReader()) {
+        return;
+      }
+      if (isClosed()) {
+        PartitionFilesSorter.sortMemoryShuffleFile(memoryFileInfo);

Review Comment:
   Why always sort?



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java:
##########
@@ -260,13 +412,24 @@ public boolean isClosed() {
     return closed;
   }
 
+  public Tuple4<MemoryFileInfo, Flusher, DiskFileInfo, File> createFile(
+      PartitionDataWriterContext writerContext) {
+    writerContext.setCanUseMemory(true);
+    return storageManager.createFile(writerContext);
+  }
+
   protected synchronized long close(
       RunnableWithIOException tryClose,
       RunnableWithIOException streamClose,
       RunnableWithIOException finalClose)
       throws IOException {
     if (closed) {
-      String msg = "FileWriter has already closed! fileName " + 
diskFileInfo.getFilePath();
+      String msg = "PartitionDataWriter has already closed! Filename: ";

Review Comment:
   Better to print the storage type in the msg



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java:
##########
@@ -156,35 +199,90 @@ public void decrementPendingWrites() {
     numPendingWrites.decrementAndGet();
   }
 
-  protected void flush(boolean finalFlush) throws IOException {
+  protected void flushInternal(boolean finalFlush, boolean evict) throws 
IOException {
     synchronized (flushLock) {
       // flushBuffer == null here means writer already closed
       if (flushBuffer != null) {
         int numBytes = flushBuffer.readableBytes();
         if (numBytes != 0) {
           notifier.checkException();
-          notifier.numPendingFlushes.incrementAndGet();
           FlushTask task = null;
-          if (channel != null) {
-            task = new LocalFlushTask(flushBuffer, channel, notifier);
-          } else if (diskFileInfo.isHdfs()) {
-            task = new HdfsFlushTask(flushBuffer, diskFileInfo.getHdfsPath(), 
notifier);
+          if (evict) {
+            notifier.numPendingFlushes.incrementAndGet();
+            // flush task will release the buffer of memory shuffle file
+            if (channel != null) {
+              task = new LocalFlushTask(flushBuffer, channel, notifier, false);
+            } else if (diskFileInfo.isHdfs()) {
+              task = new HdfsFlushTask(flushBuffer, 
diskFileInfo.getHdfsPath(), notifier, false);
+            }
+            MemoryManager.instance().releaseMemoryFileStorage(numBytes);
+            MemoryManager.instance().incrementDiskBuffer(numBytes);
+            // read flush buffer to generate correct chunk offsets
+            ByteBuf dupBuf = null;
+            if (memoryFileInfo.getSortedBuffer() != null) {
+              dupBuf = memoryFileInfo.getSortedBuffer();
+            } else {
+              dupBuf = flushBuffer.duplicate();
+            }
+            ByteBuffer headerBuf = ByteBuffer.allocate(16);
+            while (dupBuf.isReadable()) {
+              headerBuf.rewind();
+              dupBuf.readBytes(headerBuf);
+              byte[] batchHeader = headerBuf.array();
+              int compressedSize = Platform.getInt(batchHeader, 
Platform.BYTE_ARRAY_OFFSET + 12);
+              dupBuf.skipBytes(compressedSize);
+              diskFileInfo.updateBytesFlushed(compressedSize + 16);
+            }
+          } else {
+            if (!isMemoryShuffleFile.get()) {
+              notifier.numPendingFlushes.incrementAndGet();
+              if (channel != null) {
+                task = new LocalFlushTask(flushBuffer, channel, notifier, 
true);
+              } else if (diskFileInfo.isHdfs()) {
+                task = new HdfsFlushTask(flushBuffer, 
diskFileInfo.getHdfsPath(), notifier, true);
+              }
+            }
           }
-          addTask(task);
-          flushBuffer = null;
-          diskFileInfo.updateBytesFlushed(numBytes);
-          if (!finalFlush) {
-            takeBuffer();
+          if (task != null) {
+            addTask(task);
+            flushBuffer = null;
+            if (!evict) {
+              diskFileInfo.updateBytesFlushed(numBytes);
+            }
+            if (!finalFlush) {
+              takeBuffer();
+            }
           }
         }
       }
     }
   }
 
+  @VisibleForTesting
+  public void flush(boolean finalFlush) throws IOException {
+    flushInternal(finalFlush, false);
+  }
+
+  public boolean needHardSplitForMemoryShuffleStorage() {
+    if (!isMemoryShuffleFile.get()) {
+      return false;
+    } else {
+      return !StorageInfo.localDiskAvailable(storageManager.activeTypes())
+          && !StorageInfo.HDFSAvailable(storageManager.activeTypes())
+          && (memoryFileInfo.getFileLength() > memoryFileStorageMaxFileSize
+              || !MemoryManager.instance().memoryFileStorageAvailable());
+    }
+  }
+
   /** assume data size is less than chunk capacity */
   public void write(ByteBuf data) throws IOException {
     if (closed) {
-      String msg = "FileWriter has already closed!, fileName " + 
diskFileInfo.getFilePath();
+      String msg = "PartitionDataWriter has already closed! Filename: ";

Review Comment:
   ditto



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java:
##########
@@ -297,15 +463,39 @@ protected synchronized long close(
       finalClose.run();
 
       // unregister from DeviceMonitor
-      if (!diskFileInfo.isHdfs()) {
+      if (diskFileInfo != null && !diskFileInfo.isHdfs()) {
         logger.debug("file info {} unregister from device monitor", 
diskFileInfo);
         deviceMonitor.unregisterFileWriter(this);
       }
     }
     if (workerGracefulShutdown) {
-      storageManager.notifyFileInfoCommitted(shuffleKey, getFile().getName(), 
diskFileInfo);
+      if (diskFileInfo != null) {
+        storageManager.notifyFileInfoCommitted(shuffleKey, 
getFile().getName(), diskFileInfo);
+      }
+    }
+    if (diskFileInfo != null) {
+      return diskFileInfo.getFileLength();
+    } else {
+      return memoryFileInfo.getFileLength();
+    }
+  }
+
+  public synchronized void evict() throws IOException {

Review Comment:
   Threads for `PartitionDataWriter` and `MemoryManager` can concurrently call 
evict. For example:
   1. At time 1, `MemoryManager` adds PartitionWriter 1 into the list
   2. At time 2, `PartitionDataWriter` finds it exceeds threshold and calls 
`evict`
   3. At time 3, `MemoryManager` takes it from the list and calls `evict` again



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java:
##########
@@ -156,35 +199,90 @@ public void decrementPendingWrites() {
     numPendingWrites.decrementAndGet();
   }
 
-  protected void flush(boolean finalFlush) throws IOException {
+  protected void flushInternal(boolean finalFlush, boolean evict) throws 
IOException {
     synchronized (flushLock) {
       // flushBuffer == null here means writer already closed
       if (flushBuffer != null) {
         int numBytes = flushBuffer.readableBytes();
         if (numBytes != 0) {
           notifier.checkException();
-          notifier.numPendingFlushes.incrementAndGet();
           FlushTask task = null;
-          if (channel != null) {
-            task = new LocalFlushTask(flushBuffer, channel, notifier);
-          } else if (diskFileInfo.isHdfs()) {
-            task = new HdfsFlushTask(flushBuffer, diskFileInfo.getHdfsPath(), 
notifier);
+          if (evict) {
+            notifier.numPendingFlushes.incrementAndGet();
+            // flush task will release the buffer of memory shuffle file
+            if (channel != null) {
+              task = new LocalFlushTask(flushBuffer, channel, notifier, false);
+            } else if (diskFileInfo.isHdfs()) {
+              task = new HdfsFlushTask(flushBuffer, 
diskFileInfo.getHdfsPath(), notifier, false);
+            }
+            MemoryManager.instance().releaseMemoryFileStorage(numBytes);
+            MemoryManager.instance().incrementDiskBuffer(numBytes);
+            // read flush buffer to generate correct chunk offsets
+            ByteBuf dupBuf = null;
+            if (memoryFileInfo.getSortedBuffer() != null) {
+              dupBuf = memoryFileInfo.getSortedBuffer();
+            } else {
+              dupBuf = flushBuffer.duplicate();
+            }
+            ByteBuffer headerBuf = ByteBuffer.allocate(16);
+            while (dupBuf.isReadable()) {
+              headerBuf.rewind();
+              dupBuf.readBytes(headerBuf);
+              byte[] batchHeader = headerBuf.array();
+              int compressedSize = Platform.getInt(batchHeader, 
Platform.BYTE_ARRAY_OFFSET + 12);
+              dupBuf.skipBytes(compressedSize);
+              diskFileInfo.updateBytesFlushed(compressedSize + 16);
+            }
+          } else {
+            if (!isMemoryShuffleFile.get()) {
+              notifier.numPendingFlushes.incrementAndGet();
+              if (channel != null) {
+                task = new LocalFlushTask(flushBuffer, channel, notifier, 
true);
+              } else if (diskFileInfo.isHdfs()) {
+                task = new HdfsFlushTask(flushBuffer, 
diskFileInfo.getHdfsPath(), notifier, true);
+              }
+            }
           }
-          addTask(task);
-          flushBuffer = null;
-          diskFileInfo.updateBytesFlushed(numBytes);
-          if (!finalFlush) {
-            takeBuffer();
+          if (task != null) {
+            addTask(task);
+            flushBuffer = null;
+            if (!evict) {
+              diskFileInfo.updateBytesFlushed(numBytes);
+            }
+            if (!finalFlush) {
+              takeBuffer();
+            }
           }
         }
       }
     }
   }
 
+  @VisibleForTesting
+  public void flush(boolean finalFlush) throws IOException {
+    flushInternal(finalFlush, false);
+  }
+
+  public boolean needHardSplitForMemoryShuffleStorage() {
+    if (!isMemoryShuffleFile.get()) {
+      return false;
+    } else {
+      return !StorageInfo.localDiskAvailable(storageManager.activeTypes())

Review Comment:
   Better to make 
`StorageInfo.localDiskAvailable(storageManager.activeTypes())` and 
`StorageInfo.HDFSAvailable(storageManager.activeTypes())` local variables.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java:
##########
@@ -203,49 +301,103 @@ public void write(ByteBuf data) throws IOException {
     }
 
     final int numBytes = data.readableBytes();
-    MemoryManager.instance().incrementDiskBuffer(numBytes);
-
-    Optional.ofNullable(CongestionController.instance())
-        .ifPresent(
-            congestionController ->
-                
congestionController.produceBytes(diskFileInfo.getUserIdentifier(), numBytes));
+    if (isMemoryShuffleFile.get()) {
+      MemoryManager.instance().increaseMemoryFileStorage(numBytes);
+    } else {
+      MemoryManager.instance().incrementDiskBuffer(numBytes);
+      Optional.ofNullable(CongestionController.instance())
+          .ifPresent(
+              congestionController ->
+                  
congestionController.produceBytes(diskFileInfo.getUserIdentifier(), numBytes));
+    }
 
     synchronized (flushLock) {
       if (closed) {
-        String msg = "FileWriter has already closed!, fileName " + 
diskFileInfo.getFilePath();
+        String msg = "PartitionDataWriter has already closed! Filename: ";

Review Comment:
   ditto



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java:
##########
@@ -156,35 +199,90 @@ public void decrementPendingWrites() {
     numPendingWrites.decrementAndGet();
   }
 
-  protected void flush(boolean finalFlush) throws IOException {
+  protected void flushInternal(boolean finalFlush, boolean evict) throws 
IOException {
     synchronized (flushLock) {
       // flushBuffer == null here means writer already closed
       if (flushBuffer != null) {
         int numBytes = flushBuffer.readableBytes();
         if (numBytes != 0) {
           notifier.checkException();
-          notifier.numPendingFlushes.incrementAndGet();
           FlushTask task = null;
-          if (channel != null) {
-            task = new LocalFlushTask(flushBuffer, channel, notifier);
-          } else if (diskFileInfo.isHdfs()) {
-            task = new HdfsFlushTask(flushBuffer, diskFileInfo.getHdfsPath(), 
notifier);
+          if (evict) {
+            notifier.numPendingFlushes.incrementAndGet();
+            // flush task will release the buffer of memory shuffle file
+            if (channel != null) {
+              task = new LocalFlushTask(flushBuffer, channel, notifier, false);
+            } else if (diskFileInfo.isHdfs()) {
+              task = new HdfsFlushTask(flushBuffer, 
diskFileInfo.getHdfsPath(), notifier, false);
+            }
+            MemoryManager.instance().releaseMemoryFileStorage(numBytes);
+            MemoryManager.instance().incrementDiskBuffer(numBytes);
+            // read flush buffer to generate correct chunk offsets
+            ByteBuf dupBuf = null;
+            if (memoryFileInfo.getSortedBuffer() != null) {
+              dupBuf = memoryFileInfo.getSortedBuffer();
+            } else {
+              dupBuf = flushBuffer.duplicate();
+            }
+            ByteBuffer headerBuf = ByteBuffer.allocate(16);
+            while (dupBuf.isReadable()) {
+              headerBuf.rewind();
+              dupBuf.readBytes(headerBuf);
+              byte[] batchHeader = headerBuf.array();
+              int compressedSize = Platform.getInt(batchHeader, 
Platform.BYTE_ARRAY_OFFSET + 12);
+              dupBuf.skipBytes(compressedSize);
+              diskFileInfo.updateBytesFlushed(compressedSize + 16);
+            }
+          } else {
+            if (!isMemoryShuffleFile.get()) {
+              notifier.numPendingFlushes.incrementAndGet();
+              if (channel != null) {
+                task = new LocalFlushTask(flushBuffer, channel, notifier, 
true);
+              } else if (diskFileInfo.isHdfs()) {
+                task = new HdfsFlushTask(flushBuffer, 
diskFileInfo.getHdfsPath(), notifier, true);
+              }
+            }
           }
-          addTask(task);
-          flushBuffer = null;
-          diskFileInfo.updateBytesFlushed(numBytes);
-          if (!finalFlush) {
-            takeBuffer();
+          if (task != null) {
+            addTask(task);
+            flushBuffer = null;
+            if (!evict) {
+              diskFileInfo.updateBytesFlushed(numBytes);

Review Comment:
   Previously, if `finalFlush` was true, `maybeSetChunkOffsets(true)` would be 
called. Seems this behavior has changed? Please be careful about such changes.



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala:
##########
@@ -37,22 +37,31 @@ import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.exception.CelebornException
 import org.apache.celeborn.common.identity.UserIdentifier
 import org.apache.celeborn.common.internal.Logging
-import org.apache.celeborn.common.meta.{DeviceInfo, DiskFileInfo, DiskInfo, 
DiskStatus, MapFileMeta, ReduceFileMeta, TimeWindow}
+import org.apache.celeborn.common.meta.{DeviceInfo, DiskFileInfo, DiskInfo, 
DiskStatus, FileInfo, MapFileMeta, MemoryFileInfo, ReduceFileMeta, TimeWindow}
 import org.apache.celeborn.common.metrics.source.{AbstractSource, 
ThreadPoolSource}
 import org.apache.celeborn.common.network.util.{NettyUtils, TransportConf}
 import org.apache.celeborn.common.protocol.{PartitionLocation, 
PartitionSplitMode, PartitionType, StorageInfo}
 import org.apache.celeborn.common.quota.ResourceConsumption
 import org.apache.celeborn.common.util.{CelebornExitKind, CelebornHadoopUtils, 
JavaUtils, PbSerDeUtils, ThreadUtils, Utils}
 import org.apache.celeborn.service.deploy.worker._
+import org.apache.celeborn.service.deploy.worker.memory.MemoryManager
 import 
org.apache.celeborn.service.deploy.worker.memory.MemoryManager.MemoryPressureListener
 import org.apache.celeborn.service.deploy.worker.shuffledb.{DB, DBBackend, 
DBProvider}
 import 
org.apache.celeborn.service.deploy.worker.storage.StorageManager.hadoopFs
 
 final private[worker] class StorageManager(conf: CelebornConf, workerSource: 
AbstractSource)
   extends ShuffleRecoverHelper with DeviceObserver with Logging with 
MemoryPressureListener {
+  // fileInfos and partitionDataWriters are one to one mapping
   // mount point -> file writer
   val workingDirWriters =
     JavaUtils.newConcurrentHashMap[File, ConcurrentHashMap[String, 
PartitionDataWriter]]()
+  val hdfsWriters = JavaUtils.newConcurrentHashMap[String, 
PartitionDataWriter]()
+  val memoryWriters = JavaUtils.newConcurrentHashMap[MemoryFileInfo, 
PartitionDataWriter]()
+  // include localDiskFileInfos
+  private val diskFileInfos =

Review Comment:
   Please comment about the map structure



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -1205,28 +1205,34 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
          |CheckDiskFullAndSplit in
          |diskFull:$diskFull,
          |partitionSplitMinimumSize:$partitionSplitMinimumSize,
-         |splitThreshold:${fileWriter.getSplitThreshold()},
+         |splitThreshold:${fileWriter.getSplitThreshold},
          |fileLength:${fileWriter.getDiskFileInfo.getFileLength}
          |fileName:${fileWriter.getDiskFileInfo.getFilePath}
          |""".stripMargin)
-    if (workerPartitionSplitEnabled && ((diskFull && 
fileWriter.getDiskFileInfo.getFileLength > partitionSplitMinimumSize) ||
-        (isPrimary && fileWriter.getDiskFileInfo.getFileLength > 
fileWriter.getSplitThreshold()))) {
-      if (softSplit != null && fileWriter.getSplitMode == 
PartitionSplitMode.SOFT &&
-        (fileWriter.getDiskFileInfo.getFileLength < 
partitionSplitMaximumSize)) {
-        softSplit.set(true)
-      } else {
-        workerSource.incCounter(WorkerSource.WRITE_DATA_HARD_SPLIT_COUNT)
-        
callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
-        logTrace(
-          s"""
-             |CheckDiskFullAndSplit hardSplit
-             |diskFull:$diskFull,
-             |partitionSplitMinimumSize:$partitionSplitMinimumSize,
-             |splitThreshold:${fileWriter.getSplitThreshold()},
-             |fileLength:${fileWriter.getDiskFileInfo.getFileLength},
-             |fileName:${fileWriter.getDiskFileInfo.getFilePath}
-             |""".stripMargin)
-        return true
+    if (fileWriter.needHardSplitForMemoryShuffleStorage()) {
+      return true
+    }
+    val diskFileInfo = fileWriter.getDiskFileInfo
+    if (diskFileInfo != null) {
+      if (workerPartitionSplitEnabled && ((diskFull && 
diskFileInfo.getFileLength > partitionSplitMinimumSize) ||
+          (isPrimary && diskFileInfo.getFileLength > 
fileWriter.getSplitThreshold))) {
+        if (softSplit != null && fileWriter.getSplitMode == 
PartitionSplitMode.SOFT &&
+          (fileWriter.getDiskFileInfo.getFileLength < 
partitionSplitMaximumSize)) {
+          softSplit.set(true)
+        } else {
+          workerSource.incCounter(WorkerSource.WRITE_DATA_HARD_SPLIT_COUNT)
+          
callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
+          logTrace(
+            s"""
+               |CheckDiskFullAndSplit hardSplit
+               |diskFull:$diskFull,
+               |partitionSplitMinimumSize:$partitionSplitMinimumSize,
+               |splitThreshold:${fileWriter.getSplitThreshold},
+               |fileLength:${fileWriter.getDiskFileInfo.getFileLength},

Review Comment:
   use `diskFileInfo`



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala:
##########
@@ -110,15 +107,23 @@ abstract private[worker] class Flusher(
     buffer
   }
 
-  def returnBuffer(buffer: CompositeByteBuf): Unit = {
-    MemoryManager.instance().releaseDiskBuffer(buffer.readableBytes())
+  def returnBuffer(task: FlushTask): Unit = {
+    val bufferSize = task.buffer.readableBytes()
+    MemoryManager.instance().releaseDiskBuffer(bufferSize)
     Option(CongestionController.instance())
       .foreach(
-        _.consumeBytes(buffer.readableBytes()))
+        _.consumeBytes(bufferSize))
+    returnBuffer(task.buffer, task.keepBuffer)
+  }
+
+  def returnBuffer(buffer: CompositeByteBuf, keepBuffer: Boolean = false): 
Unit = {
     buffer.removeComponents(0, buffer.numComponents())
     buffer.clear()
-
-    bufferQueue.put(buffer)
+    if (keepBuffer) {
+      bufferQueue.put(buffer)
+    } else {
+      buffer.release()

Review Comment:
   Also, since `buffer.removeComponents(0, buffer.numComponents())` removes all 
its components, IMO we can always keep the buffer?



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java:
##########
@@ -203,49 +301,103 @@ public void write(ByteBuf data) throws IOException {
     }
 
     final int numBytes = data.readableBytes();
-    MemoryManager.instance().incrementDiskBuffer(numBytes);
-
-    Optional.ofNullable(CongestionController.instance())
-        .ifPresent(
-            congestionController ->
-                
congestionController.produceBytes(diskFileInfo.getUserIdentifier(), numBytes));
+    if (isMemoryShuffleFile.get()) {
+      MemoryManager.instance().increaseMemoryFileStorage(numBytes);
+    } else {
+      MemoryManager.instance().incrementDiskBuffer(numBytes);
+      Optional.ofNullable(CongestionController.instance())
+          .ifPresent(
+              congestionController ->
+                  
congestionController.produceBytes(diskFileInfo.getUserIdentifier(), numBytes));
+    }
 
     synchronized (flushLock) {
       if (closed) {
-        String msg = "FileWriter has already closed!, fileName " + 
diskFileInfo.getFilePath();
+        String msg = "PartitionDataWriter has already closed! Filename: ";
+        if (isMemoryShuffleFile.get()) {
+          msg += filename;
+        } else {
+          msg += diskFileInfo.getFilePath();
+        }
         logger.warn(msg);
         throw new AlreadyClosedException(msg);
       }
       if (rangeReadFilter) {
         mapIdBitMap.add(mapId);
       }
-      if (flushBuffer.readableBytes() != 0
-          && flushBuffer.readableBytes() + numBytes >= flusherBufferSize) {
-        flush(false);
+      int flushBufferReadableBytes = flushBuffer.readableBytes();
+      if (!isMemoryShuffleFile.get()) {
+        if (flushBufferReadableBytes != 0
+            && flushBufferReadableBytes + numBytes >= flusherBufferSize) {
+          flush(false);
+        }
+      } else {
+        if (flushBufferReadableBytes > memoryFileStorageMaxFileSize
+            && storageManager.localOrHdfsStorageAvailable()) {
+          logger.debug(
+              "{} Evict, memory buffer is  {}",
+              writerContext.getPartitionLocation().getFileName(),
+              flushBufferReadableBytes);
+          evict();
+        }
       }
 
       data.retain();
       flushBuffer.addComponent(true, data);
+      if (isMemoryShuffleFile.get()) {
+        memoryFileInfo.updateBytesFlushed(numBytes);
+      }
     }
 
     numPendingWrites.decrementAndGet();
   }
 
+  public void evictInternal() throws IOException {
+    if (exception != null) {
+      return;
+    }
+    Tuple4<MemoryFileInfo, Flusher, DiskFileInfo, File> createFileResult =
+        storageManager.createFile(writerContext);
+    if (createFileResult._4() != null) {
+      this.diskFileInfo = createFileResult._3();
+      this.flusher = createFileResult._2();
+
+      isMemoryShuffleFile.set(false);
+      initFileChannelsForDiskFile();
+      flushInternal(closed, true);
+
+      memoryFileInfo.setEvicted(
+          () -> {
+            storageManager.unregisterMemoryPartitionWriterAndFileInfo(
+                memoryFileInfo, writerContext.getShuffleKey(), filename);
+            storageManager.evictedFileCount().incrementAndGet();
+          });
+
+      memoryFileInfo = null;
+    } else {
+      exception = new CelebornIOException("PartitionDataWriter create 
disk-related file failed");
+      throw (CelebornIOException) exception;
+    }
+  }
+
   public RoaringBitmap getMapIdBitMap() {
     return mapIdBitMap;
   }
 
   public StorageInfo getStorageInfo() {
-    if (flusher instanceof LocalFlusher) {
-      LocalFlusher localFlusher = (LocalFlusher) flusher;
-      // do not write file path to reduce rpc size
-      return new StorageInfo(localFlusher.diskType(), true, "");
-    } else {
-      if (deleted) {
-        return null;
+    if (diskFileInfo != null) {
+      if (diskFileInfo.isHdfs()) {
+        if (deleted) {
+          return null;
+        } else {
+          return new StorageInfo(StorageInfo.Type.HDFS, true, 
diskFileInfo.getFilePath());
+        }
       } else {
-        return new StorageInfo(StorageInfo.Type.HDFS, true, 
diskFileInfo.getFilePath());
+        return new StorageInfo(((LocalFlusher) flusher).diskType(), true, "");
       }
+    } else {
+      assert memoryFileInfo != null;

Review Comment:
   Use `Preconditions` instead of `assert`



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala:
##########
@@ -110,15 +107,23 @@ abstract private[worker] class Flusher(
     buffer
   }
 
-  def returnBuffer(buffer: CompositeByteBuf): Unit = {
-    MemoryManager.instance().releaseDiskBuffer(buffer.readableBytes())
+  def returnBuffer(task: FlushTask): Unit = {
+    val bufferSize = task.buffer.readableBytes()
+    MemoryManager.instance().releaseDiskBuffer(bufferSize)
     Option(CongestionController.instance())
       .foreach(
-        _.consumeBytes(buffer.readableBytes()))
+        _.consumeBytes(bufferSize))
+    returnBuffer(task.buffer, task.keepBuffer)
+  }
+
+  def returnBuffer(buffer: CompositeByteBuf, keepBuffer: Boolean = false): Unit = {
     buffer.removeComponents(0, buffer.numComponents())
     buffer.clear()
-
-    bufferQueue.put(buffer)
+    if (keepBuffer) {
+      bufferQueue.put(buffer)
+    } else {
+      buffer.release()

Review Comment:
   `buffer.removeComponents(0, buffer.numComponents())` already releases the  
buffer:
   ```
       for (int i = cIndex; i < endIndex; ++i) {
               Component c = components[i];
               if (c.length() > 0) {
                   needsUpdate = true;
               }
               if (lastAccessed == c) {
                   lastAccessed = null;
               }
               c.free();
           }
   ```
   So why call `buffer.release()` again?



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala:
##########
@@ -152,14 +161,20 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
         Some(new HdfsFlusher(
           workerSource,
           conf.workerHdfsFlusherThreads,
-          byteBufAllocator,
+          storageBufferAllocator,
           conf.workerPushMaxComponents)),
         conf.workerHdfsFlusherThreads)
     } else {
       (None, 0)
     }
 
   def totalFlusherThread: Int = _totalLocalFlusherThread + 
_totalHdfsFlusherThread
+  def activeTypes = conf.availableStorageTypes

Review Comment:
   Use `val` instead of `def` here: `conf.availableStorageTypes` does not change over the StorageManager's lifetime, and a `def` re-evaluates the expression on every access.



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala:
##########
@@ -37,22 +37,31 @@ import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.exception.CelebornException
 import org.apache.celeborn.common.identity.UserIdentifier
 import org.apache.celeborn.common.internal.Logging
-import org.apache.celeborn.common.meta.{DeviceInfo, DiskFileInfo, DiskInfo, 
DiskStatus, MapFileMeta, ReduceFileMeta, TimeWindow}
+import org.apache.celeborn.common.meta.{DeviceInfo, DiskFileInfo, DiskInfo, 
DiskStatus, FileInfo, MapFileMeta, MemoryFileInfo, ReduceFileMeta, TimeWindow}
 import org.apache.celeborn.common.metrics.source.{AbstractSource, 
ThreadPoolSource}
 import org.apache.celeborn.common.network.util.{NettyUtils, TransportConf}
 import org.apache.celeborn.common.protocol.{PartitionLocation, 
PartitionSplitMode, PartitionType, StorageInfo}
 import org.apache.celeborn.common.quota.ResourceConsumption
 import org.apache.celeborn.common.util.{CelebornExitKind, CelebornHadoopUtils, 
JavaUtils, PbSerDeUtils, ThreadUtils, Utils}
 import org.apache.celeborn.service.deploy.worker._
+import org.apache.celeborn.service.deploy.worker.memory.MemoryManager
 import 
org.apache.celeborn.service.deploy.worker.memory.MemoryManager.MemoryPressureListener
 import org.apache.celeborn.service.deploy.worker.shuffledb.{DB, DBBackend, 
DBProvider}
 import 
org.apache.celeborn.service.deploy.worker.storage.StorageManager.hadoopFs
 
 final private[worker] class StorageManager(conf: CelebornConf, workerSource: 
AbstractSource)
   extends ShuffleRecoverHelper with DeviceObserver with Logging with 
MemoryPressureListener {
+  // fileInfos and partitionDataWriters are one to one mapping
   // mount point -> file writer
   val workingDirWriters =
     JavaUtils.newConcurrentHashMap[File, ConcurrentHashMap[String, 
PartitionDataWriter]]()
+  val hdfsWriters = JavaUtils.newConcurrentHashMap[String, 
PartitionDataWriter]()
+  val memoryWriters = JavaUtils.newConcurrentHashMap[MemoryFileInfo, 
PartitionDataWriter]()
+  // include localDiskFileInfos
+  private val diskFileInfos =
+    JavaUtils.newConcurrentHashMap[String, ConcurrentHashMap[String, 
DiskFileInfo]]()
+  val memoryFileInfos =
+    JavaUtils.newConcurrentHashMap[String, ConcurrentHashMap[String, 
MemoryFileInfo]]()

Review Comment:
   ditto



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -1205,28 +1205,34 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
          |CheckDiskFullAndSplit in
          |diskFull:$diskFull,
          |partitionSplitMinimumSize:$partitionSplitMinimumSize,
-         |splitThreshold:${fileWriter.getSplitThreshold()},
+         |splitThreshold:${fileWriter.getSplitThreshold},
          |fileLength:${fileWriter.getDiskFileInfo.getFileLength}
          |fileName:${fileWriter.getDiskFileInfo.getFilePath}
          |""".stripMargin)
-    if (workerPartitionSplitEnabled && ((diskFull && 
fileWriter.getDiskFileInfo.getFileLength > partitionSplitMinimumSize) ||
-        (isPrimary && fileWriter.getDiskFileInfo.getFileLength > 
fileWriter.getSplitThreshold()))) {
-      if (softSplit != null && fileWriter.getSplitMode == 
PartitionSplitMode.SOFT &&
-        (fileWriter.getDiskFileInfo.getFileLength < 
partitionSplitMaximumSize)) {
-        softSplit.set(true)
-      } else {
-        workerSource.incCounter(WorkerSource.WRITE_DATA_HARD_SPLIT_COUNT)
-        
callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
-        logTrace(
-          s"""
-             |CheckDiskFullAndSplit hardSplit
-             |diskFull:$diskFull,
-             |partitionSplitMinimumSize:$partitionSplitMinimumSize,
-             |splitThreshold:${fileWriter.getSplitThreshold()},
-             |fileLength:${fileWriter.getDiskFileInfo.getFileLength},
-             |fileName:${fileWriter.getDiskFileInfo.getFilePath}
-             |""".stripMargin)
-        return true
+    if (fileWriter.needHardSplitForMemoryShuffleStorage()) {
+      return true
+    }
+    val diskFileInfo = fileWriter.getDiskFileInfo
+    if (diskFileInfo != null) {
+      if (workerPartitionSplitEnabled && ((diskFull && 
diskFileInfo.getFileLength > partitionSplitMinimumSize) ||
+          (isPrimary && diskFileInfo.getFileLength > 
fileWriter.getSplitThreshold))) {
+        if (softSplit != null && fileWriter.getSplitMode == 
PartitionSplitMode.SOFT &&
+          (fileWriter.getDiskFileInfo.getFileLength < 
partitionSplitMaximumSize)) {
+          softSplit.set(true)
+        } else {
+          workerSource.incCounter(WorkerSource.WRITE_DATA_HARD_SPLIT_COUNT)
+          
callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
+          logTrace(
+            s"""
+               |CheckDiskFullAndSplit hardSplit
+               |diskFull:$diskFull,
+               |partitionSplitMinimumSize:$partitionSplitMinimumSize,
+               |splitThreshold:${fileWriter.getSplitThreshold},
+               |fileLength:${fileWriter.getDiskFileInfo.getFileLength},
+               |fileName:${fileWriter.getDiskFileInfo.getFilePath}

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to