This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f75c7a7b5240 [SPARK-46883][CORE] Support `/json/clusterutilization` API
f75c7a7b5240 is described below
commit f75c7a7b52402e4c8faa39b2f88623e9f0bca916
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sat Jan 27 09:21:17 2024 -0800
[SPARK-46883][CORE] Support `/json/clusterutilization` API
### What changes were proposed in this pull request?
This PR aims to support a new `/json/clusterutilization` API in the `Master` JSON
endpoint.
### Why are the changes needed?
Users can get the cluster's CPU and memory utilization and the number of waiting drivers in a single API call.
```
# Start Spark Cluster and Spark Shell
$ sbin/start-master.sh
$ sbin/start-worker.sh spark://$(hostname):7077;
$ bin/spark-shell --master spark://$(hostname):7077
# Check `Cluster Utilization API`
$ curl http://localhost:8080/json/clusterutilization
{
"waitingDrivers" : 0,
"cores" : 10,
"coresused" : 10,
"coresutilization" : 100,
"memory" : 31744,
"memoryused" : 1024,
"memoryutilization" : 3
}
```
### Does this PR introduce _any_ user-facing change?
No. This is a newly added API.
### How was this patch tested?
Pass the CIs with the newly added test case.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #44908 from dongjoon-hyun/SPARK-46883.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../org/apache/spark/deploy/JsonProtocol.scala | 18 ++++++++++++++++++
.../apache/spark/deploy/master/ui/MasterPage.scala | 2 ++
.../org/apache/spark/deploy/JsonProtocolSuite.scala | 21 +++++++++++++++++++++
3 files changed, 41 insertions(+)
diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
index 8c356081b277..9c73e84f4166 100644
--- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
@@ -299,4 +299,22 @@ private[deploy] object JsonProtocol {
("executors" -> obj.executors.map(writeExecutorRunner)) ~
("finishedexecutors" -> obj.finishedExecutors.map(writeExecutorRunner))
}
+
+ /**
+ * Export the cluster utilization based on the [[MasterStateResponse]] to a Json object.
+ *
+ * Only workers for which `isAlive()` is true contribute to the totals. The utilization
+ * values are integer percentages computed with integer division.
+ *
+ * @param obj the master state to summarize
+ * @return a Json object with the `waitingDrivers`, `cores`, `coresused`, `coresutilization`,
+ * `memory`, `memoryused` and `memoryutilization` fields
+ */
+ def writeClusterUtilization(obj: MasterStateResponse): JObject = {
+ // When there are no alive workers the totals are 0; guard the division so the endpoint
+ // does not fail with ArithmeticException. No capacity means nothing is free, so report 100.
+ // The multiplication is done in Long so that `100 * used` cannot overflow.
+ def utilization(used: Long, total: Long): Int =
+ if (total == 0) 100 else (100L * used / total).toInt
+ val aliveWorkers = obj.workers.filter(_.isAlive())
+ val cores = aliveWorkers.map(_.cores).sum
+ val coresUsed = aliveWorkers.map(_.coresUsed).sum
+ val memory = aliveWorkers.map(_.memory).sum
+ val memoryUsed = aliveWorkers.map(_.memoryUsed).sum
+ // Drivers that are still in the SUBMITTED state are the ones counted as waiting.
+ ("waitingDrivers" -> obj.activeDrivers.count(_.state == DriverState.SUBMITTED)) ~
+ ("cores" -> cores) ~
+ ("coresused" -> coresUsed) ~
+ ("coresutilization" -> utilization(coresUsed, cores)) ~
+ ("memory" -> memory) ~
+ ("memoryused" -> memoryUsed) ~
+ ("memoryutilization" -> utilization(memoryUsed, memory))
+ }
}
diff --git
a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
index 36a79e060f01..cbeda23013ac 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
@@ -41,6 +41,8 @@ private[ui] class MasterPage(parent: MasterWebUI) extends
WebUIPage("") {
override def renderJson(request: HttpServletRequest): JValue = {
jsonFieldPattern.findFirstMatchIn(request.getRequestURI()) match {
case None => JsonProtocol.writeMasterState(getMasterState)
+ case Some(m) if m.group(1) == "clusterutilization" =>
+ JsonProtocol.writeClusterUtilization(getMasterState)
case Some(m) => JsonProtocol.writeMasterState(getMasterState,
Some(m.group(1)))
}
}
diff --git
a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 4a6ace6facde..6fca31234ee2 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -105,6 +105,20 @@ class JsonProtocolSuite extends SparkFunSuite with
JsonTestUtils {
assertValidDataInJson(output,
JsonMethods.parse(JsonConstants.workerStateJsonStr))
}
+ test("SPARK-46883: writeClusterUtilization") {
+ val workers = Array(createWorkerInfo(), createWorkerInfo())
+ val activeApps = Array(createAppInfo())
+ val completedApps = Array.empty[ApplicationInfo]
+ val activeDrivers = Array(createDriverInfo())
+ val completedDrivers = Array(createDriverInfo())
+ val stateResponse = new MasterStateResponse(
+ "host", 8080, None, workers, activeApps, completedApps,
+ activeDrivers, completedDrivers, RecoveryState.ALIVE)
+ val output = JsonProtocol.writeClusterUtilization(stateResponse)
+ assertValidJson(output)
+ assertValidDataInJson(output,
JsonMethods.parse(JsonConstants.clusterUtilizationJsonStr))
+ }
+
def assertValidJson(json: JValue): Unit = {
try {
JsonMethods.parse(JsonMethods.compact(json))
@@ -206,4 +220,11 @@ object JsonConstants {
|"executors":[],
|"finishedexecutors":[%s,%s]}
""".format(executorRunnerJsonStr, executorRunnerJsonStr).stripMargin
+
+ val clusterUtilizationJsonStr =
+ """
+ |{"waitingDrivers":1,
+ |"cores":8,"coresused":0,"coresutilization":0,
+ |"memory":2468,"memoryused":0,"memoryutilization":0}
+ """.stripMargin
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]