This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new e7e39a51b [CELEBORN-1189] Introduce RunningApplicationCount metric and 
/applications API to record running applications of worker
e7e39a51b is described below

commit e7e39a51be735a76afdccb71c1cffbaa5658423d
Author: SteNicholas <[email protected]>
AuthorDate: Wed Dec 27 09:51:16 2023 +0800

    [CELEBORN-1189] Introduce RunningApplicationCount metric and /applications 
API to record running applications of worker
    
    ### What changes were proposed in this pull request?
    
    Introduce the `RunningApplicationCount` metric and the `/applications` API
    to record running applications for the Celeborn worker.
    
    ### Why are the changes needed?
    
    The `RunningApplicationCount` metric currently only monitors the count of
    running applications in the cluster for the master, while the
    `/listTopDiskUsedApps` API already lists the top disk-usage application ids
    for both the master and the worker. Therefore, the `RunningApplicationCount`
    metric and the `/applications` API can be introduced to record the running
    applications of a worker.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Internal tests.
    
    Closes #2172 from SteNicholas/CELEBORN-1189.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 METRICS.md                                         |  2 +-
 README.md                                          |  2 +-
 .../apache/celeborn/common/meta/DeviceInfo.scala   | 41 +++++++++++++++-------
 .../apache/celeborn/common/meta/WorkerInfo.scala   |  8 +++++
 .../celeborn/common/meta/WorkerInfoSuite.scala     |  8 ++---
 docs/deploy.md                                     |  2 +-
 docs/monitoring.md                                 |  2 ++
 .../celeborn/server/common/HttpService.scala       |  4 +--
 .../celeborn/server/common/http/HttpEndpoint.scala | 23 ++++++------
 .../celeborn/server/common/http/HttpUtils.scala    |  3 +-
 .../server/common/http/HttpUtilsSuite.scala        |  1 +
 .../celeborn/service/deploy/worker/Worker.scala    | 12 +++++++
 .../service/deploy/worker/WorkerSource.scala       |  2 ++
 13 files changed, 77 insertions(+), 33 deletions(-)

diff --git a/METRICS.md b/METRICS.md
index ec0ffad10..986042342 100644
--- a/METRICS.md
+++ b/METRICS.md
@@ -78,10 +78,10 @@ Here is an example of Grafana dashboard importing.
 
|:--------------------------------------:|:-----------------:|:---------------------------------------------------------------------------------------------------------------:|
 |              WorkerCount               |      master       |                 
                         The count of active workers.                           
                |
 |          ExcludedWorkerCount           |      master       |                 
                    The count of workers in excluded list.                      
                |
-|        RunningApplicationCount         |      master       |                 
               The count of running applications in the cluster.                
                |
 |             OfferSlotsTime             |      master       |                 
                           The time of offer slots.                             
                |
 |             PartitionSize              |      master       |          The 
estimated partition size of last 20 flush window whose length is 15 seconds by 
defaults.           |
 |         RegisteredShuffleCount         | master and worker |                 
                 The value means count of registered shuffle.                   
                |
+|        RunningApplicationCount         | master and worker |                 
                The value means count of running applications.                  
                |
 |           ActiveShuffleSize            | master and worker |   The value 
means the active shuffle size for workers or a worker including master replica 
and slave replica.   |
 |         ActiveShuffleFileCount         | master and worker |   The value 
means the active shuffle size for workers or a worker including master replica 
and slave replica.   |
 |             diskFileCount              | master and worker |                 
               The count of disk files consumption by each user.                
                |
diff --git a/README.md b/README.md
index d424648ca..160180b7f 100644
--- a/README.md
+++ b/README.md
@@ -231,7 +231,7 @@ FetchPort: 37569
 ReplicatePort: 37093
 SlotsUsed: 0()
 LastHeartbeat: 0
