This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new 4f90b26e5 [CELEBORN-778] Rename MemoryManagerStat to ServingState
4f90b26e5 is described below

commit 4f90b26e52baf795a13918f51d4fbb4f1260700f
Author: Cheng Pan <[email protected]>
AuthorDate: Mon Jul 10 11:17:52 2023 +0800

    [CELEBORN-778] Rename MemoryManagerStat to ServingState
    
    ### What changes were proposed in this pull request?
    
    Rename
    
    ```
    enum MemoryManagerStat { resumeAll, pausePushDataAndReplicate, pausePushDataAndResumeReplicate }
    ```
    
    to
    
    ```
    enum ServingState { NONE_PAUSED, PUSH_AND_REPLICATE_PAUSED, PUSH_PAUSED }
    ```
    
    ### Why are the changes needed?
    
    `MemoryManagerStat` describes the state of the worker's serving functionalities (whether push and/or replicate are paused), and it's awkward to represent a state using verbs.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Passed the GitHub Actions (GA) CI checks.
    
    Closes #1691 from pan3793/CELEBORN-778.
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
    (cherry picked from commit 494c116818aaeb41b39ceb31d081c411809c6c0a)
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../deploy/worker/memory/ChannelsLimiter.java      |  12 +--
 .../deploy/worker/memory/MemoryManager.java        | 119 +++++++++------------
 .../service/deploy/worker/storage/FileWriter.java  |  22 ++--
 .../service/deploy/worker/storage/Flusher.scala    |   9 +-
 .../deploy/worker/storage/StorageManager.scala     |   2 +-
 5 files changed, 68 insertions(+), 96 deletions(-)

diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ChannelsLimiter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ChannelsLimiter.java
index a5e048a68..050b28166 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ChannelsLimiter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ChannelsLimiter.java
@@ -40,8 +40,8 @@ public class ChannelsLimiter extends ChannelDuplexHandler
   private final Set<Channel> channels = ConcurrentHashMap.newKeySet();
   private final String moduleName;
   private final AtomicBoolean isPaused = new AtomicBoolean(false);
-  private AtomicInteger needTrimChannels = new AtomicInteger(0);
-  private long waitTrimInterval;
+  private final AtomicInteger needTrimChannels = new AtomicInteger(0);
+  private final long waitTrimInterval;
 
   public ChannelsLimiter(String moduleName, CelebornConf conf) {
     this.moduleName = moduleName;
@@ -128,7 +128,7 @@ public class ChannelsLimiter extends ChannelDuplexHandler
   @Override
   public void onPause(String moduleName) {
     if (this.moduleName.equals(moduleName)) {
-      logger.info(this.moduleName + " channels pause read.");
+      logger.info("{} channels pause read.", this.moduleName);
       pauseAllChannels();
     }
   }
@@ -136,11 +136,11 @@ public class ChannelsLimiter extends ChannelDuplexHandler
   @Override
   public void onResume(String moduleName) {
     if (moduleName.equalsIgnoreCase("all")) {
-      logger.info(this.moduleName + " channels resume read.");
+      logger.info("{} channels resume read.", this.moduleName);
       resumeAllChannels();
     }
     if (this.moduleName.equals(moduleName)) {
-      logger.info(this.moduleName + " channels resume read.");
+      logger.info("{} channels resume read.", this.moduleName);
       resumeAllChannels();
     }
   }
@@ -150,5 +150,5 @@ public class ChannelsLimiter extends ChannelDuplexHandler
     trimCache();
   }
 
-  class TrimCache {}
+  static class TrimCache {}
 }
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
index 2e0d914bf..c0524fa3a 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
@@ -17,8 +17,6 @@
 
 package org.apache.celeborn.service.deploy.worker.memory;
 
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
@@ -32,8 +30,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import io.netty.buffer.ByteBuf;
 import io.netty.util.internal.PlatformDependent;
-import org.apache.commons.lang3.JavaVersion;
-import org.apache.commons.lang3.SystemUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,6 +37,7 @@ import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.protocol.TransportModuleConstants;
 import org.apache.celeborn.common.util.ThreadUtils;
 import org.apache.celeborn.common.util.Utils;
