This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 1d04a2328 [CELEBORN-920] Worker sends its load to Master through
heartbeat
1d04a2328 is described below
commit 1d04a2328988b81276a8ec3e7a5988de488efba7
Author: Keyong Zhou <[email protected]>
AuthorDate: Sat Aug 26 13:58:37 2023 +0800
[CELEBORN-920] Worker sends its load to Master through heartbeat
### What changes were proposed in this pull request?
Adding a flag indicating high load in the worker's heartbeat allows the
master to better schedule the workers.
### Why are the changes needed?
In our production environment, there was a node with abnormally high load,
but the master was not aware of this situation. It assigned numerous jobs to
this node, and as a result, the stability of these jobs was affected.
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
UT
Closes #1840 from JQ-Cao/920.
Lead-authored-by: Keyong Zhou <[email protected]>
Co-authored-by: caojiaqing <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
common/src/main/proto/TransportMessages.proto | 1 +
.../org/apache/celeborn/common/CelebornConf.scala | 11 ++++++++
.../common/protocol/message/ControlMessages.scala | 4 +++
docs/configuration/worker.md | 1 +
.../master/clustermeta/AbstractMetaManager.java | 8 +++---
.../master/clustermeta/IMetadataHandler.java | 1 +
.../clustermeta/SingleMasterMetaManager.java | 4 ++-
.../master/clustermeta/ha/HAMasterMetaManager.java | 2 ++
.../deploy/master/clustermeta/ha/MetaHandler.java | 4 ++-
master/src/main/proto/Resource.proto | 1 +
.../celeborn/service/deploy/master/Master.scala | 4 +++
.../clustermeta/DefaultMetaSystemSuiteJ.java | 30 +++++++++++++++++-----
.../ha/RatisMasterStatusSystemSuiteJ.java | 21 +++++++++++++++
.../deploy/worker/memory/MemoryManager.java | 1 -
.../celeborn/service/deploy/worker/Worker.scala | 15 ++++++++++-
.../service/deploy/worker/WorkerSource.scala | 4 +++
16 files changed, 99 insertions(+), 13 deletions(-)
diff --git a/common/src/main/proto/TransportMessages.proto
b/common/src/main/proto/TransportMessages.proto
index 87a51328a..74b043598 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -151,6 +151,7 @@ message PbHeartbeatFromWorker {
string requestId = 8;
map<string, PbResourceConsumption> userResourceConsumption = 9;
map<string, int64> estimatedAppDiskUsage = 10;
+ bool highWorkload = 11;
}
message PbHeartbeatFromWorkerResponse {
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index b11d475cb..916d0806b 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -654,6 +654,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
def workerPushMaxComponents: Int =
get(WORKER_PUSH_COMPOSITEBUFFER_MAXCOMPONENTS)
def workerFetchHeartbeatEnabled: Boolean =
get(WORKER_FETCH_HEARTBEAT_ENABLED)
def workerPartitionSplitEnabled: Boolean =
get(WORKER_PARTITION_SPLIT_ENABLED)
+ def workerActiveConnectionMax: Option[Long] =
get(WORKER_ACTIVE_CONNECTION_MAX)
// //////////////////////////////////////////////////////
// Metrics System //
@@ -2693,6 +2694,16 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(false)
+ val WORKER_ACTIVE_CONNECTION_MAX: OptionalConfigEntry[Long] =
+ buildConf("celeborn.worker.activeConnection.max")
+ .categories("worker")
+ .doc("If the number of active connections on a worker exceeds this
configuration value, " +
+ "the worker will be marked as high-load in the heartbeat report, " +
+ "and the master will not include that node in the response of
RequestSlots.")
+ .version("0.3.1")
+ .longConf
+ .createOptional
+
val APPLICATION_HEARTBEAT_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.client.application.heartbeatInterval")
.withAlternative("celeborn.application.heartbeatInterval")
diff --git
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index 89ef41a93..18dc2675d 100644
---
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -113,6 +113,7 @@ object ControlMessages extends Logging {
userResourceConsumption: util.Map[UserIdentifier, ResourceConsumption],
activeShuffleKeys: util.Set[String],
estimatedAppDiskUsage: util.HashMap[String, java.lang.Long],
+ highWorkload: Boolean,
override var requestId: String = ZERO_UUID) extends MasterRequestMessage
case class HeartbeatFromWorkerResponse(
@@ -445,6 +446,7 @@ object ControlMessages extends Logging {
userResourceConsumption,
activeShuffleKeys,
estimatedAppDiskUsage,
+ highWorkload,
requestId) =>
val pbDisks = disks.map(PbSerDeUtils.toPbDiskInfo).asJava
val pbUserResourceConsumption =
@@ -459,6 +461,7 @@ object ControlMessages extends Logging {
.setReplicatePort(replicatePort)
.addAllActiveShuffleKeys(activeShuffleKeys)
.putAllEstimatedAppDiskUsage(estimatedAppDiskUsage)
+ .setHighWorkload(highWorkload)
.setRequestId(requestId)
.build().toByteArray
new TransportMessage(MessageType.HEARTBEAT_FROM_WORKER, payload)
@@ -821,6 +824,7 @@ object ControlMessages extends Logging {
userResourceConsumption,
activeShuffleKeys,
estimatedAppDiskUsage,
+ pbHeartbeatFromWorker.getHighWorkload,
pbHeartbeatFromWorker.getRequestId)
case HEARTBEAT_FROM_WORKER_RESPONSE_VALUE =>
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 994838b3f..8b05d5ad7 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -24,6 +24,7 @@ license: |
| celeborn.shuffle.chunk.size | 8m | Max chunk size of reducer's merged
shuffle data. For example, if a reducer's shuffle data is 128M and the data
will need 16 fetch chunk requests to fetch. | 0.2.0 |
| celeborn.storage.activeTypes | HDD,SSD | Enabled storage levels. Available
options: HDD,SSD,HDFS. | 0.3.0 |
| celeborn.storage.hdfs.dir | <undefined> | HDFS base directory for
Celeborn to store shuffle data. | 0.2.0 |
+| celeborn.worker.activeConnection.max | <undefined> | If the number of
active connections on a worker exceeds this configuration value, the worker
will be marked as high-load in the heartbeat report, and the master will not
include that node in the response of RequestSlots. | 0.3.1 |
| celeborn.worker.bufferStream.threadsPerMountpoint | 8 | Threads count for
read buffer per mount point. | 0.3.0 |
| celeborn.worker.closeIdleConnections | false | Whether worker will close
idle connections. | 0.2.0 |
| celeborn.worker.commitFiles.threads | 32 | Thread number of worker to commit
shuffle data files asynchronously. It's recommended to set at least `128` when
`HDFS` is enabled in `celeborn.storage.activeTypes`. | 0.3.0 |
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index 8df57405f..8d7b9376a 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -143,7 +143,8 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
Map<String, DiskInfo> disks,
Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
Map<String, Long> estimatedAppDiskUsage,
- long time) {
+ long time,
+ boolean highWorkload) {
WorkerInfo worker =
new WorkerInfo(
host, rpcPort, pushPort, fetchPort, replicatePort, disks,
userResourceConsumption);
@@ -161,10 +162,11 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
}
appDiskUsageMetric.update(estimatedAppDiskUsage);
// If using HDFSONLY mode, workers with empty disks should not be put into
excluded worker list.
- if (!excludedWorkers.contains(worker) && (disks.isEmpty() &&
!conf.hasHDFSStorage())) {
+ if (!excludedWorkers.contains(worker)
+ && ((disks.isEmpty() && !conf.hasHDFSStorage()) || highWorkload)) {
LOG.debug("Worker: {} num total slots is 0, add to excluded list",
worker);
excludedWorkers.add(worker);
- } else if (availableSlots.get() > 0) {
+ } else if ((availableSlots.get() > 0 || conf.hasHDFSStorage()) &&
!highWorkload) {
// only unblack if numSlots larger than 0
excludedWorkers.remove(worker);
}
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
index 6c4c65a73..a34cb445d 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
@@ -55,6 +55,7 @@ public interface IMetadataHandler {
Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
Map<String, Long> estimatedAppDiskUsage,
long time,
+ boolean highWorkload,
String requestId);
void handleRegisterWorker(
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
index 3d12db8b4..15c0c6d6d 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
@@ -92,6 +92,7 @@ public class SingleMasterMetaManager extends
AbstractMetaManager {
Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
Map<String, Long> estimatedAppDiskUsage,
long time,
+ boolean highWorkload,
String requestId) {
updateWorkerHeartbeatMeta(
host,
@@ -102,7 +103,8 @@ public class SingleMasterMetaManager extends
AbstractMetaManager {
disks,
userResourceConsumption,
estimatedAppDiskUsage,
- time);
+ time,
+ highWorkload);
}
@Override
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
index f7a10013c..181c6e487 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
@@ -197,6 +197,7 @@ public class HAMasterMetaManager extends
AbstractMetaManager {
Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
Map<String, Long> estimatedAppDiskUsage,
long time,
+ boolean highWorkload,
String requestId) {
try {
ratisServer.submitRequest(
@@ -215,6 +216,7 @@ public class HAMasterMetaManager extends
AbstractMetaManager {
MetaUtil.toPbUserResourceConsumption(userResourceConsumption))
.putAllEstimatedAppDiskUsage(estimatedAppDiskUsage)
.setTime(time)
+ .setHighWorkload(highWorkload)
.build())
.build());
} catch (CelebornRuntimeException e) {
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
index d6ab8309c..27ba6d882 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
@@ -163,6 +163,7 @@ public class MetaHandler {
estimatedAppDiskUsage.putAll(
request.getWorkerHeartbeatRequest().getEstimatedAppDiskUsageMap());
replicatePort =
request.getWorkerHeartbeatRequest().getReplicatePort();
+ boolean highWorkload =
request.getWorkerHeartbeatRequest().getHighWorkload();
LOG.debug(
"Handle worker heartbeat for {} {} {} {} {} {} {}",
host,
@@ -182,7 +183,8 @@ public class MetaHandler {
diskInfos,
userResourceConsumption,
estimatedAppDiskUsage,
- time);
+ time,
+ highWorkload);
break;
case RegisterWorker:
diff --git a/master/src/main/proto/Resource.proto
b/master/src/main/proto/Resource.proto
index 8f6f62f25..a6fb5d173 100644
--- a/master/src/main/proto/Resource.proto
+++ b/master/src/main/proto/Resource.proto
@@ -121,6 +121,7 @@ message WorkerHeartbeatRequest {
required int64 time = 7;
map<string, ResourceConsumption> userResourceConsumption = 8;
map<string, int64> estimatedAppDiskUsage = 9;
+ required bool highWorkload = 10;
}
message RegisterWorkerRequest {
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 466e088d1..c3dd27125 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -334,6 +334,7 @@ private[celeborn] class Master(
userResourceConsumption,
activeShuffleKey,
estimatedAppDiskUsage,
+ highWorkload,
requestId) =>
logDebug(s"Received heartbeat from" +
s" worker $host:$rpcPort:$pushPort:$fetchPort:$replicatePort with
$disks.")
@@ -350,6 +351,7 @@ private[celeborn] class Master(
userResourceConsumption,
activeShuffleKey,
estimatedAppDiskUsage,
+ highWorkload,
requestId))
case ReportWorkerUnavailable(failedWorkers: util.List[WorkerInfo],
requestId: String) =>
@@ -432,6 +434,7 @@ private[celeborn] class Master(
userResourceConsumption: util.Map[UserIdentifier, ResourceConsumption],
activeShuffleKeys: util.Set[String],
estimatedAppDiskUsage: util.HashMap[String, java.lang.Long],
+ highWorkload: Boolean,
requestId: String): Unit = {
val targetWorker = new WorkerInfo(host, rpcPort, pushPort, fetchPort,
replicatePort)
val registered = workersSnapShot.asScala.contains(targetWorker)
@@ -449,6 +452,7 @@ private[celeborn] class Master(
userResourceConsumption,
estimatedAppDiskUsage,
System.currentTimeMillis(),
+ highWorkload,
requestId)
}
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
index aaae7861a..2962ebafd 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
@@ -506,6 +506,7 @@ public class DefaultMetaSystemSuiteJ {
userResourceConsumption1,
new HashMap<>(),
1,
+ false,
getNewReqeustId());
Assert.assertEquals(statusSystem.excludedWorkers.size(), 1);
@@ -520,23 +521,40 @@ public class DefaultMetaSystemSuiteJ {
userResourceConsumption2,
new HashMap<>(),
1,
+ false,
getNewReqeustId());
Assert.assertEquals(statusSystem.excludedWorkers.size(), 2);
statusSystem.handleWorkerHeartbeat(
- HOSTNAME1,
- RPCPORT1,
- PUSHPORT1,
- FETCHPORT1,
+ HOSTNAME3,
+ RPCPORT3,
+ PUSHPORT3,
+ FETCHPORT3,
REPLICATEPORT3,
- disks1,
- userResourceConsumption1,
+ disks3,
+ userResourceConsumption3,
new HashMap<>(),
1,
+ false,
getNewReqeustId());
Assert.assertEquals(statusSystem.excludedWorkers.size(), 2);
+
+ statusSystem.handleWorkerHeartbeat(
+ HOSTNAME3,
+ RPCPORT3,
+ PUSHPORT3,
+ FETCHPORT3,
+ REPLICATEPORT3,
+ disks3,
+ userResourceConsumption3,
+ new HashMap<>(),
+ 1,
+ true,
+ getNewReqeustId());
+
+ Assert.assertEquals(statusSystem.excludedWorkers.size(), 3);
}
@Test
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index 1d0e2c17b..6ae55d162 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -735,6 +735,7 @@ public class RatisMasterStatusSystemSuiteJ {
userResourceConsumption1,
new HashMap<>(),
1,
+ false,
getNewReqeustId());
Thread.sleep(3000L);
@@ -752,6 +753,7 @@ public class RatisMasterStatusSystemSuiteJ {
userResourceConsumption2,
new HashMap<>(),
1,
+ false,
getNewReqeustId());
Thread.sleep(3000L);
@@ -770,6 +772,7 @@ public class RatisMasterStatusSystemSuiteJ {
userResourceConsumption1,
new HashMap<>(),
1,
+ false,
getNewReqeustId());
Thread.sleep(3000L);
@@ -777,6 +780,24 @@ public class RatisMasterStatusSystemSuiteJ {
Assert.assertEquals(1, STATUSSYSTEM1.excludedWorkers.size());
Assert.assertEquals(1, STATUSSYSTEM2.excludedWorkers.size());
Assert.assertEquals(1, STATUSSYSTEM3.excludedWorkers.size());
+
+ statusSystem.handleWorkerHeartbeat(
+ HOSTNAME1,
+ RPCPORT1,
+ PUSHPORT1,
+ FETCHPORT1,
+ REPLICATEPORT1,
+ disks1,
+ userResourceConsumption1,
+ new HashMap<>(),
+ 1,
+ true,
+ getNewReqeustId());
+ Thread.sleep(3000L);
+ Assert.assertEquals(2, statusSystem.excludedWorkers.size());
+ Assert.assertEquals(2, STATUSSYSTEM1.excludedWorkers.size());
+ Assert.assertEquals(2, STATUSSYSTEM2.excludedWorkers.size());
+ Assert.assertEquals(2, STATUSSYSTEM3.excludedWorkers.size());
}
@Before
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
index 8ef2ee19d..cb248dd9b 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
@@ -429,7 +429,6 @@ public class MemoryManager {
void onChange(long newMemoryTarget);
}
- @VisibleForTesting
public enum ServingState {
NONE_PAUSED,
PUSH_AND_REPLICATE_PAUSED,
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index e5267a75f..c263f2222 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -46,8 +46,10 @@ import org.apache.celeborn.common.util.{CelebornExitKind,
JavaUtils, ShutdownHoo
// Can Remove this if celeborn don't support scala211 in future
import org.apache.celeborn.common.util.FunctionConverter._
import org.apache.celeborn.server.common.{HttpService, Service}
+import
org.apache.celeborn.service.deploy.worker.WorkerSource.ACTIVE_CONNECTION_COUNT
import
org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController
import org.apache.celeborn.service.deploy.worker.memory.{ChannelsLimiter,
MemoryManager}
+import
org.apache.celeborn.service.deploy.worker.memory.MemoryManager.ServingState
import
org.apache.celeborn.service.deploy.worker.storage.{PartitionFilesSorter,
StorageManager}
private[celeborn] class Worker(
@@ -296,6 +298,16 @@ private[celeborn] class Worker(
memoryManager.getAllocatedReadBuffers
}
+ private def highWorkload: Boolean = {
+ (memoryManager.currentServingState, conf.workerActiveConnectionMax) match {
+ case (ServingState.PUSH_AND_REPLICATE_PAUSED, _) => true
+ case (ServingState.PUSH_PAUSED, _) => true
+ case (_, Some(activeConnectionMax)) =>
+ workerSource.getCounterCount(ACTIVE_CONNECTION_COUNT) >=
activeConnectionMax
+ case _ => false
+ }
+ }
+
private def heartbeatToMaster(): Unit = {
val activeShuffleKeys = new JHashSet[String]()
val estimatedAppDiskUsage = new JHashMap[String, JLong]()
@@ -322,7 +334,8 @@ private[celeborn] class Worker(
diskInfos,
resourceConsumption,
activeShuffleKeys,
- estimatedAppDiskUsage),
+ estimatedAppDiskUsage,
+ highWorkload),
classOf[HeartbeatFromWorkerResponse])
response.expiredShuffleKeys.asScala.foreach(shuffleKey =>
workerInfo.releaseSlots(shuffleKey))
cleanTaskQueue.put(response.expiredShuffleKeys)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
index edcebbb2e..e1f247a08 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
@@ -57,6 +57,10 @@ class WorkerSource(conf: CelebornConf) extends
AbstractSource(conf, MetricsSyste
addTimer(TAKE_BUFFER_TIME)
addTimer(SORT_TIME)
+ def getCounterCount(metricsName: String): Long = {
+ val metricNameWithLabel = metricNameWithCustomizedLabels(metricsName,
Map.empty)
+ namedCounters.get(metricNameWithLabel).counter.getCount
+ }
// start cleaner thread
startCleaner()
}