-Disks: {/mnt/disk1=DiskInfo(maxSlots: 6679, committed shuffles 0 
shuffleAllocations: Map(), mountPoint: /mnt/disk1, usableSpace: 448284381184, 
avgFlushTime: 0, activeSlots: 0) status: HEALTHY dirs , 
/mnt/disk3=DiskInfo(maxSlots: 6716, committed shuffles 0 shuffleAllocations: 
Map(), mountPoint: /mnt/disk3, usableSpace: 450755608576, avgFlushTime: 0, 
activeSlots: 0) status: HEALTHY dirs , /mnt/disk2=DiskInfo(maxSlots: 6713, 
committed shuffles 0 shuffleAllocations: Map(), mountPoint: /mnt/d [...]
+Disks: {/mnt/disk1=DiskInfo(maxSlots: 6679, committed shuffles 0, running 
applications 0, shuffleAllocations: Map(), mountPoint: /mnt/disk1, usableSpace: 
448284381184, avgFlushTime: 0, activeSlots: 0) status: HEALTHY dirs , 
/mnt/disk3=DiskInfo(maxSlots: 6716, committed shuffles 0, running applications 
0, shuffleAllocations: Map(), mountPoint: /mnt/disk3, usableSpace: 
450755608576, avgFlushTime: 0, activeSlots: 0) status: HEALTHY dirs , 
/mnt/disk2=DiskInfo(maxSlots: 6713, committed shuffl [...]
 WorkerRef: null
 ```
 
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala 
b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
index 990a2b127..e3e547407 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
@@ -19,6 +19,7 @@ package org.apache.celeborn.common.meta
 
 import java.io.File
 import java.util
+import java.util.function.BiFunction
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
@@ -84,6 +85,7 @@ class DiskInfo(
   var storageType: StorageInfo.Type = StorageInfo.Type.SSD
   var maxSlots: Long = 0
   lazy val shuffleAllocations = new util.HashMap[String, Integer]()
+  lazy val applicationAllocations = new util.HashMap[String, Integer]()
 
   def setStorageType(storageType: StorageInfo.Type) = {
     this.storageType = storageType
@@ -121,28 +123,38 @@ class DiskInfo(
   }
 
   def allocateSlots(shuffleKey: String, slots: Int): Unit = this.synchronized {
-    val allocated = shuffleAllocations.getOrDefault(shuffleKey, 0)
-    shuffleAllocations.put(shuffleKey, allocated + slots)
+    val applicationId = Utils.splitShuffleKey(shuffleKey)._1
+    val shuffleAllocated = shuffleAllocations.getOrDefault(shuffleKey, 0)
+    val applicationAllocated = 
applicationAllocations.getOrDefault(applicationId, 0)
+    shuffleAllocations.put(shuffleKey, shuffleAllocated + slots)
+    applicationAllocations.put(applicationId, applicationAllocated + slots)
     activeSlots = activeSlots + slots
   }
 
   def releaseSlots(shuffleKey: String, slots: Int): Unit = this.synchronized {
-    val allocated = shuffleAllocations.getOrDefault(shuffleKey, 0)
-    if (allocated < slots) {
-      logError(s"allocated $allocated is less than to release $slots !")
-    }
-    val delta = Math.min(allocated, slots)
-    activeSlots = activeSlots - delta
-    if (allocated > slots) {
-      shuffleAllocations.put(shuffleKey, allocated - slots)
+    val applicationId = Utils.splitShuffleKey(shuffleKey)._1
+    val shuffleAllocated = shuffleAllocations.getOrDefault(shuffleKey, 0)
+    val applicationAllocated = 
applicationAllocations.getOrDefault(applicationId, 0)
+    if (shuffleAllocated < slots) {
+      logError(s"allocated $shuffleAllocated is less than to release $slots !")
     } else {
-      shuffleAllocations.put(shuffleKey, 0)
+      shuffleAllocations.put(shuffleKey, shuffleAllocated - slots)
+      applicationAllocations.put(applicationId, applicationAllocated - slots)
     }
+    activeSlots = activeSlots - Math.min(shuffleAllocated, slots)
   }
 
   def releaseSlots(shuffleKey: String): Unit = this.synchronized {
     val allocated = shuffleAllocations.remove(shuffleKey)
     if (allocated != null) {
+      val applicationId = Utils.splitShuffleKey(shuffleKey)._1
+      var applicationAllocated = 
applicationAllocations.getOrDefault(applicationId, 0)
+      applicationAllocated = applicationAllocated - allocated
+      if (applicationAllocated <= 0) {
+        applicationAllocations.remove(applicationId)
+      } else {
+        applicationAllocations.put(applicationId, applicationAllocated)
+      }
       activeSlots = activeSlots - allocated
     }
   }
@@ -151,10 +163,15 @@ class DiskInfo(
     new util.HashSet(shuffleAllocations.keySet())
   }
 
+  def getApplicationIdSet(): util.HashSet[String] = this.synchronized {
+    new util.HashSet(applicationAllocations.keySet())
+  }
+
   override def toString: String = this.synchronized {
     val (emptyShuffles, nonEmptyShuffles) = 
shuffleAllocations.asScala.partition(_._2 == 0)
     s"DiskInfo(maxSlots: $maxSlots," +
-      s" committed shuffles ${emptyShuffles.size}" +
+      s" committed shuffles ${emptyShuffles.size}," +
+      s" running applications ${applicationAllocations.size}," +
       s" shuffleAllocations: ${nonEmptyShuffles.toMap}," +
       s" mountPoint: $mountPoint," +
       s" usableSpace: ${Utils.bytesToString(actualUsableSpace)}," +
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala 
b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
index 05ea35d63..20561eebf 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
@@ -103,6 +103,14 @@ class WorkerInfo(
     shuffleKeySet
   }
 
+  def getApplicationIdSet: util.HashSet[String] = this.synchronized {
+    val applicationIdSet = new util.HashSet[String]()
+    diskInfos.values().asScala.foreach { diskInfo =>
+      applicationIdSet.addAll(diskInfo.getApplicationIdSet())
+    }
+    applicationIdSet
+  }
+
   def hasSameInfoWith(other: WorkerInfo): Boolean = {
     rpcPort == other.rpcPort &&
     pushPort == other.pushPort &&
diff --git 
a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala 
b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
index ebd83682e..a435f35e3 100644
--- 
a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
+++ 
b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
@@ -73,7 +73,7 @@ class WorkerInfoSuite extends CelebornFunSuite {
       new WorkerInfo("localhost", 10000, 10001, 10002, 10003, disks, 
userResourceConsumption)
 
     val allocatedSlots = new AtomicInteger(0)
-    val shuffleKey = "appId-shuffleId"
+    val shuffleKey = "appId-1"
     val es = ThreadUtils.newDaemonFixedThreadPool(8, "workerInfo-unit-test")
 
     val futures = new ArrayBuffer[Future[_]]()
@@ -284,9 +284,9 @@ class WorkerInfoSuite extends CelebornFunSuite {
            |SlotsUsed: 60
            |LastHeartbeat: 0
            |Disks: $placeholder
-           |  DiskInfo0: DiskInfo(maxSlots: 0, committed shuffles 0 
shuffleAllocations: Map(), mountPoint: disk3, usableSpace: 2048.0 MiB, 
avgFlushTime: 3 ns, avgFetchTime: 3 ns, activeSlots: 30, storageType: SSD) 
status: HEALTHY dirs $placeholder
-           |  DiskInfo1: DiskInfo(maxSlots: 0, committed shuffles 0 
shuffleAllocations: Map(), mountPoint: disk1, usableSpace: 2048.0 MiB, 
avgFlushTime: 1 ns, avgFetchTime: 1 ns, activeSlots: 10, storageType: SSD) 
status: HEALTHY dirs $placeholder
-           |  DiskInfo2: DiskInfo(maxSlots: 0, committed shuffles 0 
shuffleAllocations: Map(), mountPoint: disk2, usableSpace: 2048.0 MiB, 
avgFlushTime: 2 ns, avgFetchTime: 2 ns, activeSlots: 20, storageType: SSD) 
status: HEALTHY dirs $placeholder
+           |  DiskInfo0: DiskInfo(maxSlots: 0, committed shuffles 0, running 
applications 0, shuffleAllocations: Map(), mountPoint: disk3, usableSpace: 
2048.0 MiB, avgFlushTime: 3 ns, avgFetchTime: 3 ns, activeSlots: 30, 
storageType: SSD) status: HEALTHY dirs $placeholder
+           |  DiskInfo1: DiskInfo(maxSlots: 0, committed shuffles 0, running 
applications 0, shuffleAllocations: Map(), mountPoint: disk1, usableSpace: 
2048.0 MiB, avgFlushTime: 1 ns, avgFetchTime: 1 ns, activeSlots: 10, 
storageType: SSD) status: HEALTHY dirs $placeholder
+           |  DiskInfo2: DiskInfo(maxSlots: 0, committed shuffles 0, running 
applications 0, shuffleAllocations: Map(), mountPoint: disk2, usableSpace: 
2048.0 MiB, avgFlushTime: 2 ns, avgFetchTime: 2 ns, activeSlots: 20, 
storageType: SSD) status: HEALTHY dirs $placeholder
            |UserResourceConsumption: $placeholder
            |  UserIdentifier: `tenant1`.`name1`, ResourceConsumption: 
ResourceConsumption(diskBytesWritten: 20.0 MiB, diskFileCount: 1, 
hdfsBytesWritten: 50.0 MiB, hdfsFileCount: 1)
            |WorkerRef: null
diff --git a/docs/deploy.md b/docs/deploy.md
index c289b57bb..6b3ffc09c 100644
--- a/docs/deploy.md
+++ b/docs/deploy.md
@@ -151,7 +151,7 @@ FetchPort: 37569
 ReplicatePort: 37093
 SlotsUsed: 0()
 LastHeartbeat: 0
-Disks: {/mnt/disk1=DiskInfo(maxSlots: 6679, committed shuffles 0 
shuffleAllocations: Map(), mountPoint: /mnt/disk1, usableSpace: 448284381184, 
avgFlushTime: 0, activeSlots: 0) status: HEALTHY dirs , 
/mnt/disk3=DiskInfo(maxSlots: 6716, committed shuffles 0 shuffleAllocations: 
Map(), mountPoint: /mnt/disk3, usableSpace: 450755608576, avgFlushTime: 0, 
activeSlots: 0) status: HEALTHY dirs , /mnt/disk2=DiskInfo(maxSlots: 6713, 
committed shuffles 0 shuffleAllocations: Map(), mountPoint: /mnt/d [...]
+Disks: {/mnt/disk1=DiskInfo(maxSlots: 6679, committed shuffles 0, running 
applications 0, shuffleAllocations: Map(), mountPoint: /mnt/disk1, usableSpace: 
448284381184, avgFlushTime: 0, activeSlots: 0) status: HEALTHY dirs , 
/mnt/disk3=DiskInfo(maxSlots: 6716, committed shuffles 0, running applications 
0, shuffleAllocations: Map(), mountPoint: /mnt/disk3, usableSpace: 
450755608576, avgFlushTime: 0, activeSlots: 0) status: HEALTHY dirs , 
/mnt/disk2=DiskInfo(maxSlots: 6713, committed shuffl [...]
 WorkerRef: null
 ```
 
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 64592932f..819658007 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -163,6 +163,7 @@ These metrics are exposed by Celeborn worker.
     - TakeBufferTime
         - The time for a worker to take out a buffer from a disk flusher.
     - RegisteredShuffleCount
+    - RunningApplicationCount
     - SlotsAllocated
     - NettyMemory
         - The total amount of off-heap memory used by celeborn worker.
@@ -330,6 +331,7 @@ API path listed as below:
 | /conf                      | List the conf setting of the worker.            
                                                                                
    |
 | /workerInfo                | List the worker information of the worker.      
                                                                                
    |
 | /threadDump                | List the current thread dump of the worker.     
                                                                                
    |
+| /applications              | List all running application's ids of the 
worker. It only return application ids running in that worker.                  
          |
 | /shuffles                  | List all the running shuffle keys of the 
worker. It only return keys of shuffles running in that worker.                 
           |
 | /listTopDiskUsedApps       | List the top disk usage application ids. It 
only return application ids running in that worker.                             
        |
 | /listPartitionLocationInfo | List all the living PartitionLocation 
information in that worker.                                                     
              |
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala 
b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
index 4918bcc23..b7bdc7d87 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
@@ -49,6 +49,8 @@ abstract class HttpService extends Service with Logging {
 
   def getShuffleList: String
 
+  def getApplicationList: String
+
   def listTopDiskUseApps: String
 
   def getMasterGroupInfo: String = throw new UnsupportedOperationException()
@@ -61,8 +63,6 @@ abstract class HttpService extends Service with Logging {
 
   def getHostnameList: String = throw new UnsupportedOperationException()
 
-  def getApplicationList: String = throw new UnsupportedOperationException()
-
   def exclude(addWorkers: String, removeWorkers: String): String =
     throw new UnsupportedOperationException()
 
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpEndpoint.scala
 
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpEndpoint.scala
index 544505cd6..1e4a48a63 100644
--- 
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpEndpoint.scala
+++ 
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpEndpoint.scala
@@ -76,6 +76,19 @@ case object Shuffles extends HttpEndpoint {
     service.getShuffleList
 }
 
+case object Applications extends HttpEndpoint {
+  override def path: String = "/applications"
+
+  override def description(service: String): String =
+    if (service == Service.MASTER)
+      "List all running application's ids of the cluster."
+    else
+      "List all running application's ids of the worker. It only return 
application ids running in that worker."
+
+  override def handle(service: HttpService, parameters: Map[String, String]): 
String =
+    service.getApplicationList
+}
+
 case object ListTopDiskUsedApps extends HttpEndpoint {
   override def path: String = "/listTopDiskUsedApps"
 
@@ -158,16 +171,6 @@ case object Hostnames extends HttpEndpoint {
     service.getHostnameList
 }
 
-case object Applications extends HttpEndpoint {
-  override def path: String = "/applications"
-
-  override def description(service: String): String =
-    "List all running application's ids of the cluster."
-
-  override def handle(service: HttpService, parameters: Map[String, String]): 
String =
-    service.getApplicationList
-}
-
 case object Exclude extends HttpEndpoint {
   override def path: String = "/exclude"
 
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpUtils.scala 
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpUtils.scala
index c208c0f06..534607d85 100644
--- 
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpUtils.scala
+++ 
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpUtils.scala
@@ -25,14 +25,13 @@ import org.apache.celeborn.server.common.{HttpService, 
Service}
 object HttpUtils {
 
   private val baseEndpoints: List[HttpEndpoint] =
-    List(Conf, WorkerInfo, ThreadDump, Shuffles, ListTopDiskUsedApps, Help)
+    List(Conf, WorkerInfo, ThreadDump, Shuffles, Applications, 
ListTopDiskUsedApps, Help)
   private val masterEndpoints: List[HttpEndpoint] = List(
     MasterGroupInfo,
     LostWorkers,
     ExcludedWorkers,
     ShutdownWorkers,
     Hostnames,
-    Applications,
     Exclude) ++ baseEndpoints
   private val workerEndpoints: List[HttpEndpoint] =
     List(
diff --git 
a/service/src/test/scala/org/apache/celeborn/server/common/http/HttpUtilsSuite.scala
 
b/service/src/test/scala/org/apache/celeborn/server/common/http/HttpUtilsSuite.scala
index 8b877a8c7..9f025ae9a 100644
--- 
a/service/src/test/scala/org/apache/celeborn/server/common/http/HttpUtilsSuite.scala
+++ 
b/service/src/test/scala/org/apache/celeborn/server/common/http/HttpUtilsSuite.scala
@@ -77,6 +77,7 @@ class HttpUtilsSuite extends AnyFunSuite with Logging {
          |""".stripMargin)
     assert(HttpUtils.help(Service.WORKER) ==
       s"""Available API providers include:
+         |/applications              List all running application's ids of the 
worker. It only return application ids running in that worker.
          |/conf                      List the conf setting of the worker.
          |/exit                      Trigger this worker to exit. Legal types 
are 'DECOMMISSION', 'GRACEFUL' and 'IMMEDIATELY'.
          |/help                      List the available API providers of the 
worker.
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 9186dec60..fd14a0460 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -285,6 +285,9 @@ private[celeborn] class Worker(
   workerSource.addGauge(WorkerSource.REGISTERED_SHUFFLE_COUNT) { () =>
     workerInfo.getShuffleKeySet.size
   }
+  workerSource.addGauge(WorkerSource.RUNNING_APPLICATION_COUNT) { () =>
+    workerInfo.getApplicationIdSet.size
+  }
   workerSource.addGauge(WorkerSource.SORT_MEMORY) { () =>
     memoryManager.getSortMemoryCounter.get()
   }
@@ -606,6 +609,15 @@ private[celeborn] class Worker(
     sb.toString()
   }
 
+  override def getApplicationList: String = {
+    val sb = new StringBuilder
+    sb.append("================= LifecycleManager Application List 
======================\n")
+    workerInfo.getApplicationIdSet.asScala.foreach { appId =>
+      sb.append(s"$appId\n")
+    }
+    sb.toString()
+  }
+
   override def getShuffleList: String = {
     val sb = new StringBuilder
     sb.append("======================= Shuffle Key List 
============================\n")
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
index 6b6d77122..b363f3865 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
@@ -103,6 +103,8 @@ object WorkerSource {
 
   val REGISTERED_SHUFFLE_COUNT = "RegisteredShuffleCount"
 
+  val RUNNING_APPLICATION_COUNT = "RunningApplicationCount"
+
   // slots
   val SLOTS_ALLOCATED = "SlotsAllocated"
 

Reply via email to