+import org.apache.celeborn.reflect.DynMethods;
 import org.apache.celeborn.service.deploy.worker.storage.CreditStreamManager;
 
 public class MemoryManager {
@@ -62,13 +59,13 @@ public class MemoryManager {
   private final ExecutorService actionService =
       ThreadUtils.newDaemonSingleThreadExecutor("memory-manager-actor");
 
-  private AtomicBoolean trimInProcess = new AtomicBoolean(false);
+  private final AtomicBoolean trimInProcess = new AtomicBoolean(false);
 
   private final AtomicLong sortMemoryCounter = new AtomicLong(0);
   private final AtomicLong diskBufferCounter = new AtomicLong(0);
   private final LongAdder pausePushDataCounter = new LongAdder();
   private final LongAdder pausePushDataAndReplicateCounter = new LongAdder();
-  private MemoryManagerStat memoryManagerStat = MemoryManagerStat.resumeAll;
+  private ServingState servingState = ServingState.NONE_PAUSED;
   private boolean underPressure;
 
   // For credit stream
@@ -80,11 +77,9 @@ public class MemoryManager {
   private long lastNotifiedTarget = 0;
   private final ScheduledExecutorService readBufferTargetUpdateService =
       ThreadUtils.newDaemonSingleThreadScheduledExecutor(
-          "memory-mananger-readBufferTarget-updater");
+          "memory-manager-read-buffer-target-updater");
   private CreditStreamManager creditStreamManager = null;
 
-  // For memory shuffle storage
-  private final AtomicLong memoryShuffleStorageCounter = new AtomicLong(0);
   private long memoryShuffleStorageThreshold = 0;
 
   public static MemoryManager initialize(CelebornConf conf) {
@@ -117,28 +112,13 @@ public class MemoryManager {
     long readBufferTargetUpdateInterval = 
conf.readBufferTargetUpdateInterval();
     long readBufferTargetNotifyThreshold = 
conf.readBufferTargetNotifyThreshold();
 
-    String[] provider;
-    if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_10)) {
-      provider = new String[] {"jdk.internal.misc.VM", "maxDirectMemory"};
-    } else {
-      provider = new String[] {"sun.misc.VM", "maxDirectMemory"};
-    }
+    maxDirectorMemory =
+        DynMethods.builder("maxDirectMemory")
+            .impl("jdk.internal.misc.VM") // for Java 10 and above
+            .impl("sun.misc.VM") // for Java 9 and previous
+            .buildStatic()
+            .<Long>invoke();
 
-    Method maxMemMethod;
-    String clazz = provider[0];
-    String method = provider[1];
-    try {
-      Class<?> vmClass = Class.forName(clazz);
-      maxMemMethod = vmClass.getDeclaredMethod(method);
-
-      maxDirectorMemory = (long) maxMemMethod.invoke(null);
-    } catch (ClassNotFoundException
-        | NoSuchMethodException
-        | IllegalAccessException
-        | InvocationTargetException ignored) {
-      System.out.println("exception " + ignored);
-      // Ignore Exception
-    }
     Preconditions.checkArgument(maxDirectorMemory > 0);
     Preconditions.checkArgument(pauseReplicateRatio > pausePushDataRatio);
     Preconditions.checkArgument(pausePushDataRatio > resumeRatio);
@@ -155,14 +135,13 @@ public class MemoryManager {
     checkService.scheduleWithFixedDelay(
         () -> {
           try {
-            MemoryManagerStat lastAction = memoryManagerStat;
-            memoryManagerStat = currentMemoryAction();
-            if (lastAction != memoryManagerStat) {
-              logger.info(
-                  "Memory manager actions transformed {} -> {}", lastAction, 
memoryManagerStat);
-              if (memoryManagerStat == 
MemoryManagerStat.pausePushDataAndResumeReplicate) {
+            ServingState lastState = servingState;
+            servingState = currentServingState();
+            if (lastState != servingState) {
+              logger.info("Serving state transformed from {} to {}", 
lastState, servingState);
+              if (servingState == ServingState.PUSH_PAUSED) {
                 pausePushDataCounter.increment();
-                logger.info("Trigger pausePushDataAndResumeReplicate action");
+                logger.info("Trigger action: PAUSE PUSH, RESUME REPLICATE");
                 memoryPressureListeners.forEach(
                     memoryPressureListener ->
                         
memoryPressureListener.onPause(TransportModuleConstants.PUSH_MODULE));
@@ -170,9 +149,9 @@ public class MemoryManager {
                     memoryPressureListener ->
                         
memoryPressureListener.onResume(TransportModuleConstants.REPLICATE_MODULE));
                 trimAllListeners();
-              } else if (memoryManagerStat == 
MemoryManagerStat.pausePushDataAndReplicate) {
+              } else if (servingState == 
ServingState.PUSH_AND_REPLICATE_PAUSED) {
                 pausePushDataAndReplicateCounter.increment();
-                logger.info("Trigger pausePushDataAndReplicate action");
+                logger.info("Trigger action: PAUSE PUSH and REPLICATE");
                 memoryPressureListeners.forEach(
                     memoryPressureListener ->
                         
memoryPressureListener.onPause(TransportModuleConstants.PUSH_MODULE));
@@ -181,13 +160,13 @@ public class MemoryManager {
                         
memoryPressureListener.onPause(TransportModuleConstants.REPLICATE_MODULE));
                 trimAllListeners();
               } else {
-                logger.info("Trigger resume action");
+                logger.info("Trigger action: RESUME PUSH and REPLICATE");
                 memoryPressureListeners.forEach(
                     memoryPressureListener -> 
memoryPressureListener.onResume("all"));
               }
             } else {
-              if (memoryManagerStat != MemoryManagerStat.resumeAll) {
-                logger.debug("Trigger trim action");
+              if (servingState != ServingState.NONE_PAUSED) {
+                logger.debug("Trigger action: TRIM");
                 trimAllListeners();
               }
             }
@@ -252,9 +231,10 @@ public class MemoryManager {
 
     logger.info(
         "Memory tracker initialized with: "
-            + "max direct memory: {}, pause pushdata memory: {}, "
-            + "pause replication memory: {}, resume memory: {},"
-            + "read buffer memory limit: {} target :{} , memory shuffle 
storage limit : {}",
+            + "max direct memory: {}, pause push memory: {}, "
+            + "pause replication memory: {}, resume memory: {}, "
+            + "read buffer memory limit: {} target: {}, "
+            + "memory shuffle storage limit: {}",
         Utils.bytesToString(maxDirectorMemory),
         Utils.bytesToString(pausePushDataThreshold),
         Utils.bytesToString(pauseReplicateThreshold),
@@ -264,30 +244,29 @@ public class MemoryManager {
         Utils.bytesToString(memoryShuffleStorageThreshold));
   }
 
-  public MemoryManagerStat currentMemoryAction() {
+  public ServingState currentServingState() {
     long memoryUsage = getMemoryUsage();
     boolean pausePushData = memoryUsage > pausePushDataThreshold;
-    boolean pauseReplication = memoryUsage > pauseReplicateThreshold;
-    if (pausePushData) {
+    boolean pauseReplicate = memoryUsage > pauseReplicateThreshold;
+    boolean resume = memoryUsage < resumeThreshold;
+    if (pausePushData || pauseReplicate) {
       underPressure = true;
-      if (pauseReplication) {
-        return MemoryManagerStat.pausePushDataAndReplicate;
-      } else {
-        return MemoryManagerStat.pausePushDataAndResumeReplicate;
-      }
-    } else {
-      boolean resume = memoryUsage < resumeThreshold;
-      if (resume) {
-        underPressure = false;
-        return MemoryManagerStat.resumeAll;
-      } else {
-        if (underPressure) {
-          return MemoryManagerStat.pausePushDataAndResumeReplicate;
-        } else {
-          return MemoryManagerStat.resumeAll;
-        }
-      }
+    } else if (resume) {
+      underPressure = false;
+    }
+    if (pausePushData && pauseReplicate) {
+      return ServingState.PUSH_AND_REPLICATE_PAUSED;
+    }
+    if (pausePushData) {
+      return ServingState.PUSH_PAUSED;
+    }
+    if (resume) {
+      return ServingState.NONE_PAUSED;
+    }
+    if (underPressure) {
+      return ServingState.PUSH_PAUSED;
     }
+    return ServingState.NONE_PAUSED;
   }
 
   public void trimAllListeners() {
@@ -307,7 +286,7 @@ public class MemoryManager {
   }
 
   public boolean sortMemoryReady() {
-    return (currentMemoryAction().equals(MemoryManagerStat.resumeAll))
+    return currentServingState() == ServingState.NONE_PAUSED
         && sortMemoryCounter.get() < maxSortMemory;
   }
 
@@ -426,9 +405,9 @@ public class MemoryManager {
     void onChange(long newMemoryTarget);
   }
 
-  enum MemoryManagerStat {
-    resumeAll,
-    pausePushDataAndReplicate,
-    pausePushDataAndResumeReplicate
+  enum ServingState {
+    NONE_PAUSED,
+    PUSH_AND_REPLICATE_PAUSED,
+    PUSH_PAUSED
   }
 }
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
index 7e831fb33..c43e3c551 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
@@ -66,7 +66,7 @@ public abstract class FileWriter implements DeviceObserver {
   @GuardedBy("flushLock")
   private CompositeByteBuf flushBuffer;
 
-  private Object flushLock = new Object();
+  private final Object flushLock = new Object();
   private final long writerCloseTimeoutMs;
 
   protected final long flusherBufferSize;
@@ -170,11 +170,7 @@ public abstract class FileWriter implements DeviceObserver 
{
     }
   }
 
-  /**
-   * assume data size is less than chunk capacity
-   *
-   * @param data
-   */
+  /** assume data size is less than chunk capacity */
   public void write(ByteBuf data) throws IOException {
     if (closed) {
       String msg = "FileWriter has already closed!, fileName " + 
fileInfo.getFilePath();
@@ -244,14 +240,14 @@ public abstract class FileWriter implements 
DeviceObserver {
   public abstract long close() throws IOException;
 
   @FunctionalInterface
-  public interface RunnableWithException<R extends IOException> {
-    void run() throws R;
+  public interface RunnableWithIOException {
+    void run() throws IOException;
   }
 
   protected synchronized long close(
-      RunnableWithException tryClose,
-      RunnableWithException streamClose,
-      RunnableWithException finalClose)
+      RunnableWithIOException tryClose,
+      RunnableWithIOException streamClose,
+      RunnableWithIOException finalClose)
       throws IOException {
     if (closed) {
       String msg = "FileWriter has already closed! fileName " + 
fileInfo.getFilePath();
@@ -279,14 +275,14 @@ public abstract class FileWriter implements 
DeviceObserver {
           streamClose.run();
         }
       } catch (IOException e) {
-        logger.warn("close file writer" + this + "failed", e);
+        logger.warn("close file writer {} failed", this, e);
       }
 
       finalClose.run();
 
       // unregister from DeviceMonitor
       if (!fileInfo.isHdfs()) {
-        logger.debug("file info {} register from device monitor");
+        logger.debug("file info {} register from device monitor", fileInfo);
         deviceMonitor.unregisterFileWriter(this);
       }
     }
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
index e364869a3..e02dc25b4 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
@@ -39,7 +39,7 @@ abstract private[worker] class Flusher(
     val workerSource: AbstractSource,
     val threadCount: Int,
     flushTimeMetric: TimeWindow) extends Logging {
-  protected lazy val flusherId = System.identityHashCode(this)
+  protected lazy val flusherId: Int = System.identityHashCode(this)
   protected val workingQueues = new 
Array[LinkedBlockingQueue[FlushTask]](threadCount)
   protected val bufferQueue = new LinkedBlockingQueue[CompositeByteBuf]()
   protected val workers = new Array[Thread](threadCount)
@@ -47,7 +47,6 @@ abstract private[worker] class Flusher(
 
   val lastBeginFlushTime: AtomicLongArray = new AtomicLongArray(threadCount)
   val stopFlag = new AtomicBoolean(false)
-  val rand = new Random()
 
   init()
 
@@ -61,7 +60,7 @@ abstract private[worker] class Flusher(
         override def run(): Unit = {
           while (!stopFlag.get()) {
             val task = workingQueues(index).take()
-            val key = s"Flusher-$this-${rand.nextInt()}"
+            val key = s"Flusher-$this-${Random.nextInt()}"
             workerSource.sample(WorkerSource.FLUSH_DATA_TIME, key) {
               if (!task.notifier.hasException) {
                 try {
@@ -178,9 +177,7 @@ private[worker] class LocalFlusher(
     obj.asInstanceOf[LocalFlusher].mountPoint.equals(mountPoint)
   }
 
-  override def toString(): String = {
-    s"LocalFlusher@$flusherId-$mountPoint"
-  }
+  override def toString: String = s"LocalFlusher@$flusherId-$mountPoint"
 }
 
 final private[worker] class HdfsFlusher(
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 97095ede1..641a7b739 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -188,7 +188,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
   if (!checkIfWorkingDirCleaned) {
     logWarning(
       "Worker still has residual files in the working directory before 
registering with Master, " +
-        "please refer to the configuration document to increase 
celeborn.worker.disk.checkFileClean.maxRetries or " +
+        "please refer to the configuration document to increase " +
         s"${CelebornConf.WORKER_CHECK_FILE_CLEAN_TIMEOUT.key}.")
   } else {
     logInfo("Successfully remove all files under working directory.")

Reply via email to