waitinfuture commented on code in PR #2300:
URL: https://github.com/apache/celeborn/pull/2300#discussion_r1607680640


##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java:
##########
@@ -208,19 +223,68 @@ private MemoryManager(CelebornConf conf) {
           TimeUnit.MILLISECONDS);
     }
 
+    this.storageManager = storageManager;
+    if (memoryFileStorageThreshold > 0
+        && storageManager != null
+        && storageManager.localOrHdfsStorageAvailable()) {
+      ScheduledExecutorService memoryFileStorageService =
+          
ThreadUtils.newDaemonSingleThreadScheduledExecutor("memory-file-storage-checker");
+      memoryFileStorageService.scheduleWithFixedDelay(
+          () -> {
+            try {
+              if ((memoryFileStorageCounter.get() >= 
memoryFileStorageThreshold)
+                  || currentServingState() != ServingState.NONE_PAUSED) {
+                List<PartitionDataWriter> memoryWriters =
+                    new ArrayList<>(storageManager.memoryWriters().values());
+                if (memoryWriters.isEmpty()) {
+                  return;
+                }
+                logger.info("Start evicting {} memory file infos", 
memoryWriters.size());
+                // always evict the largest memory file info first
+                memoryWriters.sort(
+                    (o1, o2) ->
+                        o1.getMemoryFileInfo().getFileLength()

Review Comment:
   This will put larger ones behind (the comparator only ever returns 0 or 1, which also violates the `Comparator` contract), while the comment says the intent is to evict the largest first.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java:
##########
@@ -85,11 +88,18 @@ public class MemoryManager {
           "worker-memory-manager-read-buffer-target-updater");
   private CreditStreamManager creditStreamManager = null;
 
-  private long memoryShuffleStorageThreshold = 0;
+  private long memoryFileStorageThreshold;
+  private final AtomicLong memoryFileStorageCounter = new AtomicLong();

Review Comment:
   Better to make it a `LongAdder`



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java:
##########
@@ -208,19 +223,68 @@ private MemoryManager(CelebornConf conf) {
           TimeUnit.MILLISECONDS);
     }
 
+    this.storageManager = storageManager;
+    if (memoryFileStorageThreshold > 0
+        && storageManager != null
+        && storageManager.localOrHdfsStorageAvailable()) {
+      ScheduledExecutorService memoryFileStorageService =
+          
ThreadUtils.newDaemonSingleThreadScheduledExecutor("memory-file-storage-checker");
+      memoryFileStorageService.scheduleWithFixedDelay(
+          () -> {
+            try {
+              if ((memoryFileStorageCounter.get() >= 
memoryFileStorageThreshold)
+                  || currentServingState() != ServingState.NONE_PAUSED) {
+                List<PartitionDataWriter> memoryWriters =
+                    new ArrayList<>(storageManager.memoryWriters().values());
+                if (memoryWriters.isEmpty()) {
+                  return;
+                }
+                logger.info("Start evicting {} memory file infos", 
memoryWriters.size());
+                // always evict the largest memory file info first
+                memoryWriters.sort(
+                    (o1, o2) ->
+                        o1.getMemoryFileInfo().getFileLength()
+                                > o2.getMemoryFileInfo().getFileLength()
+                            ? 1
+                            : 0);
+                try {
+                  for (PartitionDataWriter writer : memoryWriters) {
+                    // this branch means that there is no memory pressure
+                    if ((memoryFileStorageCounter.get() < 
memoryFileStorageThreshold)

Review Comment:
   As commented above, change to
   ```
   (memoryFileStorageCounter.get() < 0.5 * memoryFileStorageThreshold)
                           || currentServingState() == ServingState.NONE_PAUSED
   ```



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java:
##########
@@ -208,19 +223,68 @@ private MemoryManager(CelebornConf conf) {
           TimeUnit.MILLISECONDS);
     }
 
+    this.storageManager = storageManager;
+    if (memoryFileStorageThreshold > 0
+        && storageManager != null
+        && storageManager.localOrHdfsStorageAvailable()) {
+      ScheduledExecutorService memoryFileStorageService =
+          
ThreadUtils.newDaemonSingleThreadScheduledExecutor("memory-file-storage-checker");
+      memoryFileStorageService.scheduleWithFixedDelay(
+          () -> {
+            try {
+              if ((memoryFileStorageCounter.get() >= 
memoryFileStorageThreshold)

Review Comment:
   It may cause all memory writers to flush, because flushing takes time and during that time the used Netty memory does not decrease. Better to change it to:
   ```
   (memoryFileStorageCounter.get() >= 0.5 * memoryFileStorageThreshold)
                     && currentServingState() != ServingState.NONE_PAUSED)
   ```



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataWriter.java:
##########
@@ -57,28 +59,14 @@ public final class MapPartitionDataWriter extends 
PartitionDataWriter {
 
   public MapPartitionDataWriter(
       StorageManager storageManager,
-      DiskFileInfo diskFileInfo,
-      Flusher flusher,
       AbstractSource workerSource,
       CelebornConf conf,
       DeviceMonitor deviceMonitor,
-      long splitThreshold,
-      PartitionSplitMode splitMode,
-      boolean rangeReadFilter,
-      String shuffleKey)
+      PartitionDataWriterContext writerContext)
       throws IOException {
-    super(
-        storageManager,
-        diskFileInfo,
-        flusher,
-        workerSource,
-        conf,
-        deviceMonitor,
-        splitThreshold,
-        splitMode,
-        PartitionType.MAP,
-        rangeReadFilter,
-        shuffleKey);
+    super(storageManager, workerSource, conf, deviceMonitor, writerContext, 
false);
+
+    assert diskFileInfo != null;

Review Comment:
   Use `Preconditions.checkState` instead — `assert` is a no-op at runtime unless the JVM is started with `-ea`.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java:
##########
@@ -156,23 +194,59 @@ public void decrementPendingWrites() {
     numPendingWrites.decrementAndGet();
   }
 
-  protected void flush(boolean finalFlush) throws IOException {
-    synchronized (flushLock) {
-      // flushBuffer == null here means writer already closed
-      if (flushBuffer != null) {
-        int numBytes = flushBuffer.readableBytes();
-        if (numBytes != 0) {
-          notifier.checkException();
+  @VisibleForTesting
+  public void flush(boolean finalFlush, boolean evict) throws IOException {
+    // flushBuffer == null here means writer already closed
+    if (flushBuffer != null) {
+      int numBytes = flushBuffer.readableBytes();
+      if (numBytes != 0) {
+        notifier.checkException();
+        FlushTask task = null;
+        if (evict) {
           notifier.numPendingFlushes.incrementAndGet();
-          FlushTask task = null;
+          // duplicate buffer before its released
+          ByteBuf dupBuf = null;
+          if (memoryFileInfo.getSortedBuffer() != null) {
+            dupBuf = memoryFileInfo.getSortedBuffer().retainedDuplicate();
+          } else {
+            dupBuf = flushBuffer.retainedDuplicate();
+          }
+          // flush task will release the buffer of memory shuffle file
           if (channel != null) {
-            task = new LocalFlushTask(flushBuffer, channel, notifier);
+            task = new LocalFlushTask(flushBuffer, channel, notifier, false);
           } else if (diskFileInfo.isHdfs()) {
-            task = new HdfsFlushTask(flushBuffer, diskFileInfo.getHdfsPath(), 
notifier);
+            task = new HdfsFlushTask(flushBuffer, diskFileInfo.getHdfsPath(), 
notifier, false);
+          }
+          MemoryManager.instance().releaseMemoryFileStorage(numBytes);
+          MemoryManager.instance().incrementDiskBuffer(numBytes);
+          // read flush buffer to generate correct chunk offsets
+          // data header layout (mapId, attemptId, nextBatchId, length)
+          ByteBuffer headerBuf = ByteBuffer.allocate(16);

Review Comment:
   Better to check whether `flushBuffer` exceeds the chunk size; if it does, do the following logic, otherwise just use `numBytes`.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java:
##########
@@ -203,49 +287,97 @@ public void write(ByteBuf data) throws IOException {
     }
 
     final int numBytes = data.readableBytes();
-    MemoryManager.instance().incrementDiskBuffer(numBytes);
-
-    Optional.ofNullable(CongestionController.instance())
-        .ifPresent(
-            congestionController ->
-                
congestionController.produceBytes(diskFileInfo.getUserIdentifier(), numBytes));
+    if (isMemoryShuffleFile.get()) {
+      MemoryManager.instance().increaseMemoryFileStorage(numBytes);
+    } else {
+      MemoryManager.instance().incrementDiskBuffer(numBytes);
+      Optional.ofNullable(CongestionController.instance())
+          .ifPresent(
+              congestionController ->
+                  
congestionController.produceBytes(diskFileInfo.getUserIdentifier(), numBytes));
+    }
 
     synchronized (flushLock) {
       if (closed) {
-        String msg = "FileWriter has already closed!, fileName " + 
diskFileInfo.getFilePath();
+        String msg = getFileAlreadyClosedMsg();
         logger.warn(msg);
         throw new AlreadyClosedException(msg);
       }
       if (rangeReadFilter) {
         mapIdBitMap.add(mapId);
       }
-      if (flushBuffer.readableBytes() != 0
-          && flushBuffer.readableBytes() + numBytes >= flusherBufferSize) {
-        flush(false);
+      int flushBufferReadableBytes = flushBuffer.readableBytes();
+      if (!isMemoryShuffleFile.get()) {
+        if (flushBufferReadableBytes != 0
+            && flushBufferReadableBytes + numBytes >= flusherBufferSize) {
+          flush(false, false);
+        }
+      } else {
+        if (flushBufferReadableBytes > memoryFileStorageMaxFileSize
+            && storageManager.localOrHdfsStorageAvailable()) {
+          logger.debug(
+              "{} Evict, memory buffer is  {}",
+              writerContext.getPartitionLocation().getFileName(),
+              flushBufferReadableBytes);
+          evict();
+        }
       }
 
       data.retain();
       flushBuffer.addComponent(true, data);
+      if (isMemoryShuffleFile.get()) {
+        memoryFileInfo.updateBytesFlushed(numBytes);
+      }
     }
 
     numPendingWrites.decrementAndGet();
   }
 
+  public Tuple4<MemoryFileInfo, Flusher, DiskFileInfo, File> createFile(

Review Comment:
   No need to define this method



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala:
##########
@@ -110,15 +107,23 @@ abstract private[worker] class Flusher(
     buffer
   }
 
-  def returnBuffer(buffer: CompositeByteBuf): Unit = {
-    MemoryManager.instance().releaseDiskBuffer(buffer.readableBytes())
+  def returnBuffer(task: FlushTask): Unit = {
+    val bufferSize = task.buffer.readableBytes()
+    MemoryManager.instance().releaseDiskBuffer(bufferSize)

Review Comment:
   I don't think we need this new method; just move the `releaseDiskBuffer` and `CongestionController` logic back to the original method.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataWriter.java:
##########
@@ -168,6 +154,13 @@ public synchronized long close() throws IOException {
         });
   }
 
+  // Map partitions don't support memory storage yet, because flink has its 
own memory tier

Review Comment:
   Remove `, because flink has its own memory tier` — it's not precise.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java:
##########
@@ -156,23 +194,59 @@ public void decrementPendingWrites() {
     numPendingWrites.decrementAndGet();
   }
 
-  protected void flush(boolean finalFlush) throws IOException {
-    synchronized (flushLock) {
-      // flushBuffer == null here means writer already closed
-      if (flushBuffer != null) {
-        int numBytes = flushBuffer.readableBytes();
-        if (numBytes != 0) {
-          notifier.checkException();
+  @VisibleForTesting
+  public void flush(boolean finalFlush, boolean evict) throws IOException {

Review Comment:
   ditto, better to change to `fromEvict`



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionDataWriter.java:
##########
@@ -24,61 +24,36 @@
 import org.slf4j.LoggerFactory;
 
 import org.apache.celeborn.common.CelebornConf;
-import org.apache.celeborn.common.meta.DiskFileInfo;
-import org.apache.celeborn.common.meta.ReduceFileMeta;
+import org.apache.celeborn.common.meta.FileInfo;
 import org.apache.celeborn.common.metrics.source.AbstractSource;
-import org.apache.celeborn.common.protocol.PartitionSplitMode;
-import org.apache.celeborn.common.protocol.PartitionType;
 
 /*
  * reduce partition file writer, it will create chunk index
  */
 public final class ReducePartitionDataWriter extends PartitionDataWriter {
   private static final Logger logger = 
LoggerFactory.getLogger(ReducePartitionDataWriter.class);
 
-  private long nextBoundary;
-  private final long shuffleChunkSize;
-
   public ReducePartitionDataWriter(
       StorageManager storageManager,
-      DiskFileInfo diskFileInfo,
-      Flusher flusher,
       AbstractSource workerSource,
       CelebornConf conf,
       DeviceMonitor deviceMonitor,
-      long splitThreshold,
-      PartitionSplitMode splitMode,
-      boolean rangeReadFilter,
-      String shuffleKey)
+      PartitionDataWriterContext writerContext)
       throws IOException {
-    super(
-        storageManager,
-        diskFileInfo,
-        flusher,
-        workerSource,
-        conf,
-        deviceMonitor,
-        splitThreshold,
-        splitMode,
-        PartitionType.REDUCE,
-        rangeReadFilter,
-        shuffleKey);
-    this.shuffleChunkSize = conf.shuffleChunkSize();
-    this.nextBoundary = this.shuffleChunkSize;
+    super(storageManager, workerSource, conf, deviceMonitor, writerContext, 
true);
   }
 
-  @Override
-  protected void flush(boolean finalFlush) throws IOException {
-    super.flush(finalFlush);
-    maybeSetChunkOffsets(finalFlush);
+  private void updateLastChunkOffset() {
+    FileInfo fileInfo = getCurrentFileInfo();
+    fileInfo.getReduceFileMeta().updateChunkOffset(fileInfo.getFileLength(), 
true);
   }
 
-  private void maybeSetChunkOffsets(boolean forceSet) {
-    long bytesFlushed = diskFileInfo.getFileLength();
-    if (bytesFlushed >= nextBoundary || forceSet) {
-      ((ReduceFileMeta) 
diskFileInfo.getFileMeta()).addChunkOffset(bytesFlushed);
-      nextBoundary = bytesFlushed + shuffleChunkSize;
-    }
+  @Override
+  public void flush(boolean finalFlush, boolean evict) throws IOException {

Review Comment:
   better to rename `evict` to `fromEvict`



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java:
##########
@@ -237,7 +301,7 @@ public ServingState currentServingState() {
       return ServingState.PUSH_PAUSED;
     }
     // trigger resume
-    if (memoryUsage < resumeThreshold) {
+    if (workerMemoryUsageRatio() < resumeRatio) {

Review Comment:
   Just use `memoryUsage / (double) (maxDirectMemory)` here to avoid a redundant calculation.



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala:
##########
@@ -34,16 +34,16 @@ import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.CelebornConf.MAX_CHUNKS_BEING_TRANSFERRED
 import org.apache.celeborn.common.exception.CelebornIOException
 import org.apache.celeborn.common.internal.Logging
-import org.apache.celeborn.common.meta.{DiskFileInfo, FileManagedBuffers, 
MapFileMeta, ReduceFileMeta}
-import org.apache.celeborn.common.network.buffer.NioManagedBuffer
+import org.apache.celeborn.common.meta.{DiskFileInfo, FileInfo, MapFileMeta, 
MemoryFileInfo, ReduceFileMeta}
+import org.apache.celeborn.common.network.buffer.{FileChunkBuffers, 
MemoryChunkBuffers, NioManagedBuffer}
 import org.apache.celeborn.common.network.client.{RpcResponseCallback, 
TransportClient}
 import org.apache.celeborn.common.network.protocol._
 import org.apache.celeborn.common.network.server.BaseMessageHandler
 import org.apache.celeborn.common.network.util.{NettyUtils, TransportConf}
 import org.apache.celeborn.common.protocol.{MessageType, PbBufferStreamEnd, 
PbChunkFetchRequest, PbOpenStream, PbOpenStreamList, PbOpenStreamListResponse, 
PbReadAddCredit, PbStreamHandler, PbStreamHandlerOpt, StreamType}
 import org.apache.celeborn.common.protocol.message.StatusCode
 import org.apache.celeborn.common.util.{ExceptionUtils, ThreadUtils, Utils}
-import org.apache.celeborn.service.deploy.worker.storage.{ChunkStreamManager, 
CreditStreamManager, PartitionFilesSorter, StorageManager}
+import org.apache.celeborn.service.deploy.worker.storage.{ChunkStreamManager, 
CreditStreamManager, PartitionDataWriter, PartitionFilesSorter, StorageManager}

Review Comment:
   `PartitionDataWriter` is unused



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java:
##########
@@ -156,23 +194,59 @@ public void decrementPendingWrites() {
     numPendingWrites.decrementAndGet();
   }
 
-  protected void flush(boolean finalFlush) throws IOException {
-    synchronized (flushLock) {
-      // flushBuffer == null here means writer already closed
-      if (flushBuffer != null) {
-        int numBytes = flushBuffer.readableBytes();
-        if (numBytes != 0) {
-          notifier.checkException();
+  @VisibleForTesting
+  public void flush(boolean finalFlush, boolean evict) throws IOException {
+    // flushBuffer == null here means writer already closed
+    if (flushBuffer != null) {
+      int numBytes = flushBuffer.readableBytes();
+      if (numBytes != 0) {
+        notifier.checkException();
+        FlushTask task = null;
+        if (evict) {
           notifier.numPendingFlushes.incrementAndGet();
-          FlushTask task = null;
+          // duplicate buffer before its released
+          ByteBuf dupBuf = null;
+          if (memoryFileInfo.getSortedBuffer() != null) {

Review Comment:
   Since we don't evict committed partitions, we can safely use `flushBuffer` here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to