This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 1d04a2328 [CELEBORN-920] Worker sends its load to Master through 
heartbeat
1d04a2328 is described below

commit 1d04a2328988b81276a8ec3e7a5988de488efba7
Author: Keyong Zhou <[email protected]>
AuthorDate: Sat Aug 26 13:58:37 2023 +0800

    [CELEBORN-920] Worker sends its load to Master through heartbeat
    
    ### What changes were proposed in this pull request?
    
     Adding a flag indicating high load in the worker's heartbeat allows the 
master to better schedule the workers
    
    ### Why are the changes needed?
    
    In our production environment, there is a node with abnormally high load, 
but the master is not aware of this situation. It assigned numerous jobs to 
this node, and as a result, the stability of these jobs has been affected.
    
    ### Does this PR introduce _any_ user-facing change?
    NO
    
    ### How was this patch tested?
    
    UT
    
    Closes #1840 from JQ-Cao/920.
    
    Lead-authored-by: Keyong Zhou <[email protected]>
    Co-authored-by: caojiaqing <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 common/src/main/proto/TransportMessages.proto      |  1 +
 .../org/apache/celeborn/common/CelebornConf.scala  | 11 ++++++++
 .../common/protocol/message/ControlMessages.scala  |  4 +++
 docs/configuration/worker.md                       |  1 +
 .../master/clustermeta/AbstractMetaManager.java    |  8 +++---
 .../master/clustermeta/IMetadataHandler.java       |  1 +
 .../clustermeta/SingleMasterMetaManager.java       |  4 ++-
 .../master/clustermeta/ha/HAMasterMetaManager.java |  2 ++
 .../deploy/master/clustermeta/ha/MetaHandler.java  |  4 ++-
 master/src/main/proto/Resource.proto               |  1 +
 .../celeborn/service/deploy/master/Master.scala    |  4 +++
 .../clustermeta/DefaultMetaSystemSuiteJ.java       | 30 +++++++++++++++++-----
 .../ha/RatisMasterStatusSystemSuiteJ.java          | 21 +++++++++++++++
 .../deploy/worker/memory/MemoryManager.java        |  1 -
 .../celeborn/service/deploy/worker/Worker.scala    | 15 ++++++++++-
 .../service/deploy/worker/WorkerSource.scala       |  4 +++
 16 files changed, 99 insertions(+), 13 deletions(-)

