waitinfuture commented on code in PR #2081:
URL: 
https://github.com/apache/incubator-celeborn/pull/2081#discussion_r1386523255


##########
common/src/main/java/org/apache/celeborn/common/util/MemCacheManager.java:
##########
@@ -0,0 +1,70 @@
+package org.apache.celeborn.common.util;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import io.netty.buffer.ByteBuf;
+
+import org.apache.celeborn.common.CelebornConf;
+
+public class MemCacheManager {
+  private static MemCacheManager memCacheManager;
+  private ConcurrentMap<String, ByteBuf> caches = new ConcurrentHashMap<>();
+  private CelebornConf conf;
+  private long maxCacheSize;
+  private boolean cacheEnable;
+  private AtomicLong currentCacheSize = new AtomicLong(0);
+
+  public MemCacheManager(CelebornConf conf) {
+    this.conf = conf;

Review Comment:
   `this.conf` seems to be unnecessary as a field; it is only used inside the constructor.



##########
common/src/main/java/org/apache/celeborn/common/util/MemCacheManager.java:
##########
@@ -0,0 +1,70 @@
+package org.apache.celeborn.common.util;

Review Comment:
   Please add the license header to the newly added file.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java:
##########
@@ -610,11 +611,25 @@ public void sort() throws InterruptedException {
     }
 
     private void initializeFiles() throws IOException {
+      MemCacheManager memCacheManager = MemCacheManager.getMemCacheManager();

Review Comment:
   We can get `CelebornConf` by calling `conf.getCelebornConf()` here.



##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -2057,6 +2060,22 @@ object CelebornConf extends Logging {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefaultString("2g")
 
+  val FILE_CACHE_MAX_SIZE: ConfigEntry[Long] =
+    buildConf("celeborn.shuffle.cache.max")
+      .categories("worker")
+      .doc("Max size for shuffle file cache to memory")
+      .version("0.4.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("10g")
+
+  val FILE_CACHE_ENABLE: ConfigEntry[Boolean] =
+    buildConf("celeborn.shuffle.cache.enable")
+      .categories("worker")
+      .doc("memory cache enable")
+      .version("0.4.0")

Review Comment:
   Should be 0.3.2.



##########
common/src/main/java/org/apache/celeborn/common/util/MemCacheManager.java:
##########
@@ -0,0 +1,70 @@
+package org.apache.celeborn.common.util;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import io.netty.buffer.ByteBuf;
+
+import org.apache.celeborn.common.CelebornConf;
+
+public class MemCacheManager {
+  private static MemCacheManager memCacheManager;
+  private ConcurrentMap<String, ByteBuf> caches = new ConcurrentHashMap<>();
+  private CelebornConf conf;
+  private long maxCacheSize;
+  private boolean cacheEnable;
+  private AtomicLong currentCacheSize = new AtomicLong(0);
+
+  public MemCacheManager(CelebornConf conf) {
+    this.conf = conf;
+    maxCacheSize = this.conf.fileCacheMaxSize();
+    cacheEnable = this.conf.fileCacheEnable();
+  }
+
+  public void putCache(String key, ByteBuf cache) {
+    int cacheSize = cache.readableBytes();
+    caches.put(key, cache);
+    currentCacheSize.getAndAdd(cacheSize);
+  }
+
+  public boolean canCache(int cacheSize) {
+    return cacheEnable && (maxCacheSize > currentCacheSize.get() + cacheSize);
+  }
+
+  public void removeCache(String key) {
+    ByteBuf cache = caches.remove(key);

Review Comment:
   It seems we need to release the cache here to avoid a Netty memory leak.



##########
common/src/main/java/org/apache/celeborn/common/util/MemCacheManager.java:
##########
@@ -0,0 +1,70 @@
+package org.apache.celeborn.common.util;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import io.netty.buffer.ByteBuf;
+
+import org.apache.celeborn.common.CelebornConf;
+
+public class MemCacheManager {
+  private static MemCacheManager memCacheManager;
+  private ConcurrentMap<String, ByteBuf> caches = new ConcurrentHashMap<>();
+  private CelebornConf conf;
+  private long maxCacheSize;
+  private boolean cacheEnable;
+  private AtomicLong currentCacheSize = new AtomicLong(0);
+
+  public MemCacheManager(CelebornConf conf) {
+    this.conf = conf;
+    maxCacheSize = this.conf.fileCacheMaxSize();
+    cacheEnable = this.conf.fileCacheEnable();
+  }
+
+  public void putCache(String key, ByteBuf cache) {
+    int cacheSize = cache.readableBytes();
+    caches.put(key, cache);
+    currentCacheSize.getAndAdd(cacheSize);
+  }
+
+  public boolean canCache(int cacheSize) {
+    return cacheEnable && (maxCacheSize > currentCacheSize.get() + cacheSize);
+  }
+
+  public void removeCache(String key) {
+    ByteBuf cache = caches.remove(key);
+    currentCacheSize.getAndAdd(-1 * cache.readableBytes());
+  }
+
+  public boolean contains(String key) {
+    if (!cacheEnable) return false;
+    return caches.containsKey(key);
+  }
+
+  public ByteBuf getCache(String key) {
+    return caches.get(key);
+  }
+
+  public static synchronized MemCacheManager getMemCacheManager(CelebornConf 
conf) {
+    if (memCacheManager == null) {

Review Comment:
   ditto



##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -2057,6 +2060,22 @@ object CelebornConf extends Logging {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefaultString("2g")
 
+  val FILE_CACHE_MAX_SIZE: ConfigEntry[Long] =
+    buildConf("celeborn.shuffle.cache.max")
+      .categories("worker")
+      .doc("Max size for shuffle file cache to memory")
+      .version("0.4.0")

Review Comment:
   Should be 0.3.2.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java:
##########
@@ -97,22 +99,20 @@ public void chunkBeingSent(long streamId) {
     StreamState streamState = streams.get(streamId);
     if (streamState != null) {
       streamState.chunksBeingTransferred++;
+      chunksBeingTransferredNum.incrementAndGet();

Review Comment:
   The current logic follows Spark's implementation. I'm not sure how much 
benefit this change will bring.



##########
common/src/main/java/org/apache/celeborn/common/util/MemCacheManager.java:
##########
@@ -0,0 +1,70 @@
+package org.apache.celeborn.common.util;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import io.netty.buffer.ByteBuf;
+
+import org.apache.celeborn.common.CelebornConf;
+
+public class MemCacheManager {
+  private static MemCacheManager memCacheManager;
+  private ConcurrentMap<String, ByteBuf> caches = new ConcurrentHashMap<>();
+  private CelebornConf conf;
+  private long maxCacheSize;
+  private boolean cacheEnable;
+  private AtomicLong currentCacheSize = new AtomicLong(0);
+
+  public MemCacheManager(CelebornConf conf) {
+    this.conf = conf;
+    maxCacheSize = this.conf.fileCacheMaxSize();
+    cacheEnable = this.conf.fileCacheEnable();
+  }
+
+  public void putCache(String key, ByteBuf cache) {
+    int cacheSize = cache.readableBytes();
+    caches.put(key, cache);
+    currentCacheSize.getAndAdd(cacheSize);
+  }
+
+  public boolean canCache(int cacheSize) {
+    return cacheEnable && (maxCacheSize > currentCacheSize.get() + cacheSize);
+  }
+
+  public void removeCache(String key) {
+    ByteBuf cache = caches.remove(key);
+    currentCacheSize.getAndAdd(-1 * cache.readableBytes());
+  }
+
+  public boolean contains(String key) {
+    if (!cacheEnable) return false;
+    return caches.containsKey(key);
+  }
+
+  public ByteBuf getCache(String key) {
+    return caches.get(key);
+  }
+
+  public static synchronized MemCacheManager getMemCacheManager(CelebornConf 
conf) {
+    if (memCacheManager == null) {
+      memCacheManager = new MemCacheManager(conf);
+    }
+    return memCacheManager;
+  }
+
+  public static synchronized MemCacheManager getMemCacheManager() {

Review Comment:
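   nit: I recommend writing it this way (note that for this double-checked locking to be safe, the `memCacheManager` field must be declared `volatile`):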
   ```
     public static MemCacheManager getMemCacheManager(CelebornConf conf) {
       if (memCacheManager == null) {
         synchronized (MemCacheManager.class) {
           if (memCacheManager == null) {
             memCacheManager = new MemCacheManager(conf);
           }
         }
       }
       return memCacheManager;
     }
   ```



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java:
##########
@@ -154,19 +155,27 @@ public void decrementPendingWrites() {
 
   @GuardedBy("flushLock")
   protected void flush(boolean finalFlush) throws IOException {
+    MemCacheManager cacheManager = MemCacheManager.getMemCacheManager();

Review Comment:
   We can make `cacheManager` a member of `FileWriter` and initialize it in the 
`FileWriter` constructor, where `CelebornConf` is available.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java:
##########
@@ -154,19 +155,27 @@ public void decrementPendingWrites() {
 
   @GuardedBy("flushLock")
   protected void flush(boolean finalFlush) throws IOException {
+    MemCacheManager cacheManager = MemCacheManager.getMemCacheManager();
     // flushBuffer == null here means writer already closed
     if (flushBuffer != null) {
       int numBytes = flushBuffer.readableBytes();
       if (numBytes != 0) {
         notifier.checkException();
-        notifier.numPendingFlushes.incrementAndGet();
-        FlushTask task = null;
-        if (channel != null) {
-          task = new LocalFlushTask(flushBuffer, channel, notifier);
-        } else if (fileInfo.isHdfs()) {
-          task = new HdfsFlushTask(flushBuffer, fileInfo.getHdfsPath(), 
notifier);
+        if (finalFlush && fileInfo.getFileLength() == 0 && 
cacheManager.canCache(numBytes)) {

Review Comment:
   Multiple threads can call `flush` simultaneously, so the 
`cacheManager.canCache(numBytes)` check and the subsequent cache insertion need to be synchronized. 
Maybe we can add a method `cacheManager#tryCache` that returns true if the buffer is cached successfully.



##########
common/src/main/java/org/apache/celeborn/common/meta/FileManagedBuffers.java:
##########
@@ -56,6 +66,26 @@ public ManagedBuffer chunk(int chunkIndex, int offset, int 
len) {
     final long chunkLength = offsets[chunkIndex + 1] - chunkOffset;
     assert offset < chunkLength;
     long length = Math.min(chunkLength - offset, len);
+    MemCacheManager memCacheManager = MemCacheManager.getMemCacheManager();
+    chunkTotal++;
+    if (memCacheManager.contains(file.getAbsolutePath())) {
+      String filePath = file.getAbsolutePath();
+      ByteBuf fileBuffer = memCacheManager.getCache(filePath);
+      ByteBuf buffer = fileBuffer.slice((int) (chunkOffset + offset), (int) 
length);

Review Comment:
   It seems we need to call `buffer.retain` here to support retried reads. cc 
@RexXiong @FMX 



##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -2057,6 +2060,22 @@ object CelebornConf extends Logging {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefaultString("2g")
 
+  val FILE_CACHE_MAX_SIZE: ConfigEntry[Long] =
+    buildConf("celeborn.shuffle.cache.max")
+      .categories("worker")
+      .doc("Max size for shuffle file cache to memory")
+      .version("0.4.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("10g")

Review Comment:
   10g is too big; in some users' environments the total direct memory is below 
10g. Maybe 512m is better.



##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -2057,6 +2060,22 @@ object CelebornConf extends Logging {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefaultString("2g")
 
+  val FILE_CACHE_MAX_SIZE: ConfigEntry[Long] =
+    buildConf("celeborn.shuffle.cache.max")
+      .categories("worker")
+      .doc("Max size for shuffle file cache to memory")
+      .version("0.4.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("10g")
+
+  val FILE_CACHE_ENABLE: ConfigEntry[Boolean] =
+    buildConf("celeborn.shuffle.cache.enable")
+      .categories("worker")
+      .doc("memory cache enable")
+      .version("0.4.0")
+      .booleanConf
+      .createWithDefault(true)

Review Comment:
   Better to default to false at first.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to