Repository: spark
Updated Branches:
refs/heads/master 112bd9bfc -> ea542d29b
[SPARK-19824][CORE] Update JsonProtocol to keep consistent with the UI
## What changes were proposed in this pull request?
Fix the parts of JsonProtocol that are inconsistent with the UI.
This PR also contains the modifications in #17181.
## How was this patch tested?
Updated JsonProtocolSuite.
Before this change, localhost:8080/json shows:
```
{
"url" : "spark://xingbos-MBP.local:7077",
"workers" : [ {
"id" : "worker-20170615172946-192.168.0.101-49450",
"host" : "192.168.0.101",
"port" : 49450,
"webuiaddress" : "http://192.168.0.101:8081",
"cores" : 8,
"coresused" : 8,
"coresfree" : 0,
"memory" : 15360,
"memoryused" : 1024,
"memoryfree" : 14336,
"state" : "ALIVE",
"lastheartbeat" : 1497519481722
}, {
"id" : "worker-20170615172948-192.168.0.101-49452",
"host" : "192.168.0.101",
"port" : 49452,
"webuiaddress" : "http://192.168.0.101:8082",
"cores" : 8,
"coresused" : 8,
"coresfree" : 0,
"memory" : 15360,
"memoryused" : 1024,
"memoryfree" : 14336,
"state" : "ALIVE",
"lastheartbeat" : 1497519484160
}, {
"id" : "worker-20170615172951-192.168.0.101-49469",
"host" : "192.168.0.101",
"port" : 49469,
"webuiaddress" : "http://192.168.0.101:8083",
"cores" : 8,
"coresused" : 8,
"coresfree" : 0,
"memory" : 15360,
"memoryused" : 1024,
"memoryfree" : 14336,
"state" : "ALIVE",
"lastheartbeat" : 1497519486905
} ],
"cores" : 24,
"coresused" : 24,
"memory" : 46080,
"memoryused" : 3072,
"activeapps" : [ {
"starttime" : 1497519426990,
"id" : "app-20170615173706-0001",
"name" : "Spark shell",
"user" : "xingbojiang",
"memoryperslave" : 1024,
"submitdate" : "Thu Jun 15 17:37:06 CST 2017",
"state" : "RUNNING",
"duration" : 65362
} ],
"completedapps" : [ {
"starttime" : 1497519250893,
"id" : "app-20170615173410-0000",
"name" : "Spark shell",
"user" : "xingbojiang",
"memoryperslave" : 1024,
"submitdate" : "Thu Jun 15 17:34:10 CST 2017",
"state" : "FINISHED",
"duration" : 116895
} ],
"activedrivers" : [ ],
"status" : "ALIVE"
}
```
After the change:
```
{
"url" : "spark://xingbos-MBP.local:7077",
"workers" : [ {
"id" : "worker-20170615175032-192.168.0.101-49951",
"host" : "192.168.0.101",
"port" : 49951,
"webuiaddress" : "http://192.168.0.101:8081",
"cores" : 8,
"coresused" : 8,
"coresfree" : 0,
"memory" : 15360,
"memoryused" : 1024,
"memoryfree" : 14336,
"state" : "ALIVE",
"lastheartbeat" : 1497520292900
}, {
"id" : "worker-20170615175034-192.168.0.101-49953",
"host" : "192.168.0.101",
"port" : 49953,
"webuiaddress" : "http://192.168.0.101:8082",
"cores" : 8,
"coresused" : 8,
"coresfree" : 0,
"memory" : 15360,
"memoryused" : 1024,
"memoryfree" : 14336,
"state" : "ALIVE",
"lastheartbeat" : 1497520280301
}, {
"id" : "worker-20170615175037-192.168.0.101-49955",
"host" : "192.168.0.101",
"port" : 49955,
"webuiaddress" : "http://192.168.0.101:8083",
"cores" : 8,
"coresused" : 8,
"coresfree" : 0,
"memory" : 15360,
"memoryused" : 1024,
"memoryfree" : 14336,
"state" : "ALIVE",
"lastheartbeat" : 1497520282884
} ],
"aliveworkers" : 3,
"cores" : 24,
"coresused" : 24,
"memory" : 46080,
"memoryused" : 3072,
"activeapps" : [ {
"id" : "app-20170615175122-0001",
"starttime" : 1497520282115,
"name" : "Spark shell",
"cores" : 24,
"user" : "xingbojiang",
"memoryperslave" : 1024,
"submitdate" : "Thu Jun 15 17:51:22 CST 2017",
"state" : "RUNNING",
"duration" : 10805
} ],
"completedapps" : [ {
"id" : "app-20170615175058-0000",
"starttime" : 1497520258766,
"name" : "Spark shell",
"cores" : 24,
"user" : "xingbojiang",
"memoryperslave" : 1024,
"submitdate" : "Thu Jun 15 17:50:58 CST 2017",
"state" : "FINISHED",
"duration" : 9876
} ],
"activedrivers" : [ ],
"completeddrivers" : [ ],
"status" : "ALIVE"
}
```
Author: Xingbo Jiang <[email protected]>
Closes #18303 from jiangxb1987/json-protocol.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ea542d29
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ea542d29
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ea542d29
Branch: refs/heads/master
Commit: ea542d29b2ae99cfff47fed40b7a9ab77d41b391
Parents: 112bd9b
Author: Xingbo Jiang <[email protected]>
Authored: Sun Jun 18 22:05:06 2017 -0700
Committer: gatorsmile <[email protected]>
Committed: Sun Jun 18 22:05:06 2017 -0700
----------------------------------------------------------------------
.../org/apache/spark/deploy/JsonProtocol.scala | 158 ++++++++++++++++---
.../apache/spark/deploy/DeployTestUtils.scala | 4 +-
.../apache/spark/deploy/JsonProtocolSuite.scala | 15 +-
3 files changed, 149 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/ea542d29/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
index 220b20b..7212696 100644
--- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
@@ -21,30 +21,65 @@ import org.json4s.JsonAST.JObject
import org.json4s.JsonDSL._
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse,
WorkerStateResponse}
-import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
+import org.apache.spark.deploy.master._
+import org.apache.spark.deploy.master.RecoveryState.MasterState
import org.apache.spark.deploy.worker.ExecutorRunner
private[deploy] object JsonProtocol {
- def writeWorkerInfo(obj: WorkerInfo): JObject = {
- ("id" -> obj.id) ~
- ("host" -> obj.host) ~
- ("port" -> obj.port) ~
- ("webuiaddress" -> obj.webUiAddress) ~
- ("cores" -> obj.cores) ~
- ("coresused" -> obj.coresUsed) ~
- ("coresfree" -> obj.coresFree) ~
- ("memory" -> obj.memory) ~
- ("memoryused" -> obj.memoryUsed) ~
- ("memoryfree" -> obj.memoryFree) ~
- ("state" -> obj.state.toString) ~
- ("lastheartbeat" -> obj.lastHeartbeat)
- }
+ /**
+ * Export the [[WorkerInfo]] to a Json object. A [[WorkerInfo]] consists of
the information of a
+ * worker.
+ *
+ * @return a Json object containing the following fields:
+ * `id` a string identifier of the worker
+ * `host` the host that the worker is running on
+ * `port` the port that the worker is bound to
+ * `webuiaddress` the address used in web UI
+ * `cores` total cores of the worker
+ * `coresused` allocated cores of the worker
+ * `coresfree` free cores of the worker
+ * `memory` total memory of the worker
+ * `memoryused` allocated memory of the worker
+ * `memoryfree` free memory of the worker
+ * `state` state of the worker, see [[WorkerState]]
+ * `lastheartbeat` time in milliseconds that the latest heart beat
message from the
+ * worker is received
+ */
+ def writeWorkerInfo(obj: WorkerInfo): JObject = {
+ ("id" -> obj.id) ~
+ ("host" -> obj.host) ~
+ ("port" -> obj.port) ~
+ ("webuiaddress" -> obj.webUiAddress) ~
+ ("cores" -> obj.cores) ~
+ ("coresused" -> obj.coresUsed) ~
+ ("coresfree" -> obj.coresFree) ~
+ ("memory" -> obj.memory) ~
+ ("memoryused" -> obj.memoryUsed) ~
+ ("memoryfree" -> obj.memoryFree) ~
+ ("state" -> obj.state.toString) ~
+ ("lastheartbeat" -> obj.lastHeartbeat)
+ }
+ /**
+ * Export the [[ApplicationInfo]] to a Json object. An [[ApplicationInfo]]
consists of the
+ * information of an application.
+ *
+ * @return a Json object containing the following fields:
+ * `id` a string identifier of the application
+ * `starttime` time in milliseconds that the application starts
+ * `name` the description of the application
+ * `cores` total cores granted to the application
+ * `user` name of the user who submitted the application
+ * `memoryperslave` minimal memory in MB required to each executor
+ * `submitdate` time in Date that the application is submitted
+ * `state` state of the application, see [[ApplicationState]]
+ * `duration` time in milliseconds that the application has been
running
+ */
def writeApplicationInfo(obj: ApplicationInfo): JObject = {
- ("starttime" -> obj.startTime) ~
("id" -> obj.id) ~
+ ("starttime" -> obj.startTime) ~
("name" -> obj.desc.name) ~
- ("cores" -> obj.desc.maxCores) ~
+ ("cores" -> obj.coresGranted) ~
("user" -> obj.desc.user) ~
("memoryperslave" -> obj.desc.memoryPerExecutorMB) ~
("submitdate" -> obj.submitDate.toString) ~
@@ -52,14 +87,36 @@ private[deploy] object JsonProtocol {
("duration" -> obj.duration)
}
+ /**
+ * Export the [[ApplicationDescription]] to a Json object. An
[[ApplicationDescription]] consists
+ * of the description of an application.
+ *
+ * @return a Json object containing the following fields:
+ * `name` the description of the application
+ * `cores` max cores that can be allocated to the application, 0
means unlimited
+ * `memoryperslave` minimal memory in MB required to each executor
+ * `user` name of the user who submitted the application
+ * `command` the command string used to submit the application
+ */
def writeApplicationDescription(obj: ApplicationDescription): JObject = {
("name" -> obj.name) ~
- ("cores" -> obj.maxCores) ~
+ ("cores" -> obj.maxCores.getOrElse(0)) ~
("memoryperslave" -> obj.memoryPerExecutorMB) ~
("user" -> obj.user) ~
("command" -> obj.command.toString)
}
+ /**
+ * Export the [[ExecutorRunner]] to a Json object. An [[ExecutorRunner]]
consists of the
+ * information of an executor.
+ *
+ * @return a Json object containing the following fields:
+ * `id` an integer identifier of the executor
+ * `memory` memory in MB allocated to the executor
+ * `appid` a string identifier of the application that the executor
is working on
+ * `appdesc` a Json object of the [[ApplicationDescription]] of the
application that the
+ * executor is working on
+ */
def writeExecutorRunner(obj: ExecutorRunner): JObject = {
("id" -> obj.execId) ~
("memory" -> obj.memory) ~
@@ -67,18 +124,59 @@ private[deploy] object JsonProtocol {
("appdesc" -> writeApplicationDescription(obj.appDesc))
}
+ /**
+ * Export the [[DriverInfo]] to a Json object. A [[DriverInfo]] consists of
the information of a
+ * driver.
+ *
+ * @return a Json object containing the following fields:
+ * `id` a string identifier of the driver
+ * `starttime` time in milliseconds that the driver starts
+ * `state` state of the driver, see [[DriverState]]
+ * `cores` cores allocated to the driver
+ * `memory` memory in MB allocated to the driver
+ * `submitdate` time in Date that the driver is created
+ * `worker` identifier of the worker that the driver is running on
+ * `mainclass` main class of the command string that started the
driver
+ */
def writeDriverInfo(obj: DriverInfo): JObject = {
("id" -> obj.id) ~
("starttime" -> obj.startTime.toString) ~
("state" -> obj.state.toString) ~
("cores" -> obj.desc.cores) ~
- ("memory" -> obj.desc.mem)
+ ("memory" -> obj.desc.mem) ~
+ ("submitdate" -> obj.submitDate.toString) ~
+ ("worker" -> obj.worker.map(_.id).getOrElse("None")) ~
+ ("mainclass" -> obj.desc.command.arguments(2))
}
+ /**
+ * Export the [[MasterStateResponse]] to a Json object. A
[[MasterStateResponse]] consists of the
+ * information of a master node.
+ *
+ * @return a Json object containing the following fields:
+ * `url` the url of the master node
+ * `workers` a list of Json objects of [[WorkerInfo]] of the workers
allocated to the
+ * master
+ * `aliveworkers` size of alive workers allocated to the master
+ * `cores` total cores available of the master
+ * `coresused` cores used by the master
+ * `memory` total memory available of the master
+ * `memoryused` memory used by the master
+ * `activeapps` a list of Json objects of [[ApplicationInfo]] of the
active applications
+ * running on the master
+ * `completedapps` a list of Json objects of [[ApplicationInfo]] of
the applications
+ * completed in the master
+ * `activedrivers` a list of Json objects of [[DriverInfo]] of the
active drivers of the
+ * master
+ * `completeddrivers` a list of Json objects of [[DriverInfo]] of
the completed drivers
+ * of the master
+ * `status` status of the master, see [[MasterState]]
+ */
def writeMasterState(obj: MasterStateResponse): JObject = {
val aliveWorkers = obj.workers.filter(_.isAlive())
("url" -> obj.uri) ~
("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~
+ ("aliveworkers" -> aliveWorkers.length) ~
("cores" -> aliveWorkers.map(_.cores).sum) ~
("coresused" -> aliveWorkers.map(_.coresUsed).sum) ~
("memory" -> aliveWorkers.map(_.memory).sum) ~
@@ -86,9 +184,27 @@ private[deploy] object JsonProtocol {
("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo)) ~
("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo)) ~
("activedrivers" -> obj.activeDrivers.toList.map(writeDriverInfo)) ~
+ ("completeddrivers" -> obj.completedDrivers.toList.map(writeDriverInfo)) ~
("status" -> obj.status.toString)
}
+ /**
+ * Export the [[WorkerStateResponse]] to a Json object. A
[[WorkerStateResponse]] consists of the
+ * information of a worker node.
+ *
+ * @return a Json object containing the following fields:
+ * `id` a string identifier of the worker node
+ * `masterurl` url of the master node of the worker
+ * `masterwebuiurl` the address used in web UI of the master node of
the worker
+ * `cores` total cores of the worker
+ * `coresused` used cores of the worker
+ * `memory` total memory of the worker
+ * `memoryused` used memory of the worker
+ * `executors` a list of Json objects of [[ExecutorRunner]] of the
executors running on
+ * the worker
+ * `finishedexecutors` a list of Json objects of [[ExecutorRunner]]
of the finished
+ * executors of the worker
+ */
def writeWorkerState(obj: WorkerStateResponse): JObject = {
("id" -> obj.workerId) ~
("masterurl" -> obj.masterUrl) ~
@@ -97,7 +213,7 @@ private[deploy] object JsonProtocol {
("coresused" -> obj.coresUsed) ~
("memory" -> obj.memory) ~
("memoryused" -> obj.memoryUsed) ~
- ("executors" -> obj.executors.toList.map(writeExecutorRunner)) ~
- ("finishedexecutors" ->
obj.finishedExecutors.toList.map(writeExecutorRunner))
+ ("executors" -> obj.executors.map(writeExecutorRunner)) ~
+ ("finishedexecutors" -> obj.finishedExecutors.map(writeExecutorRunner))
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/ea542d29/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
index 9c13c15..55a541d 100644
--- a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
@@ -39,7 +39,7 @@ private[deploy] object DeployTestUtils {
}
def createDriverCommand(): Command = new Command(
- "org.apache.spark.FakeClass", Seq("some arg --and-some options -g foo"),
+ "org.apache.spark.FakeClass", Seq("WORKER_URL", "USER_JAR", "mainClass"),
Map(("K1", "V1"), ("K2", "V2")), Seq("cp1", "cp2"), Seq("lp1", "lp2"),
Seq("-Dfoo")
)
@@ -47,7 +47,7 @@ private[deploy] object DeployTestUtils {
new DriverDescription("hdfs://some-dir/some.jar", 100, 3, false,
createDriverCommand())
def createDriverInfo(): DriverInfo = new DriverInfo(3, "driver-3",
- createDriverDesc(), new Date())
+ createDriverDesc(), JsonConstants.submitDate)
def createWorkerInfo(): WorkerInfo = {
val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null,
"http://publicAddress:80")
http://git-wip-us.apache.org/repos/asf/spark/blob/ea542d29/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git
a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 7093dad..1903130 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -104,8 +104,8 @@ object JsonConstants {
val submitDate = new Date(123456789)
val appInfoJsonStr =
"""
- |{"starttime":3,"id":"id","name":"name",
- |"cores":4,"user":"%s",
+ |{"id":"id","starttime":3,"name":"name",
+ |"cores":0,"user":"%s",
|"memoryperslave":1234,"submitdate":"%s",
|"state":"WAITING","duration":%d}
""".format(System.getProperty("user.name", "<unknown>"),
@@ -134,19 +134,24 @@ object JsonConstants {
val driverInfoJsonStr =
"""
-
|{"id":"driver-3","starttime":"3","state":"SUBMITTED","cores":3,"memory":100}
- """.stripMargin
+ |{"id":"driver-3","starttime":"3",
+ |"state":"SUBMITTED","cores":3,"memory":100,
+ |"submitdate":"%s","worker":"None",
+ |"mainclass":"mainClass"}
+ """.format(submitDate.toString).stripMargin
val masterStateJsonStr =
"""
|{"url":"spark://host:8080",
|"workers":[%s,%s],
+ |"aliveworkers":2,
|"cores":8,"coresused":0,"memory":2468,"memoryused":0,
|"activeapps":[%s],"completedapps":[],
|"activedrivers":[%s],
+ |"completeddrivers":[%s],
|"status":"ALIVE"}
""".format(workerInfoJsonStr, workerInfoJsonStr,
- appInfoJsonStr, driverInfoJsonStr).stripMargin
+ appInfoJsonStr, driverInfoJsonStr, driverInfoJsonStr).stripMargin
val workerStateJsonStr =
"""
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]