RexXiong commented on code in PR #3018:
URL: https://github.com/apache/celeborn/pull/3018#discussion_r1923213383
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java:
##########
@@ -315,69 +325,68 @@ public ServingState currentServingState() {
}
@VisibleForTesting
- protected void switchServingState() {
+ public void switchServingState() {
ServingState lastState = servingState;
servingState = currentServingState();
- if (lastState == servingState) {
- if (servingState != ServingState.NONE_PAUSED) {
+ logger.info("Serving state transformed from {} to {}", lastState,
servingState);
+ switch (servingState) {
+ case PUSH_PAUSED:
+ if (canResumeByPinnedMemory()) {
+ resumeByPinnedMemory(servingState);
+ } else {
+ pausePushDataCounter.increment();
+ if (lastState == ServingState.PUSH_AND_REPLICATE_PAUSED) {
+ logger.info("Trigger action: RESUME REPLICATE");
+ resumeReplicate();
+ } else {
+ logger.info("Trigger action: PAUSE PUSH");
+ pausePushDataStartTime = System.currentTimeMillis();
+ memoryPressureListeners.forEach(
+ memoryPressureListener ->
+
memoryPressureListener.onPause(TransportModuleConstants.PUSH_MODULE));
+ }
+ }
logger.debug("Trigger action: TRIM");
- trimCounter += 1;
- // force to append pause spent time even we are in pause state
+ trimAllListeners();
if (trimCounter >= forceAppendPauseSpentTimeThreshold) {
Review Comment:
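The `trimCounter += 1` increment was lost. This path no longer increments `trimCounter`, so the `trimCounter >= forceAppendPauseSpentTimeThreshold` check below may never fire, and the pause spent time is no longer force-appended.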
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java:
##########
@@ -315,69 +325,68 @@ public ServingState currentServingState() {
}
@VisibleForTesting
- protected void switchServingState() {
+ public void switchServingState() {
ServingState lastState = servingState;
servingState = currentServingState();
- if (lastState == servingState) {
- if (servingState != ServingState.NONE_PAUSED) {
+ logger.info("Serving state transformed from {} to {}", lastState,
servingState);
+ switch (servingState) {
+ case PUSH_PAUSED:
+ if (canResumeByPinnedMemory()) {
+ resumeByPinnedMemory(servingState);
+ } else {
+ pausePushDataCounter.increment();
+ if (lastState == ServingState.PUSH_AND_REPLICATE_PAUSED) {
+ logger.info("Trigger action: RESUME REPLICATE");
+ resumeReplicate();
+ } else {
+ logger.info("Trigger action: PAUSE PUSH");
+ pausePushDataStartTime = System.currentTimeMillis();
+ memoryPressureListeners.forEach(
+ memoryPressureListener ->
+
memoryPressureListener.onPause(TransportModuleConstants.PUSH_MODULE));
+ }
+ }
logger.debug("Trigger action: TRIM");
- trimCounter += 1;
- // force to append pause spent time even we are in pause state
+ trimAllListeners();
if (trimCounter >= forceAppendPauseSpentTimeThreshold) {
logger.debug(
"Trigger action: TRIM for {} times, force to append pause spent
time.", trimCounter);
appendPauseSpentTime(servingState);
}
- trimAllListeners();
- }
- return;
- }
- logger.info("Serving state transformed from {} to {}", lastState,
servingState);
- switch (servingState) {
- case PUSH_PAUSED:
- pausePushDataCounter.increment();
- if (lastState == ServingState.PUSH_AND_REPLICATE_PAUSED) {
- logger.info("Trigger action: RESUME REPLICATE");
- memoryPressureListeners.forEach(
- memoryPressureListener ->
-
memoryPressureListener.onResume(TransportModuleConstants.REPLICATE_MODULE));
- } else if (lastState == ServingState.NONE_PAUSED) {
- logger.info("Trigger action: PAUSE PUSH");
- pausePushDataStartTime = System.currentTimeMillis();
- memoryPressureListeners.forEach(
- memoryPressureListener ->
-
memoryPressureListener.onPause(TransportModuleConstants.PUSH_MODULE));
- }
- trimAllListeners();
break;
case PUSH_AND_REPLICATE_PAUSED:
- pausePushDataAndReplicateCounter.increment();
- if (lastState == ServingState.NONE_PAUSED) {
+ if (canResumeByPinnedMemory()) {
+ resumeByPinnedMemory(servingState);
+ } else {
+ pausePushDataAndReplicateCounter.increment();
logger.info("Trigger action: PAUSE PUSH");
pausePushDataAndReplicateStartTime = System.currentTimeMillis();
memoryPressureListeners.forEach(
memoryPressureListener ->
memoryPressureListener.onPause(TransportModuleConstants.PUSH_MODULE));
+ logger.info("Trigger action: PAUSE REPLICATE");
+ memoryPressureListeners.forEach(
+ memoryPressureListener ->
+
memoryPressureListener.onPause(TransportModuleConstants.REPLICATE_MODULE));
}
- logger.info("Trigger action: PAUSE REPLICATE");
- memoryPressureListeners.forEach(
- memoryPressureListener ->
-
memoryPressureListener.onPause(TransportModuleConstants.REPLICATE_MODULE));
+ logger.debug("Trigger action: TRIM");
trimAllListeners();
Review Comment:
Ditto: the `trimCounter += 1` increment is also missing in this trim path.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]