This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new beb0238273b9 [SPARK-45819][CORE] Support `clear` in REST Submission API
beb0238273b9 is described below
commit beb0238273b937bb42e746f7b240dd63e48f0667
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Tue Nov 7 10:11:19 2023 -0800
[SPARK-45819][CORE] Support `clear` in REST Submission API
### What changes were proposed in this pull request?
This PR adds a `clear` action to the REST Submission API, which clears the
`completed` drivers and apps.
### Why are the changes needed?
This new feature is helpful for users to reset the completed drivers and
apps in Spark Master.
**"1 Completed"**
<img width="672" alt="Screenshot 2023-11-07 at 12 56 02 AM"
src="https://github.com/apache/spark/assets/9700541/1ba9e01c-3c66-4161-b7c6-b86b57837ae0">
**After invoking `clear` API, "0 Completed"**
```
$ curl -X POST http://max.local:6066/v1/submissions/clear
{
"action" : "ClearResponse",
"message" : "",
"serverSparkVersion" : "4.0.0-SNAPSHOT",
"success" : true
}
```
<img width="677" alt="Screenshot 2023-11-07 at 12 56 24 AM"
src="https://github.com/apache/spark/assets/9700541/a6ee816b-217e-4f93-bd55-a0c8a53c4729">
### Does this PR introduce _any_ user-facing change?
No existing behavior changes; this only adds a new API endpoint (`POST /v1/submissions/clear`).
### How was this patch tested?
Pass the CIs with the newly added test case.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #43698 from dongjoon-hyun/SPARK-45819.
Lead-authored-by: Dongjoon Hyun <[email protected]>
Co-authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../org/apache/spark/deploy/DeployMessage.scala | 2 ++
.../org/apache/spark/deploy/master/Master.scala | 8 +++++
.../spark/deploy/rest/RestSubmissionClient.scala | 35 ++++++++++++++++++++++
.../spark/deploy/rest/RestSubmissionServer.scala | 22 +++++++++++++-
.../spark/deploy/rest/StandaloneRestServer.scala | 19 ++++++++++++
.../deploy/rest/SubmitRestProtocolResponse.scala | 10 +++++++
.../deploy/rest/StandaloneRestSubmitSuite.scala | 33 ++++++++++++++++++++
7 files changed, 128 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 4ec0edd5909e..f49530461b4d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -238,6 +238,8 @@ private[deploy] object DeployMessages {
case class DriverStatusResponse(found: Boolean, state: Option[DriverState],
workerId: Option[String], workerHostPort: Option[String], exception:
Option[Exception])
+ case object RequestClearCompletedDriversAndApps extends DeployMessage
+
// Internal message in AppClient
case object StopAppClient
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index e63d72ebb40d..3ba50318610b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -460,6 +460,14 @@ private[deploy] class Master(
}
}
+ case RequestClearCompletedDriversAndApps =>
+ val numDrivers = completedDrivers.length
+ val numApps = completedApps.length
+ logInfo(s"Asked to clear $numDrivers completed drivers and $numApps
completed apps.")
+ completedDrivers.clear()
+ completedApps.clear()
+ context.reply(true)
+
case RequestDriverStatus(driverId) =>
if (state != RecoveryState.ALIVE) {
val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
diff --git
a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
index 68f08dd951ef..3010efc936f9 100644
---
a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
+++
b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
@@ -135,6 +135,35 @@ private[spark] class RestSubmissionClient(master: String)
extends Logging {
response
}
+ /** Request that the server clear its completed drivers and applications. */
+ def clear(): SubmitRestProtocolResponse = {
+ logInfo(s"Submitting a request to clear $master.")
+ var handled: Boolean = false
+ var response: SubmitRestProtocolResponse = null
+ for (m <- masters if !handled) {
+ validateMaster(m)
+ val url = getClearUrl(m)
+ try {
+ response = post(url)
+ response match {
+ case k: ClearResponse =>
+ if (!Utils.responseFromBackup(k.message)) {
+ handleRestResponse(k)
+ handled = true
+ }
+ case unexpected =>
+ handleUnexpectedRestResponse(unexpected)
+ }
+ } catch {
+ case e: SubmitRestConnectionException =>
+ if (handleConnectionException(m)) {
+ throw new SubmitRestConnectionException("Unable to connect to
server", e)
+ }
+ }
+ }
+ response
+ }
+
/** Request the status of a submission from the server. */
def requestSubmissionStatus(
submissionId: String,
@@ -300,6 +329,12 @@ private[spark] class RestSubmissionClient(master: String)
extends Logging {
new URL(s"$baseUrl/kill/$submissionId")
}
+ /** Return the REST URL for clearing completed drivers and applications.
*/
+ private def getClearUrl(master: String): URL = {
+ val baseUrl = getBaseUrl(master)
+ new URL(s"$baseUrl/clear")
+ }
+
/** Return the REST URL for requesting the status of an existing submission.
*/
private def getStatusUrl(master: String, submissionId: String): URL = {
val baseUrl = getBaseUrl(master)
diff --git
a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
index 41845dc31a98..3323d0f529eb 100644
---
a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
+++
b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
@@ -55,6 +55,7 @@ private[spark] abstract class RestSubmissionServer(
protected val submitRequestServlet: SubmitRequestServlet
protected val killRequestServlet: KillRequestServlet
protected val statusRequestServlet: StatusRequestServlet
+ protected val clearRequestServlet: ClearRequestServlet
private var _server: Option[Server] = None
@@ -64,6 +65,7 @@ private[spark] abstract class RestSubmissionServer(
s"$baseContext/create/*" -> submitRequestServlet,
s"$baseContext/kill/*" -> killRequestServlet,
s"$baseContext/status/*" -> statusRequestServlet,
+ s"$baseContext/clear/*" -> clearRequestServlet,
"/*" -> new ErrorServlet // default handler
)
@@ -227,6 +229,24 @@ private[rest] abstract class KillRequestServlet extends
RestServlet {
protected def handleKill(submissionId: String): KillSubmissionResponse
}
+/**
+ * A servlet for handling clear requests passed to the
[[RestSubmissionServer]].
+ */
+private[rest] abstract class ClearRequestServlet extends RestServlet {
+
+ /**
+ * Clear the completed drivers and apps.
+ */
+ protected override def doPost(
+ request: HttpServletRequest,
+ response: HttpServletResponse): Unit = {
+ val responseMessage = handleClear()
+ sendResponse(responseMessage, response)
+ }
+
+ protected def handleClear(): ClearResponse
+}
+
/**
* A servlet for handling status requests passed to the
[[RestSubmissionServer]].
*/
@@ -311,7 +331,7 @@ private class ErrorServlet extends RestServlet {
"Missing the /submissions prefix."
case `serverVersion` :: "submissions" :: tail =>
// http://host:port/correct-version/submissions/*
- "Missing an action: please specify one of /create, /kill, or
/status."
+ "Missing an action: please specify one of /create, /kill, /clear or
/status."
case unknownVersion :: tail =>
// http://host:port/unknown-version/*
versionMismatch = true
diff --git
a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
index a298e4f6dbf0..8ed716428dc2 100644
---
a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
+++
b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
@@ -65,6 +65,8 @@ private[deploy] class StandaloneRestServer(
new StandaloneKillRequestServlet(masterEndpoint, masterConf)
protected override val statusRequestServlet =
new StandaloneStatusRequestServlet(masterEndpoint, masterConf)
+ protected override val clearRequestServlet =
+ new StandaloneClearRequestServlet(masterEndpoint, masterConf)
}
/**
@@ -107,6 +109,23 @@ private[rest] class
StandaloneStatusRequestServlet(masterEndpoint: RpcEndpointRe
}
}
+/**
+ * A servlet for handling clear requests passed to the
[[StandaloneRestServer]].
+ */
+private[rest] class StandaloneClearRequestServlet(masterEndpoint:
RpcEndpointRef, conf: SparkConf)
+ extends ClearRequestServlet {
+
+ protected def handleClear(): ClearResponse = {
+ val response = masterEndpoint.askSync[Boolean](
+ DeployMessages.RequestClearCompletedDriversAndApps)
+ val c = new ClearResponse
+ c.serverSparkVersion = sparkVersion
+ c.message = ""
+ c.success = response
+ c
+ }
+}
+
/**
* A servlet for handling submit requests passed to the
[[StandaloneRestServer]].
*/
diff --git
a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolResponse.scala
b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolResponse.scala
index 0e226ee294ca..21614c22285f 100644
---
a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolResponse.scala
+++
b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolResponse.scala
@@ -55,6 +55,16 @@ private[spark] class KillSubmissionResponse extends
SubmitRestProtocolResponse {
}
}
+/**
+ * A response to a clear request in the REST application submission protocol.
+ */
+private[spark] class ClearResponse extends SubmitRestProtocolResponse {
+ protected override def doValidate(): Unit = {
+ super.doValidate()
+ assertFieldIsSet(success, "success")
+ }
+}
+
/**
* A response to a status request in the REST application submission protocol.
*/
diff --git
a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
index 3cd96670c8b5..d775aa6542dc 100644
---
a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
+++
b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
@@ -227,6 +227,15 @@ class StandaloneRestSubmitSuite extends SparkFunSuite {
assert(statusResponse.submissionId === doesNotExist)
}
+ test("SPARK-45819: clear") {
+ val masterUrl = startDummyServer()
+ val response = new RestSubmissionClient(masterUrl).clear()
+ val clearResponse = getClearResponse(response)
+ assert(clearResponse.action === Utils.getFormattedClassName(clearResponse))
+ assert(clearResponse.serverSparkVersion === SPARK_VERSION)
+ assert(clearResponse.success)
+ }
+
/* ---------------------------------------- *
| Aberrant client / server behavior |
* ---------------------------------------- */
@@ -505,6 +514,15 @@ class StandaloneRestSubmitSuite extends SparkFunSuite {
}
}
+ /** Return the response as a clear response, or fail with error otherwise. */
+ private def getClearResponse(response: SubmitRestProtocolResponse):
ClearResponse = {
+ response match {
+ case k: ClearResponse => k
+ case e: ErrorResponse => fail(s"Server returned error: ${e.message}")
+ case r => fail(s"Expected clear response. Actual: ${r.toJson}")
+ }
+ }
+
/** Return the response as a status response, or fail with error otherwise.
*/
private def getStatusResponse(response: SubmitRestProtocolResponse):
SubmissionStatusResponse = {
response match {
@@ -574,6 +592,8 @@ private class DummyMaster(
context.reply(KillDriverResponse(self, driverId, success = true,
killMessage))
case RequestDriverStatus(driverId) =>
context.reply(DriverStatusResponse(found = true, Some(state), None,
None, exception))
+ case RequestClearCompletedDriversAndApps =>
+ context.reply(true)
}
}
@@ -617,6 +637,7 @@ private class SmarterMaster(override val rpcEnv: RpcEnv)
extends ThreadSafeRpcEn
* When handling a submit request, the server returns a malformed JSON.
* When handling a kill request, the server returns an invalid JSON.
* When handling a status request, the server throws an internal exception.
+ * When handling a clear request, the server throws an internal exception.
* The purpose of this class is to test that client handles these cases
gracefully.
*/
private class FaultyStandaloneRestServer(
@@ -630,6 +651,7 @@ private class FaultyStandaloneRestServer(
protected override val submitRequestServlet = new MalformedSubmitServlet
protected override val killRequestServlet = new InvalidKillServlet
protected override val statusRequestServlet = new ExplodingStatusServlet
+ protected override val clearRequestServlet = new ExplodingClearServlet
/** A faulty servlet that produces malformed responses. */
class MalformedSubmitServlet
@@ -660,4 +682,15 @@ private class FaultyStandaloneRestServer(
s
}
}
+
+ /** A faulty clear servlet that explodes. */
+ class ExplodingClearServlet extends
StandaloneClearRequestServlet(masterEndpoint, masterConf) {
+ private def explode: Int = 1 / 0
+
+ protected override def handleClear(): ClearResponse = {
+ val s = super.handleClear()
+ s.message = explode.toString
+ s
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]