This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 494c11681 [CELEBORN-778] Rename MemoryManagerStat to ServingState
494c11681 is described below
commit 494c116818aaeb41b39ceb31d081c411809c6c0a
Author: Cheng Pan <[email protected]>
AuthorDate: Mon Jul 10 11:17:52 2023 +0800
[CELEBORN-778] Rename MemoryManagerStat to ServingState
### What changes were proposed in this pull request?
Rename
```
enum MemoryManagerStat { resumeAll, pausePushDataAndReplicate,
pausePushDataAndResumeReplicate }
```
to
```
enum ServingState { NONE_PAUSED, PUSH_AND_REPLICATE_PAUSED, PUSH_PAUSED }
```
### Why are the changes needed?
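`MemoryManagerStat` represents the worker's serving state (which serving
functionalities are currently paused), so it is confusing to name its values
with verbs such as `resumeAll`; state values should read as states.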
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Passed GA (GitHub Actions) CI.
Closes #1691 from pan3793/CELEBORN-778.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../deploy/worker/memory/ChannelsLimiter.java | 12 +--
.../deploy/worker/memory/MemoryManager.java | 119 +++++++++------------
.../service/deploy/worker/storage/FileWriter.java | 22 ++--
.../service/deploy/worker/storage/Flusher.scala | 9 +-
.../deploy/worker/storage/StorageManager.scala | 2 +-
5 files changed, 68 insertions(+), 96 deletions(-)
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ChannelsLimiter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ChannelsLimiter.java
index a5e048a68..050b28166 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ChannelsLimiter.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ChannelsLimiter.java
@@ -40,8 +40,8 @@ public class ChannelsLimiter extends ChannelDuplexHandler
private final Set<Channel> channels = ConcurrentHashMap.newKeySet();
private final String moduleName;
private final AtomicBoolean isPaused = new AtomicBoolean(false);
- private AtomicInteger needTrimChannels = new AtomicInteger(0);
- private long waitTrimInterval;
+ private final AtomicInteger needTrimChannels = new AtomicInteger(0);
+ private final long waitTrimInterval;
public ChannelsLimiter(String moduleName, CelebornConf conf) {
this.moduleName = moduleName;
@@ -128,7 +128,7 @@ public class ChannelsLimiter extends ChannelDuplexHandler
@Override
public void onPause(String moduleName) {
if (this.moduleName.equals(moduleName)) {
- logger.info(this.moduleName + " channels pause read.");
+ logger.info("{} channels pause read.", this.moduleName);
pauseAllChannels();
}
}
@@ -136,11 +136,11 @@ public class ChannelsLimiter extends ChannelDuplexHandler
@Override
public void onResume(String moduleName) {
if (moduleName.equalsIgnoreCase("all")) {
- logger.info(this.moduleName + " channels resume read.");
+ logger.info("{} channels resume read.", this.moduleName);
resumeAllChannels();
}
if (this.moduleName.equals(moduleName)) {
- logger.info(this.moduleName + " channels resume read.");
+ logger.info("{} channels resume read.", this.moduleName);
resumeAllChannels();
}
}
@@ -150,5 +150,5 @@ public class ChannelsLimiter extends ChannelDuplexHandler
trimCache();
}
- class TrimCache {}
+ static class TrimCache {}
}
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
index 2e0d914bf..c0524fa3a 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
@@ -17,8 +17,6 @@
package org.apache.celeborn.service.deploy.worker.memory;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -32,8 +30,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.util.internal.PlatformDependent;
-import org.apache.commons.lang3.JavaVersion;
-import org.apache.commons.lang3.SystemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,6 +37,7 @@ import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.protocol.TransportModuleConstants;
import org.apache.celeborn.common.util.ThreadUtils;
import org.apache.celeborn.common.util.Utils;
+import org.apache.celeborn.reflect.DynMethods;
import org.apache.celeborn.service.deploy.worker.storage.CreditStreamManager;
public class MemoryManager {
@@ -62,13 +59,13 @@ public class MemoryManager {
private final ExecutorService actionService =
ThreadUtils.newDaemonSingleThreadExecutor("memory-manager-actor");
- private AtomicBoolean trimInProcess = new AtomicBoolean(false);
+ private final AtomicBoolean trimInProcess = new AtomicBoolean(false);
private final AtomicLong sortMemoryCounter = new AtomicLong(0);
private final AtomicLong diskBufferCounter = new AtomicLong(0);
private final LongAdder pausePushDataCounter = new LongAdder();
private final LongAdder pausePushDataAndReplicateCounter = new LongAdder();
- private MemoryManagerStat memoryManagerStat = MemoryManagerStat.resumeAll;
+ private ServingState servingState = ServingState.NONE_PAUSED;
private boolean underPressure;
// For credit stream
@@ -80,11 +77,9 @@ public class MemoryManager {
private long lastNotifiedTarget = 0;
private final ScheduledExecutorService readBufferTargetUpdateService =
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
- "memory-mananger-readBufferTarget-updater");
+ "memory-manager-read-buffer-target-updater");
private CreditStreamManager creditStreamManager = null;
- // For memory shuffle storage
- private final AtomicLong memoryShuffleStorageCounter = new AtomicLong(0);
private long memoryShuffleStorageThreshold = 0;
public static MemoryManager initialize(CelebornConf conf) {
@@ -117,28 +112,13 @@ public class MemoryManager {
long readBufferTargetUpdateInterval =
conf.readBufferTargetUpdateInterval();
long readBufferTargetNotifyThreshold =
conf.readBufferTargetNotifyThreshold();
- String[] provider;
- if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_10)) {
- provider = new String[] {"jdk.internal.misc.VM", "maxDirectMemory"};
- } else {
- provider = new String[] {"sun.misc.VM", "maxDirectMemory"};
- }
+ maxDirectorMemory =
+ DynMethods.builder("maxDirectMemory")
+ .impl("jdk.internal.misc.VM") // for Java 10 and above
+ .impl("sun.misc.VM") // for Java 9 and previous
+ .buildStatic()
+ .<Long>invoke();
- Method maxMemMethod;
- String clazz = provider[0];
- String method = provider[1];
- try {
- Class<?> vmClass = Class.forName(clazz);
- maxMemMethod = vmClass.getDeclaredMethod(method);
-
- maxDirectorMemory = (long) maxMemMethod.invoke(null);
- } catch (ClassNotFoundException
- | NoSuchMethodException
- | IllegalAccessException
- | InvocationTargetException ignored) {
- System.out.println("exception " + ignored);
- // Ignore Exception
- }
Preconditions.checkArgument(maxDirectorMemory > 0);
Preconditions.checkArgument(pauseReplicateRatio > pausePushDataRatio);
Preconditions.checkArgument(pausePushDataRatio > resumeRatio);
@@ -155,14 +135,13 @@ public class MemoryManager {
checkService.scheduleWithFixedDelay(
() -> {
try {
- MemoryManagerStat lastAction = memoryManagerStat;
- memoryManagerStat = currentMemoryAction();
- if (lastAction != memoryManagerStat) {
- logger.info(
- "Memory manager actions transformed {} -> {}", lastAction,
memoryManagerStat);
- if (memoryManagerStat ==
MemoryManagerStat.pausePushDataAndResumeReplicate) {
+ ServingState lastState = servingState;
+ servingState = currentServingState();
+ if (lastState != servingState) {
+ logger.info("Serving state transformed from {} to {}",
lastState, servingState);
+ if (servingState == ServingState.PUSH_PAUSED) {
pausePushDataCounter.increment();
- logger.info("Trigger pausePushDataAndResumeReplicate action");
+ logger.info("Trigger action: PAUSE PUSH, RESUME REPLICATE");
memoryPressureListeners.forEach(
memoryPressureListener ->
memoryPressureListener.onPause(TransportModuleConstants.PUSH_MODULE));
@@ -170,9 +149,9 @@ public class MemoryManager {
memoryPressureListener ->
memoryPressureListener.onResume(TransportModuleConstants.REPLICATE_MODULE));
trimAllListeners();
- } else if (memoryManagerStat ==
MemoryManagerStat.pausePushDataAndReplicate) {
+ } else if (servingState ==
ServingState.PUSH_AND_REPLICATE_PAUSED) {
pausePushDataAndReplicateCounter.increment();
- logger.info("Trigger pausePushDataAndReplicate action");
+ logger.info("Trigger action: PAUSE PUSH and REPLICATE");
memoryPressureListeners.forEach(
memoryPressureListener ->
memoryPressureListener.onPause(TransportModuleConstants.PUSH_MODULE));
@@ -181,13 +160,13 @@ public class MemoryManager {
memoryPressureListener.onPause(TransportModuleConstants.REPLICATE_MODULE));
trimAllListeners();
} else {
- logger.info("Trigger resume action");
+ logger.info("Trigger action: RESUME PUSH and REPLICATE");
memoryPressureListeners.forEach(
memoryPressureListener ->
memoryPressureListener.onResume("all"));
}
} else {
- if (memoryManagerStat != MemoryManagerStat.resumeAll) {
- logger.debug("Trigger trim action");
+ if (servingState != ServingState.NONE_PAUSED) {
+ logger.debug("Trigger action: TRIM");
trimAllListeners();
}
}
@@ -252,9 +231,10 @@ public class MemoryManager {
logger.info(
"Memory tracker initialized with: "
- + "max direct memory: {}, pause pushdata memory: {}, "
- + "pause replication memory: {}, resume memory: {},"
- + "read buffer memory limit: {} target :{} , memory shuffle
storage limit : {}",
+ + "max direct memory: {}, pause push memory: {}, "
+ + "pause replication memory: {}, resume memory: {}, "
+ + "read buffer memory limit: {} target: {}, "
+ + "memory shuffle storage limit: {}",
Utils.bytesToString(maxDirectorMemory),
Utils.bytesToString(pausePushDataThreshold),
Utils.bytesToString(pauseReplicateThreshold),
@@ -264,30 +244,29 @@ public class MemoryManager {
Utils.bytesToString(memoryShuffleStorageThreshold));
}
- public MemoryManagerStat currentMemoryAction() {
+ public ServingState currentServingState() {
long memoryUsage = getMemoryUsage();
boolean pausePushData = memoryUsage > pausePushDataThreshold;
- boolean pauseReplication = memoryUsage > pauseReplicateThreshold;
- if (pausePushData) {
+ boolean pauseReplicate = memoryUsage > pauseReplicateThreshold;
+ boolean resume = memoryUsage < resumeThreshold;
+ if (pausePushData || pauseReplicate) {
underPressure = true;
- if (pauseReplication) {
- return MemoryManagerStat.pausePushDataAndReplicate;
- } else {
- return MemoryManagerStat.pausePushDataAndResumeReplicate;
- }
- } else {
- boolean resume = memoryUsage < resumeThreshold;
- if (resume) {
- underPressure = false;
- return MemoryManagerStat.resumeAll;
- } else {
- if (underPressure) {
- return MemoryManagerStat.pausePushDataAndResumeReplicate;
- } else {
- return MemoryManagerStat.resumeAll;
- }
- }
+ } else if (resume) {
+ underPressure = false;
+ }
+ if (pausePushData && pauseReplicate) {
+ return ServingState.PUSH_AND_REPLICATE_PAUSED;
+ }
+ if (pausePushData) {
+ return ServingState.PUSH_PAUSED;
+ }
+ if (resume) {
+ return ServingState.NONE_PAUSED;
+ }
+ if (underPressure) {
+ return ServingState.PUSH_PAUSED;
}
+ return ServingState.NONE_PAUSED;
}
public void trimAllListeners() {
@@ -307,7 +286,7 @@ public class MemoryManager {
}
public boolean sortMemoryReady() {
- return (currentMemoryAction().equals(MemoryManagerStat.resumeAll))
+ return currentServingState() == ServingState.NONE_PAUSED
&& sortMemoryCounter.get() < maxSortMemory;
}
@@ -426,9 +405,9 @@ public class MemoryManager {
void onChange(long newMemoryTarget);
}
- enum MemoryManagerStat {
- resumeAll,
- pausePushDataAndReplicate,
- pausePushDataAndResumeReplicate
+ enum ServingState {
+ NONE_PAUSED,
+ PUSH_AND_REPLICATE_PAUSED,
+ PUSH_PAUSED
}
}
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
index 7e831fb33..c43e3c551 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
@@ -66,7 +66,7 @@ public abstract class FileWriter implements DeviceObserver {
@GuardedBy("flushLock")
private CompositeByteBuf flushBuffer;
- private Object flushLock = new Object();
+ private final Object flushLock = new Object();
private final long writerCloseTimeoutMs;
protected final long flusherBufferSize;
@@ -170,11 +170,7 @@ public abstract class FileWriter implements DeviceObserver {
}
}
- /**
- * assume data size is less than chunk capacity
- *
- * @param data
- */
+ /** assume data size is less than chunk capacity */
public void write(ByteBuf data) throws IOException {
if (closed) {
String msg = "FileWriter has already closed!, fileName " +
fileInfo.getFilePath();
@@ -244,14 +240,14 @@ public abstract class FileWriter implements DeviceObserver {
public abstract long close() throws IOException;
@FunctionalInterface
- public interface RunnableWithException<R extends IOException> {
- void run() throws R;
+ public interface RunnableWithIOException {
+ void run() throws IOException;
}
protected synchronized long close(
- RunnableWithException tryClose,
- RunnableWithException streamClose,
- RunnableWithException finalClose)
+ RunnableWithIOException tryClose,
+ RunnableWithIOException streamClose,
+ RunnableWithIOException finalClose)
throws IOException {
if (closed) {
String msg = "FileWriter has already closed! fileName " +
fileInfo.getFilePath();
@@ -279,14 +275,14 @@ public abstract class FileWriter implements DeviceObserver {
streamClose.run();
}
} catch (IOException e) {
- logger.warn("close file writer" + this + "failed", e);
+ logger.warn("close file writer {} failed", this, e);
}
finalClose.run();
// unregister from DeviceMonitor
if (!fileInfo.isHdfs()) {
- logger.debug("file info {} register from device monitor");
+ logger.debug("file info {} register from device monitor", fileInfo);
deviceMonitor.unregisterFileWriter(this);
}
}
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
index e364869a3..e02dc25b4 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
@@ -39,7 +39,7 @@ abstract private[worker] class Flusher(
val workerSource: AbstractSource,
val threadCount: Int,
flushTimeMetric: TimeWindow) extends Logging {
- protected lazy val flusherId = System.identityHashCode(this)
+ protected lazy val flusherId: Int = System.identityHashCode(this)
protected val workingQueues = new
Array[LinkedBlockingQueue[FlushTask]](threadCount)
protected val bufferQueue = new LinkedBlockingQueue[CompositeByteBuf]()
protected val workers = new Array[Thread](threadCount)
@@ -47,7 +47,6 @@ abstract private[worker] class Flusher(
val lastBeginFlushTime: AtomicLongArray = new AtomicLongArray(threadCount)
val stopFlag = new AtomicBoolean(false)
- val rand = new Random()
init()
@@ -61,7 +60,7 @@ abstract private[worker] class Flusher(
override def run(): Unit = {
while (!stopFlag.get()) {
val task = workingQueues(index).take()
- val key = s"Flusher-$this-${rand.nextInt()}"
+ val key = s"Flusher-$this-${Random.nextInt()}"
workerSource.sample(WorkerSource.FLUSH_DATA_TIME, key) {
if (!task.notifier.hasException) {
try {
@@ -178,9 +177,7 @@ private[worker] class LocalFlusher(
obj.asInstanceOf[LocalFlusher].mountPoint.equals(mountPoint)
}
- override def toString(): String = {
- s"LocalFlusher@$flusherId-$mountPoint"
- }
+ override def toString: String = s"LocalFlusher@$flusherId-$mountPoint"
}
final private[worker] class HdfsFlusher(
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 97095ede1..641a7b739 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -188,7 +188,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
if (!checkIfWorkingDirCleaned) {
logWarning(
"Worker still has residual files in the working directory before
registering with Master, " +
- "please refer to the configuration document to increase
celeborn.worker.disk.checkFileClean.maxRetries or " +
+ "please refer to the configuration document to increase " +
s"${CelebornConf.WORKER_CHECK_FILE_CLEAN_TIMEOUT.key}.")
} else {
logInfo("Successfully remove all files under working directory.")