diff --git a/common/src/main/proto/TransportMessages.proto 
b/common/src/main/proto/TransportMessages.proto
index 87a51328a..74b043598 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -151,6 +151,7 @@ message PbHeartbeatFromWorker {
   string requestId = 8;
   map<string, PbResourceConsumption> userResourceConsumption = 9;
   map<string, int64> estimatedAppDiskUsage = 10;
+  bool highWorkload = 11;
 }
 
 message PbHeartbeatFromWorkerResponse {
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index b11d475cb..916d0806b 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -654,6 +654,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   def workerPushMaxComponents: Int = 
get(WORKER_PUSH_COMPOSITEBUFFER_MAXCOMPONENTS)
   def workerFetchHeartbeatEnabled: Boolean = 
get(WORKER_FETCH_HEARTBEAT_ENABLED)
   def workerPartitionSplitEnabled: Boolean = 
get(WORKER_PARTITION_SPLIT_ENABLED)
+  def workerActiveConnectionMax: Option[Long] = 
get(WORKER_ACTIVE_CONNECTION_MAX)
 
   // //////////////////////////////////////////////////////
   //                 Metrics System                      //
@@ -2693,6 +2694,16 @@ object CelebornConf extends Logging {
       .booleanConf
       .createWithDefault(false)
 
+  val WORKER_ACTIVE_CONNECTION_MAX: OptionalConfigEntry[Long] =
+    buildConf("celeborn.worker.activeConnection.max")
+      .categories("worker")
+      .doc("If the number of active connections on a worker exceeds this 
configuration value, " +
+        "the worker will be marked as high-load in the heartbeat report, " +
+        "and the master will not include that node in the response of 
RequestSlots.")
+      .version("0.3.1")
+      .longConf
+      .createOptional
+
   val APPLICATION_HEARTBEAT_INTERVAL: ConfigEntry[Long] =
     buildConf("celeborn.client.application.heartbeatInterval")
       .withAlternative("celeborn.application.heartbeatInterval")
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index 89ef41a93..18dc2675d 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -113,6 +113,7 @@ object ControlMessages extends Logging {
       userResourceConsumption: util.Map[UserIdentifier, ResourceConsumption],
       activeShuffleKeys: util.Set[String],
       estimatedAppDiskUsage: util.HashMap[String, java.lang.Long],
+      highWorkload: Boolean,
       override var requestId: String = ZERO_UUID) extends MasterRequestMessage
 
   case class HeartbeatFromWorkerResponse(
@@ -445,6 +446,7 @@ object ControlMessages extends Logging {
           userResourceConsumption,
           activeShuffleKeys,
           estimatedAppDiskUsage,
+          highWorkload,
           requestId) =>
       val pbDisks = disks.map(PbSerDeUtils.toPbDiskInfo).asJava
       val pbUserResourceConsumption =
@@ -459,6 +461,7 @@ object ControlMessages extends Logging {
         .setReplicatePort(replicatePort)
         .addAllActiveShuffleKeys(activeShuffleKeys)
         .putAllEstimatedAppDiskUsage(estimatedAppDiskUsage)
+        .setHighWorkload(highWorkload)
         .setRequestId(requestId)
         .build().toByteArray
       new TransportMessage(MessageType.HEARTBEAT_FROM_WORKER, payload)
@@ -821,6 +824,7 @@ object ControlMessages extends Logging {
           userResourceConsumption,
           activeShuffleKeys,
           estimatedAppDiskUsage,
+          pbHeartbeatFromWorker.getHighWorkload,
           pbHeartbeatFromWorker.getRequestId)
 
       case HEARTBEAT_FROM_WORKER_RESPONSE_VALUE =>
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 994838b3f..8b05d5ad7 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -24,6 +24,7 @@ license: |
 | celeborn.shuffle.chunk.size | 8m | Max chunk size of reducer's merged 
shuffle data. For example, if a reducer's shuffle data is 128M and the data 
will need 16 fetch chunk requests to fetch. | 0.2.0 | 
 | celeborn.storage.activeTypes | HDD,SSD | Enabled storage levels. Available 
options: HDD,SSD,HDFS.  | 0.3.0 | 
 | celeborn.storage.hdfs.dir | &lt;undefined&gt; | HDFS base directory for 
Celeborn to store shuffle data. | 0.2.0 | 
+| celeborn.worker.activeConnection.max | &lt;undefined&gt; | If the number of 
active connections on a worker exceeds this configuration value, the worker 
will be marked as high-load in the heartbeat report, and the master will not 
include that node in the response of RequestSlots. | 0.3.1 | 
 | celeborn.worker.bufferStream.threadsPerMountpoint | 8 | Threads count for 
read buffer per mount point. | 0.3.0 | 
 | celeborn.worker.closeIdleConnections | false | Whether worker will close 
idle connections. | 0.2.0 | 
 | celeborn.worker.commitFiles.threads | 32 | Thread number of worker to commit 
shuffle data files asynchronously. It's recommended to set at least `128` when 
`HDFS` is enabled in `celeborn.storage.activeTypes`. | 0.3.0 | 
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index 8df57405f..8d7b9376a 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -143,7 +143,8 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
       Map<String, DiskInfo> disks,
       Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
       Map<String, Long> estimatedAppDiskUsage,
-      long time) {
+      long time,
+      boolean highWorkload) {
     WorkerInfo worker =
         new WorkerInfo(
             host, rpcPort, pushPort, fetchPort, replicatePort, disks, 
userResourceConsumption);
@@ -161,10 +162,11 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
     }
     appDiskUsageMetric.update(estimatedAppDiskUsage);
     // If using HDFSONLY mode, workers with empty disks should not be put into 
excluded worker list.
-    if (!excludedWorkers.contains(worker) && (disks.isEmpty() && 
!conf.hasHDFSStorage())) {
+    if (!excludedWorkers.contains(worker)
+        && ((disks.isEmpty() && !conf.hasHDFSStorage()) || highWorkload)) {
       LOG.debug("Worker: {} num total slots is 0, add to excluded list", 
worker);
       excludedWorkers.add(worker);
-    } else if (availableSlots.get() > 0) {
+    } else if ((availableSlots.get() > 0 || conf.hasHDFSStorage()) && 
!highWorkload) {
       // only unblack if numSlots larger than 0
       excludedWorkers.remove(worker);
     }
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
index 6c4c65a73..a34cb445d 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
@@ -55,6 +55,7 @@ public interface IMetadataHandler {
       Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
       Map<String, Long> estimatedAppDiskUsage,
       long time,
+      boolean highWorkload,
       String requestId);
 
   void handleRegisterWorker(
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
index 3d12db8b4..15c0c6d6d 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
@@ -92,6 +92,7 @@ public class SingleMasterMetaManager extends 
AbstractMetaManager {
       Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
       Map<String, Long> estimatedAppDiskUsage,
       long time,
+      boolean highWorkload,
       String requestId) {
     updateWorkerHeartbeatMeta(
         host,
@@ -102,7 +103,8 @@ public class SingleMasterMetaManager extends 
AbstractMetaManager {
         disks,
         userResourceConsumption,
         estimatedAppDiskUsage,
-        time);
+        time,
+        highWorkload);
   }
 
   @Override
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
index f7a10013c..181c6e487 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
@@ -197,6 +197,7 @@ public class HAMasterMetaManager extends 
AbstractMetaManager {
       Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
       Map<String, Long> estimatedAppDiskUsage,
       long time,
+      boolean highWorkload,
       String requestId) {
     try {
       ratisServer.submitRequest(
@@ -215,6 +216,7 @@ public class HAMasterMetaManager extends 
AbstractMetaManager {
                           
MetaUtil.toPbUserResourceConsumption(userResourceConsumption))
                       .putAllEstimatedAppDiskUsage(estimatedAppDiskUsage)
                       .setTime(time)
+                      .setHighWorkload(highWorkload)
                       .build())
               .build());
     } catch (CelebornRuntimeException e) {
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
index d6ab8309c..27ba6d882 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
@@ -163,6 +163,7 @@ public class MetaHandler {
           estimatedAppDiskUsage.putAll(
               
request.getWorkerHeartbeatRequest().getEstimatedAppDiskUsageMap());
           replicatePort = 
request.getWorkerHeartbeatRequest().getReplicatePort();
+          boolean highWorkload = 
request.getWorkerHeartbeatRequest().getHighWorkload();
           LOG.debug(
               "Handle worker heartbeat for {} {} {} {} {} {} {}",
               host,
@@ -182,7 +183,8 @@ public class MetaHandler {
               diskInfos,
               userResourceConsumption,
               estimatedAppDiskUsage,
-              time);
+              time,
+              highWorkload);
           break;
 
         case RegisterWorker:
diff --git a/master/src/main/proto/Resource.proto 
b/master/src/main/proto/Resource.proto
index 8f6f62f25..a6fb5d173 100644
--- a/master/src/main/proto/Resource.proto
+++ b/master/src/main/proto/Resource.proto
@@ -121,6 +121,7 @@ message WorkerHeartbeatRequest {
   required int64 time = 7;
   map<string, ResourceConsumption> userResourceConsumption = 8;
   map<string, int64> estimatedAppDiskUsage = 9;
+  required bool highWorkload = 10;
 }
 
 message RegisterWorkerRequest {
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 466e088d1..c3dd27125 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -334,6 +334,7 @@ private[celeborn] class Master(
           userResourceConsumption,
           activeShuffleKey,
           estimatedAppDiskUsage,
+          highWorkload,
           requestId) =>
       logDebug(s"Received heartbeat from" +
         s" worker $host:$rpcPort:$pushPort:$fetchPort:$replicatePort with 
$disks.")
@@ -350,6 +351,7 @@ private[celeborn] class Master(
           userResourceConsumption,
           activeShuffleKey,
           estimatedAppDiskUsage,
+          highWorkload,
           requestId))
 
     case ReportWorkerUnavailable(failedWorkers: util.List[WorkerInfo], 
requestId: String) =>
@@ -432,6 +434,7 @@ private[celeborn] class Master(
       userResourceConsumption: util.Map[UserIdentifier, ResourceConsumption],
       activeShuffleKeys: util.Set[String],
       estimatedAppDiskUsage: util.HashMap[String, java.lang.Long],
+      highWorkload: Boolean,
       requestId: String): Unit = {
     val targetWorker = new WorkerInfo(host, rpcPort, pushPort, fetchPort, 
replicatePort)
     val registered = workersSnapShot.asScala.contains(targetWorker)
@@ -449,6 +452,7 @@ private[celeborn] class Master(
         userResourceConsumption,
         estimatedAppDiskUsage,
         System.currentTimeMillis(),
+        highWorkload,
         requestId)
     }
 
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
index aaae7861a..2962ebafd 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
@@ -506,6 +506,7 @@ public class DefaultMetaSystemSuiteJ {
         userResourceConsumption1,
         new HashMap<>(),
         1,
+        false,
         getNewReqeustId());
 
     Assert.assertEquals(statusSystem.excludedWorkers.size(), 1);
@@ -520,23 +521,40 @@ public class DefaultMetaSystemSuiteJ {
         userResourceConsumption2,
         new HashMap<>(),
         1,
+        false,
         getNewReqeustId());
 
     Assert.assertEquals(statusSystem.excludedWorkers.size(), 2);
 
     statusSystem.handleWorkerHeartbeat(
-        HOSTNAME1,
-        RPCPORT1,
-        PUSHPORT1,
-        FETCHPORT1,
+        HOSTNAME3,
+        RPCPORT3,
+        PUSHPORT3,
+        FETCHPORT3,
         REPLICATEPORT3,
-        disks1,
-        userResourceConsumption1,
+        disks3,
+        userResourceConsumption3,
         new HashMap<>(),
         1,
+        false,
         getNewReqeustId());
 
     Assert.assertEquals(statusSystem.excludedWorkers.size(), 2);
+
+    statusSystem.handleWorkerHeartbeat(
+        HOSTNAME3,
+        RPCPORT3,
+        PUSHPORT3,
+        FETCHPORT3,
+        REPLICATEPORT3,
+        disks3,
+        userResourceConsumption3,
+        new HashMap<>(),
+        1,
+        true,
+        getNewReqeustId());
+
+    Assert.assertEquals(statusSystem.excludedWorkers.size(), 3);
   }
 
   @Test
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index 1d0e2c17b..6ae55d162 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -735,6 +735,7 @@ public class RatisMasterStatusSystemSuiteJ {
         userResourceConsumption1,
         new HashMap<>(),
         1,
+        false,
         getNewReqeustId());
     Thread.sleep(3000L);
 
@@ -752,6 +753,7 @@ public class RatisMasterStatusSystemSuiteJ {
         userResourceConsumption2,
         new HashMap<>(),
         1,
+        false,
         getNewReqeustId());
     Thread.sleep(3000L);
 
@@ -770,6 +772,7 @@ public class RatisMasterStatusSystemSuiteJ {
         userResourceConsumption1,
         new HashMap<>(),
         1,
+        false,
         getNewReqeustId());
     Thread.sleep(3000L);
 
@@ -777,6 +780,24 @@ public class RatisMasterStatusSystemSuiteJ {
     Assert.assertEquals(1, STATUSSYSTEM1.excludedWorkers.size());
     Assert.assertEquals(1, STATUSSYSTEM2.excludedWorkers.size());
     Assert.assertEquals(1, STATUSSYSTEM3.excludedWorkers.size());
+
+    statusSystem.handleWorkerHeartbeat(
+        HOSTNAME1,
+        RPCPORT1,
+        PUSHPORT1,
+        FETCHPORT1,
+        REPLICATEPORT1,
+        disks1,
+        userResourceConsumption1,
+        new HashMap<>(),
+        1,
+        true,
+        getNewReqeustId());
+    Thread.sleep(3000L);
+    Assert.assertEquals(2, statusSystem.excludedWorkers.size());
+    Assert.assertEquals(2, STATUSSYSTEM1.excludedWorkers.size());
+    Assert.assertEquals(2, STATUSSYSTEM2.excludedWorkers.size());
+    Assert.assertEquals(2, STATUSSYSTEM3.excludedWorkers.size());
   }
 
   @Before
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
index 8ef2ee19d..cb248dd9b 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
@@ -429,7 +429,6 @@ public class MemoryManager {
     void onChange(long newMemoryTarget);
   }
 
-  @VisibleForTesting
   public enum ServingState {
     NONE_PAUSED,
     PUSH_AND_REPLICATE_PAUSED,
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index e5267a75f..c263f2222 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -46,8 +46,10 @@ import org.apache.celeborn.common.util.{CelebornExitKind, 
JavaUtils, ShutdownHoo
 // Can Remove this if celeborn don't support scala211 in future
 import org.apache.celeborn.common.util.FunctionConverter._
 import org.apache.celeborn.server.common.{HttpService, Service}
+import 
org.apache.celeborn.service.deploy.worker.WorkerSource.ACTIVE_CONNECTION_COUNT
 import 
org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController
 import org.apache.celeborn.service.deploy.worker.memory.{ChannelsLimiter, 
MemoryManager}
+import 
org.apache.celeborn.service.deploy.worker.memory.MemoryManager.ServingState
 import 
org.apache.celeborn.service.deploy.worker.storage.{PartitionFilesSorter, 
StorageManager}
 
 private[celeborn] class Worker(
@@ -296,6 +298,16 @@ private[celeborn] class Worker(
     memoryManager.getAllocatedReadBuffers
   }
 
+  private def highWorkload: Boolean = {
+    (memoryManager.currentServingState, conf.workerActiveConnectionMax) match {
+      case (ServingState.PUSH_AND_REPLICATE_PAUSED, _) => true
+      case (ServingState.PUSH_PAUSED, _) => true
+      case (_, Some(activeConnectionMax)) =>
+        workerSource.getCounterCount(ACTIVE_CONNECTION_COUNT) >= 
activeConnectionMax
+      case _ => false
+    }
+  }
+
   private def heartbeatToMaster(): Unit = {
     val activeShuffleKeys = new JHashSet[String]()
     val estimatedAppDiskUsage = new JHashMap[String, JLong]()
@@ -322,7 +334,8 @@ private[celeborn] class Worker(
         diskInfos,
         resourceConsumption,
         activeShuffleKeys,
-        estimatedAppDiskUsage),
+        estimatedAppDiskUsage,
+        highWorkload),
       classOf[HeartbeatFromWorkerResponse])
     response.expiredShuffleKeys.asScala.foreach(shuffleKey => 
workerInfo.releaseSlots(shuffleKey))
     cleanTaskQueue.put(response.expiredShuffleKeys)
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
index edcebbb2e..e1f247a08 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
@@ -57,6 +57,10 @@ class WorkerSource(conf: CelebornConf) extends 
AbstractSource(conf, MetricsSyste
   addTimer(TAKE_BUFFER_TIME)
   addTimer(SORT_TIME)
 
+  def getCounterCount(metricsName: String): Long = {
+    val metricNameWithLabel = metricNameWithCustomizedLabels(metricsName, 
Map.empty)
+    namedCounters.get(metricNameWithLabel).counter.getCount
+  }
   // start cleaner thread
   startCleaner()
 }

Reply via email to