This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new e7e39a51b [CELEBORN-1189] Introduce RunningApplicationCount metric and
/applications API to record running applications of worker
e7e39a51b is described below
commit e7e39a51be735a76afdccb71c1cffbaa5658423d
Author: SteNicholas <[email protected]>
AuthorDate: Wed Dec 27 09:51:16 2023 +0800
[CELEBORN-1189] Introduce RunningApplicationCount metric and /applications
API to record running applications of worker
### What changes were proposed in this pull request?
Introduce the `RunningApplicationCount` metric and the `/applications` API to
record the running applications of a Celeborn worker.
### Why are the changes needed?
The `RunningApplicationCount` metric currently only monitors the number of running
applications in the cluster on the master, while the `/listTopDiskUsedApps` API
already lists the top disk-usage application ids on both master and worker.
Therefore, the `RunningApplicationCount` metric and the `/applications` API are
introduced for the worker to record the applications running on it.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Internal tests.
Closes #2172 from SteNicholas/CELEBORN-1189.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
---
METRICS.md | 2 +-
README.md | 2 +-
.../apache/celeborn/common/meta/DeviceInfo.scala | 41 +++++++++++++++-------
.../apache/celeborn/common/meta/WorkerInfo.scala | 8 +++++
.../celeborn/common/meta/WorkerInfoSuite.scala | 8 ++---
docs/deploy.md | 2 +-
docs/monitoring.md | 2 ++
.../celeborn/server/common/HttpService.scala | 4 +--
.../celeborn/server/common/http/HttpEndpoint.scala | 23 ++++++------
.../celeborn/server/common/http/HttpUtils.scala | 3 +-
.../server/common/http/HttpUtilsSuite.scala | 1 +
.../celeborn/service/deploy/worker/Worker.scala | 12 +++++++
.../service/deploy/worker/WorkerSource.scala | 2 ++
13 files changed, 77 insertions(+), 33 deletions(-)
diff --git a/METRICS.md b/METRICS.md
index ec0ffad10..986042342 100644
--- a/METRICS.md
+++ b/METRICS.md
@@ -78,10 +78,10 @@ Here is an example of Grafana dashboard importing.
|:--------------------------------------:|:-----------------:|:---------------------------------------------------------------------------------------------------------------:|
| WorkerCount | master |
The count of active workers.
|
| ExcludedWorkerCount | master |
The count of workers in excluded list.
|
-| RunningApplicationCount | master |
The count of running applications in the cluster.
|
| OfferSlotsTime | master |
The time of offer slots.
|
| PartitionSize | master | The
estimated partition size of last 20 flush window whose length is 15 seconds by
defaults. |
| RegisteredShuffleCount | master and worker |
The value means count of registered shuffle.
|
+| RunningApplicationCount | master and worker |
The value means count of running applications.
|
| ActiveShuffleSize | master and worker | The value
means the active shuffle size for workers or a worker including master replica
and slave replica. |
| ActiveShuffleFileCount | master and worker | The value
means the active shuffle size for workers or a worker including master replica
and slave replica. |
| diskFileCount | master and worker |
The count of disk files consumption by each user.
|
diff --git a/README.md b/README.md
index d424648ca..160180b7f 100644
--- a/README.md
+++ b/README.md
@@ -231,7 +231,7 @@ FetchPort: 37569
ReplicatePort: 37093
SlotsUsed: 0()
LastHeartbeat: 0
-Disks: {/mnt/disk1=DiskInfo(maxSlots: 6679, committed shuffles 0
shuffleAllocations: Map(), mountPoint: /mnt/disk1, usableSpace: 448284381184,
avgFlushTime: 0, activeSlots: 0) status: HEALTHY dirs ,
/mnt/disk3=DiskInfo(maxSlots: 6716, committed shuffles 0 shuffleAllocations:
Map(), mountPoint: /mnt/disk3, usableSpace: 450755608576, avgFlushTime: 0,
activeSlots: 0) status: HEALTHY dirs , /mnt/disk2=DiskInfo(maxSlots: 6713,
committed shuffles 0 shuffleAllocations: Map(), mountPoint: /mnt/d [...]
+Disks: {/mnt/disk1=DiskInfo(maxSlots: 6679, committed shuffles 0, running
applications 0, shuffleAllocations: Map(), mountPoint: /mnt/disk1, usableSpace:
448284381184, avgFlushTime: 0, activeSlots: 0) status: HEALTHY dirs ,
/mnt/disk3=DiskInfo(maxSlots: 6716, committed shuffles 0, running applications
0, shuffleAllocations: Map(), mountPoint: /mnt/disk3, usableSpace:
450755608576, avgFlushTime: 0, activeSlots: 0) status: HEALTHY dirs ,
/mnt/disk2=DiskInfo(maxSlots: 6713, committed shuffl [...]
WorkerRef: null
```
diff --git
a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
index 990a2b127..e3e547407 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
@@ -19,6 +19,7 @@ package org.apache.celeborn.common.meta
import java.io.File
import java.util
+import java.util.function.BiFunction
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
@@ -84,6 +85,7 @@ class DiskInfo(
var storageType: StorageInfo.Type = StorageInfo.Type.SSD
var maxSlots: Long = 0
lazy val shuffleAllocations = new util.HashMap[String, Integer]()
+ lazy val applicationAllocations = new util.HashMap[String, Integer]()
def setStorageType(storageType: StorageInfo.Type) = {
this.storageType = storageType
@@ -121,28 +123,38 @@ class DiskInfo(
}
def allocateSlots(shuffleKey: String, slots: Int): Unit = this.synchronized {
- val allocated = shuffleAllocations.getOrDefault(shuffleKey, 0)
- shuffleAllocations.put(shuffleKey, allocated + slots)
+ // Record the slots against the shuffle key and also against the application
+ // id extracted from it (first component of Utils.splitShuffleKey), so that
+ // per-application usage can be reported alongside per-shuffle usage.
+ val applicationId = Utils.splitShuffleKey(shuffleKey)._1
+ val shuffleAllocated = shuffleAllocations.getOrDefault(shuffleKey, 0)
+ val applicationAllocated =
applicationAllocations.getOrDefault(applicationId, 0)
+ shuffleAllocations.put(shuffleKey, shuffleAllocated + slots)
+ applicationAllocations.put(applicationId, applicationAllocated + slots)
activeSlots = activeSlots + slots
}
def releaseSlots(shuffleKey: String, slots: Int): Unit = this.synchronized {
- val allocated = shuffleAllocations.getOrDefault(shuffleKey, 0)
- if (allocated < slots) {
- logError(s"allocated $allocated is less than to release $slots !")
- }
- val delta = Math.min(allocated, slots)
- activeSlots = activeSlots - delta
- if (allocated > slots) {
- shuffleAllocations.put(shuffleKey, allocated - slots)
- } else {
- shuffleAllocations.put(shuffleKey, 0)
- }
+ // Release up to `slots` slots of one shuffle. The same amount is taken from
+ // the shuffle counter, the owning application's counter and activeSlots.
+ val applicationId = Utils.splitShuffleKey(shuffleKey)._1
+ val shuffleAllocated = shuffleAllocations.getOrDefault(shuffleKey, 0)
+ val applicationAllocated =
+ applicationAllocations.getOrDefault(applicationId, 0)
+ if (shuffleAllocated < slots) {
+ logError(s"allocated $shuffleAllocated is less than to release $slots !")
+ }
+ // Never release more than is tracked. Lowering activeSlots while leaving the
+ // maps untouched on over-release would let a later releaseSlots(shuffleKey)
+ // subtract the same slots from activeSlots a second time.
+ val delta = Math.min(shuffleAllocated, slots)
+ shuffleAllocations.put(shuffleKey, shuffleAllocated - delta)
+ applicationAllocations.put(applicationId, applicationAllocated - delta)
+ activeSlots = activeSlots - delta
}
def releaseSlots(shuffleKey: String): Unit = this.synchronized {
val allocated = shuffleAllocations.remove(shuffleKey)
if (allocated != null) {
+ // Subtract this shuffle's slots from its application's total. Drop the
+ // application entry once nothing remains, so it leaves the key set returned
+ // by getApplicationIdSet (and thus the RunningApplicationCount gauge).
+ val applicationId = Utils.splitShuffleKey(shuffleKey)._1
+ val applicationAllocated =
+ applicationAllocations.getOrDefault(applicationId, 0) - allocated
+ if (applicationAllocated <= 0) {
+ applicationAllocations.remove(applicationId)
+ } else {
+ applicationAllocations.put(applicationId, applicationAllocated)
+ }
activeSlots = activeSlots - allocated
}
}
@@ -151,10 +163,15 @@ class DiskInfo(
new util.HashSet(shuffleAllocations.keySet())
}
+ // Snapshot copy of the application ids that currently have an entry in
+ // applicationAllocations, taken under this DiskInfo's lock. Entries whose
+ // slot count dropped to 0 via releaseSlots(shuffleKey, slots) are included.
+ def getApplicationIdSet(): util.HashSet[String] = this.synchronized {
+ new util.HashSet(applicationAllocations.keySet())
+ }
+
override def toString: String = this.synchronized {
val (emptyShuffles, nonEmptyShuffles) =
shuffleAllocations.asScala.partition(_._2 == 0)
s"DiskInfo(maxSlots: $maxSlots," +
- s" committed shuffles ${emptyShuffles.size}" +
+ s" committed shuffles ${emptyShuffles.size}," +
+ s" running applications ${applicationAllocations.size}," +
s" shuffleAllocations: ${nonEmptyShuffles.toMap}," +
s" mountPoint: $mountPoint," +
s" usableSpace: ${Utils.bytesToString(actualUsableSpace)}," +
diff --git
a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
index 05ea35d63..20561eebf 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
@@ -103,6 +103,14 @@ class WorkerInfo(
shuffleKeySet
}
+ // Union of the application ids reported by every disk of this worker,
+ // collected into a fresh set under this WorkerInfo's lock.
+ def getApplicationIdSet: util.HashSet[String] = this.synchronized {
+ val applicationIdSet = new util.HashSet[String]()
+ diskInfos.values().asScala.foreach { diskInfo =>
+ applicationIdSet.addAll(diskInfo.getApplicationIdSet())
+ }
+ applicationIdSet
+ }
+
def hasSameInfoWith(other: WorkerInfo): Boolean = {
rpcPort == other.rpcPort &&
pushPort == other.pushPort &&
diff --git
a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
index ebd83682e..a435f35e3 100644
---
a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
+++
b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
@@ -73,7 +73,7 @@ class WorkerInfoSuite extends CelebornFunSuite {
new WorkerInfo("localhost", 10000, 10001, 10002, 10003, disks,
userResourceConsumption)
val allocatedSlots = new AtomicInteger(0)
- val shuffleKey = "appId-shuffleId"
+ val shuffleKey = "appId-1"
val es = ThreadUtils.newDaemonFixedThreadPool(8, "workerInfo-unit-test")
val futures = new ArrayBuffer[Future[_]]()
@@ -284,9 +284,9 @@ class WorkerInfoSuite extends CelebornFunSuite {
|SlotsUsed: 60
|LastHeartbeat: 0
|Disks: $placeholder
- | DiskInfo0: DiskInfo(maxSlots: 0, committed shuffles 0
shuffleAllocations: Map(), mountPoint: disk3, usableSpace: 2048.0 MiB,
avgFlushTime: 3 ns, avgFetchTime: 3 ns, activeSlots: 30, storageType: SSD)
status: HEALTHY dirs $placeholder
- | DiskInfo1: DiskInfo(maxSlots: 0, committed shuffles 0
shuffleAllocations: Map(), mountPoint: disk1, usableSpace: 2048.0 MiB,
avgFlushTime: 1 ns, avgFetchTime: 1 ns, activeSlots: 10, storageType: SSD)
status: HEALTHY dirs $placeholder
- | DiskInfo2: DiskInfo(maxSlots: 0, committed shuffles 0
shuffleAllocations: Map(), mountPoint: disk2, usableSpace: 2048.0 MiB,
avgFlushTime: 2 ns, avgFetchTime: 2 ns, activeSlots: 20, storageType: SSD)
status: HEALTHY dirs $placeholder
+ | DiskInfo0: DiskInfo(maxSlots: 0, committed shuffles 0, running
applications 0, shuffleAllocations: Map(), mountPoint: disk3, usableSpace:
2048.0 MiB, avgFlushTime: 3 ns, avgFetchTime: 3 ns, activeSlots: 30,
storageType: SSD) status: HEALTHY dirs $placeholder
+ | DiskInfo1: DiskInfo(maxSlots: 0, committed shuffles 0, running
applications 0, shuffleAllocations: Map(), mountPoint: disk1, usableSpace:
2048.0 MiB, avgFlushTime: 1 ns, avgFetchTime: 1 ns, activeSlots: 10,
storageType: SSD) status: HEALTHY dirs $placeholder
+ | DiskInfo2: DiskInfo(maxSlots: 0, committed shuffles 0, running
applications 0, shuffleAllocations: Map(), mountPoint: disk2, usableSpace:
2048.0 MiB, avgFlushTime: 2 ns, avgFetchTime: 2 ns, activeSlots: 20,
storageType: SSD) status: HEALTHY dirs $placeholder
|UserResourceConsumption: $placeholder
| UserIdentifier: `tenant1`.`name1`, ResourceConsumption:
ResourceConsumption(diskBytesWritten: 20.0 MiB, diskFileCount: 1,
hdfsBytesWritten: 50.0 MiB, hdfsFileCount: 1)
|WorkerRef: null
diff --git a/docs/deploy.md b/docs/deploy.md
index c289b57bb..6b3ffc09c 100644
--- a/docs/deploy.md
+++ b/docs/deploy.md
@@ -151,7 +151,7 @@ FetchPort: 37569
ReplicatePort: 37093
SlotsUsed: 0()
LastHeartbeat: 0
-Disks: {/mnt/disk1=DiskInfo(maxSlots: 6679, committed shuffles 0
shuffleAllocations: Map(), mountPoint: /mnt/disk1, usableSpace: 448284381184,
avgFlushTime: 0, activeSlots: 0) status: HEALTHY dirs ,
/mnt/disk3=DiskInfo(maxSlots: 6716, committed shuffles 0 shuffleAllocations:
Map(), mountPoint: /mnt/disk3, usableSpace: 450755608576, avgFlushTime: 0,
activeSlots: 0) status: HEALTHY dirs , /mnt/disk2=DiskInfo(maxSlots: 6713,
committed shuffles 0 shuffleAllocations: Map(), mountPoint: /mnt/d [...]
+Disks: {/mnt/disk1=DiskInfo(maxSlots: 6679, committed shuffles 0, running
applications 0, shuffleAllocations: Map(), mountPoint: /mnt/disk1, usableSpace:
448284381184, avgFlushTime: 0, activeSlots: 0) status: HEALTHY dirs ,
/mnt/disk3=DiskInfo(maxSlots: 6716, committed shuffles 0, running applications
0, shuffleAllocations: Map(), mountPoint: /mnt/disk3, usableSpace:
450755608576, avgFlushTime: 0, activeSlots: 0) status: HEALTHY dirs ,
/mnt/disk2=DiskInfo(maxSlots: 6713, committed shuffl [...]
WorkerRef: null
```
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 64592932f..819658007 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -163,6 +163,7 @@ These metrics are exposed by Celeborn worker.
- TakeBufferTime
- The time for a worker to take out a buffer from a disk flusher.
- RegisteredShuffleCount
+ - RunningApplicationCount
- SlotsAllocated
- NettyMemory
- The total amount of off-heap memory used by celeborn worker.
@@ -330,6 +331,7 @@ API path listed as below:
| /conf | List the conf setting of the worker.
|
| /workerInfo | List the worker information of the worker.
|
| /threadDump | List the current thread dump of the worker.
|
+| /applications | List all running application's ids of the
worker. It only return application ids running in that worker.
|
| /shuffles | List all the running shuffle keys of the
worker. It only return keys of shuffles running in that worker.
|
| /listTopDiskUsedApps | List the top disk usage application ids. It
only return application ids running in that worker.
|
| /listPartitionLocationInfo | List all the living PartitionLocation
information in that worker.
|
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
index 4918bcc23..b7bdc7d87 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
@@ -49,6 +49,8 @@ abstract class HttpService extends Service with Logging {
def getShuffleList: String
+ def getApplicationList: String
+
def listTopDiskUseApps: String
def getMasterGroupInfo: String = throw new UnsupportedOperationException()
@@ -61,8 +63,6 @@ abstract class HttpService extends Service with Logging {
def getHostnameList: String = throw new UnsupportedOperationException()
- def getApplicationList: String = throw new UnsupportedOperationException()
-
def exclude(addWorkers: String, removeWorkers: String): String =
throw new UnsupportedOperationException()
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpEndpoint.scala
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpEndpoint.scala
index 544505cd6..1e4a48a63 100644
---
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpEndpoint.scala
+++
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpEndpoint.scala
@@ -76,6 +76,19 @@ case object Shuffles extends HttpEndpoint {
service.getShuffleList
}
+// "/applications" endpoint shared by master and worker: it delegates to
+// HttpService.getApplicationList, and only the description text differs
+// between the master and worker services.
+case object Applications extends HttpEndpoint {
+ override def path: String = "/applications"
+
+ override def description(service: String): String =
+ if (service == Service.MASTER)
+ "List all running application's ids of the cluster."
+ else
+ "List all running application's ids of the worker. It only return
application ids running in that worker."
+
+ override def handle(service: HttpService, parameters: Map[String, String]):
String =
+ service.getApplicationList
+}
+
case object ListTopDiskUsedApps extends HttpEndpoint {
override def path: String = "/listTopDiskUsedApps"
@@ -158,16 +171,6 @@ case object Hostnames extends HttpEndpoint {
service.getHostnameList
}
-case object Applications extends HttpEndpoint {
- override def path: String = "/applications"
-
- override def description(service: String): String =
- "List all running application's ids of the cluster."
-
- override def handle(service: HttpService, parameters: Map[String, String]):
String =
- service.getApplicationList
-}
-
case object Exclude extends HttpEndpoint {
override def path: String = "/exclude"
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpUtils.scala
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpUtils.scala
index c208c0f06..534607d85 100644
---
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpUtils.scala
+++
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpUtils.scala
@@ -25,14 +25,13 @@ import org.apache.celeborn.server.common.{HttpService,
Service}
object HttpUtils {
private val baseEndpoints: List[HttpEndpoint] =
- List(Conf, WorkerInfo, ThreadDump, Shuffles, ListTopDiskUsedApps, Help)
+ List(Conf, WorkerInfo, ThreadDump, Shuffles, Applications,
ListTopDiskUsedApps, Help)
private val masterEndpoints: List[HttpEndpoint] = List(
MasterGroupInfo,
LostWorkers,
ExcludedWorkers,
ShutdownWorkers,
Hostnames,
- Applications,
Exclude) ++ baseEndpoints
private val workerEndpoints: List[HttpEndpoint] =
List(
diff --git
a/service/src/test/scala/org/apache/celeborn/server/common/http/HttpUtilsSuite.scala
b/service/src/test/scala/org/apache/celeborn/server/common/http/HttpUtilsSuite.scala
index 8b877a8c7..9f025ae9a 100644
---
a/service/src/test/scala/org/apache/celeborn/server/common/http/HttpUtilsSuite.scala
+++
b/service/src/test/scala/org/apache/celeborn/server/common/http/HttpUtilsSuite.scala
@@ -77,6 +77,7 @@ class HttpUtilsSuite extends AnyFunSuite with Logging {
|""".stripMargin)
assert(HttpUtils.help(Service.WORKER) ==
s"""Available API providers include:
+ |/applications List all running application's ids of the
worker. It only return application ids running in that worker.
|/conf List the conf setting of the worker.
|/exit Trigger this worker to exit. Legal types
are 'DECOMMISSION', 'GRACEFUL' and 'IMMEDIATELY'.
|/help List the available API providers of the
worker.
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 9186dec60..fd14a0460 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -285,6 +285,9 @@ private[celeborn] class Worker(
workerSource.addGauge(WorkerSource.REGISTERED_SHUFFLE_COUNT) { () =>
workerInfo.getShuffleKeySet.size
}
+ workerSource.addGauge(WorkerSource.RUNNING_APPLICATION_COUNT) { () =>
+ workerInfo.getApplicationIdSet.size
+ }
workerSource.addGauge(WorkerSource.SORT_MEMORY) { () =>
memoryManager.getSortMemoryCounter.get()
}
@@ -606,6 +609,15 @@ private[celeborn] class Worker(
sb.toString()
}
+ // Plain-text body for the worker's /applications endpoint: a header line
+ // followed by one application id per line, taken from
+ // workerInfo.getApplicationIdSet.
+ override def getApplicationList: String = {
+ val sb = new StringBuilder
+ sb.append("================= LifecycleManager Application List
======================\n")
+ workerInfo.getApplicationIdSet.asScala.foreach { appId =>
+ sb.append(s"$appId\n")
+ }
+ sb.toString()
+ }
+
override def getShuffleList: String = {
val sb = new StringBuilder
sb.append("======================= Shuffle Key List
============================\n")
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
index 6b6d77122..b363f3865 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
@@ -103,6 +103,8 @@ object WorkerSource {
val REGISTERED_SHUFFLE_COUNT = "RegisteredShuffleCount"
+ val RUNNING_APPLICATION_COUNT = "RunningApplicationCount"
+
// slots
val SLOTS_ALLOCATED = "SlotsAllocated"