This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 89c71f7c195d [SPARK-46368][CORE] Support `readyz` in REST Submission
API
89c71f7c195d is described below
commit 89c71f7c195d86d489e45efc87688e0757a2158a
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Thu Jan 11 11:09:31 2024 -0800
[SPARK-46368][CORE] Support `readyz` in REST Submission API
### What changes were proposed in this pull request?
This PR adds a `readyz` endpoint to the REST Submission API. The endpoint
reports the Master as ready when `state != RecoveryState.STANDBY`.
The semantics of `readyz` are defined by the Kubernetes documentation.
- https://kubernetes.io/docs/reference/using-api/health-checks/
> Machines that check the healthz/livez/readyz of the API server should
rely on the HTTP status code.
> A status code 200 indicates the API server is healthy/live/ready,
depending on the called endpoint.
### Why are the changes needed?
Previously, users could check the readiness of Spark Master pods as
follows.
```
readinessProbe:
exec:
command: ["sh", "-c", "! (curl -s http://localhost:6066/v1/submissions/status/none | grep -q STANDBY)"]
```
After this PR, the `readyz` endpoint returns one of the following responses.
**200 OK**
```
$ curl -vv http://localhost:6066/v1/submissions/readyz
< HTTP/1.1 200 OK
{
"action" : "ReadyzResponse",
"message" : "",
"serverSparkVersion" : "4.0.0-SNAPSHOT",
"success" : true
}
```
**503 Service Unavailable**
```
$ curl -vv http://localhost:6066/v1/submissions/readyz
...
< HTTP/1.1 503 Service Unavailable
...
{
"action" : "ErrorResponse",
"message" : "Master is not ready.",
"serverSparkVersion" : "4.0.0-SNAPSHOT"
}
```
### Does this PR introduce _any_ user-facing change?
No, this is a new API.
### How was this patch tested?
Pass the CIs.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #44692 from dongjoon-hyun/SPARK-HEALTHZ.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../org/apache/spark/deploy/DeployMessage.scala | 2 +
.../org/apache/spark/deploy/master/Master.scala | 3 ++
.../spark/deploy/rest/RestSubmissionClient.scala | 35 ++++++++++++++++
.../spark/deploy/rest/RestSubmissionServer.scala | 30 ++++++++++++-
.../spark/deploy/rest/StandaloneRestServer.scala | 18 ++++++++
.../deploy/rest/SubmitRestProtocolResponse.scala | 10 +++++
.../deploy/rest/StandaloneRestSubmitSuite.scala | 49 +++++++++++++++++++++-
7 files changed, 145 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 4ccc0bd7cdc2..cb5996a5097d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -246,6 +246,8 @@ private[deploy] object DeployMessages {
case object RequestClearCompletedDriversAndApps extends DeployMessage
+ case object RequestReadyz extends DeployMessage
+
// Internal message in AppClient
case object StopAppClient
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 696ce05bce48..a1dad0428a6e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -509,6 +509,9 @@ private[deploy] class Master(
workers.toArray, apps.toArray, completedApps.toArray,
drivers.toArray, completedDrivers.toArray, state))
+ case RequestReadyz =>
+ context.reply(state != RecoveryState.STANDBY)
+
case BoundPortsRequest =>
context.reply(BoundPortsResponse(address.port, webUi.boundPort,
restServerBoundPort))
diff --git
a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
index f1400a0c6a74..ea05b042bb12 100644
---
a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
+++
b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
@@ -193,6 +193,35 @@ private[spark] class RestSubmissionClient(master: String)
extends Logging {
response
}
+ /** Check the readiness of Master. */
+ def readyz(): SubmitRestProtocolResponse = {
+ logInfo(s"Submitting a request to check the status of $master.")
+ var handled: Boolean = false
+ var response: SubmitRestProtocolResponse = new ErrorResponse
+ for (m <- masters if !handled) {
+ validateMaster(m)
+ val url = getReadyzUrl(m)
+ try {
+ response = get(url)
+ response match {
+ case k: ReadyzResponse =>
+ if (!Utils.responseFromBackup(k.message)) {
+ handleRestResponse(k)
+ handled = true
+ }
+ case unexpected =>
+ handleUnexpectedRestResponse(unexpected)
+ }
+ } catch {
+ case e: SubmitRestConnectionException =>
+ if (handleConnectionException(m)) {
+ throw new SubmitRestConnectionException("Unable to connect to
server", e)
+ }
+ }
+ }
+ response
+ }
+
/** Request the status of a submission from the server. */
def requestSubmissionStatus(
submissionId: String,
@@ -370,6 +399,12 @@ private[spark] class RestSubmissionClient(master: String)
extends Logging {
new URL(s"$baseUrl/clear")
}
+ /** Return the REST URL for requesting the readyz API. */
+ private def getReadyzUrl(master: String): URL = {
+ val baseUrl = getBaseUrl(master)
+ new URL(s"$baseUrl/readyz")
+ }
+
/** Return the REST URL for requesting the status of an existing submission.
*/
private def getStatusUrl(master: String, submissionId: String): URL = {
val baseUrl = getBaseUrl(master)
diff --git
a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
index 28197fd0a556..4e871393f0be 100644
---
a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
+++
b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
@@ -57,6 +57,7 @@ private[spark] abstract class RestSubmissionServer(
protected val killAllRequestServlet: KillAllRequestServlet
protected val statusRequestServlet: StatusRequestServlet
protected val clearRequestServlet: ClearRequestServlet
+ protected val readyzRequestServlet: ReadyzRequestServlet
private var _server: Option[Server] = None
@@ -68,6 +69,7 @@ private[spark] abstract class RestSubmissionServer(
s"$baseContext/killall/*" -> killAllRequestServlet,
s"$baseContext/status/*" -> statusRequestServlet,
s"$baseContext/clear/*" -> clearRequestServlet,
+ s"$baseContext/readyz/*" -> readyzRequestServlet,
"/*" -> new ErrorServlet // default handler
)
@@ -268,6 +270,31 @@ private[rest] abstract class ClearRequestServlet extends
RestServlet {
protected def handleClear(): ClearResponse
}
+/**
+ * A servlet for handling readyz requests passed to the
[[RestSubmissionServer]].
+ */
+private[rest] abstract class ReadyzRequestServlet extends RestServlet {
+
+ /**
+ * Return the status of master is ready or not.
+ */
+ protected override def doGet(
+ request: HttpServletRequest,
+ response: HttpServletResponse): Unit = {
+ val readyzResponse = handleReadyz()
+ val responseMessage = if (readyzResponse.success) {
+ response.setStatus(HttpServletResponse.SC_OK)
+ readyzResponse
+ } else {
+ response.setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE)
+ handleError("Master is not ready.")
+ }
+ sendResponse(responseMessage, response)
+ }
+
+ protected def handleReadyz(): ReadyzResponse
+}
+
/**
* A servlet for handling status requests passed to the
[[RestSubmissionServer]].
*/
@@ -352,7 +379,8 @@ private class ErrorServlet extends RestServlet {
"Missing the /submissions prefix."
case `serverVersion` :: "submissions" :: tail =>
// http://host:port/correct-version/submissions/*
- "Missing an action: please specify one of /create, /kill, /killall,
/clear or /status."
+ "Missing an action: please specify one of /create, /kill, /killall,
/clear, /status, " +
+ "or /readyz."
case unknownVersion :: tail =>
// http://host:port/unknown-version/*
versionMismatch = true
diff --git
a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
index fa1ac80d25d5..e53d3ecefeda 100644
---
a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
+++
b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
@@ -70,6 +70,8 @@ private[deploy] class StandaloneRestServer(
new StandaloneStatusRequestServlet(masterEndpoint, masterConf)
protected override val clearRequestServlet =
new StandaloneClearRequestServlet(masterEndpoint, masterConf)
+ protected override val readyzRequestServlet =
+ new StandaloneReadyzRequestServlet(masterEndpoint, masterConf)
}
/**
@@ -146,6 +148,22 @@ private[rest] class
StandaloneClearRequestServlet(masterEndpoint: RpcEndpointRef
}
}
+/**
+ * A servlet for handling readyz requests passed to the
[[StandaloneRestServer]].
+ */
+private[rest] class StandaloneReadyzRequestServlet(masterEndpoint:
RpcEndpointRef, conf: SparkConf)
+ extends ReadyzRequestServlet {
+
+ protected def handleReadyz(): ReadyzResponse = {
+ val success = masterEndpoint.askSync[Boolean](DeployMessages.RequestReadyz)
+ val r = new ReadyzResponse
+ r.serverSparkVersion = sparkVersion
+ r.message = ""
+ r.success = success
+ r
+ }
+}
+
/**
* A servlet for handling submit requests passed to the
[[StandaloneRestServer]].
*/
diff --git
a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolResponse.scala
b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolResponse.scala
index b9e3b3028ac7..4c484419d3be 100644
---
a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolResponse.scala
+++
b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolResponse.scala
@@ -75,6 +75,16 @@ private[spark] class ClearResponse extends
SubmitRestProtocolResponse {
}
}
+/**
+ * A response to a readyz request in the REST application submission protocol.
+ */
+private[spark] class ReadyzResponse extends SubmitRestProtocolResponse {
+ protected override def doValidate(): Unit = {
+ super.doValidate()
+ assertFieldIsSet(success, "success")
+ }
+}
+
/**
* A response to a status request in the REST application submission protocol.
*/
diff --git
a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
index abe05a805584..04281dcd0b43 100644
---
a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
+++
b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark._
import org.apache.spark.deploy.{SparkSubmit, SparkSubmitArguments}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.DriverState._
+import org.apache.spark.deploy.master.RecoveryState
import org.apache.spark.rpc._
import org.apache.spark.util.ArrayImplicits._
import org.apache.spark.util.Utils
@@ -246,6 +247,24 @@ class StandaloneRestSubmitSuite extends SparkFunSuite {
assert(killAllResponse.success)
}
+ test("SPARK-46368: readyz with SC_OK") {
+ val masterUrl = startDummyServer()
+ val response = new RestSubmissionClient(masterUrl).readyz()
+ val readyzResponse = getReadyzResponse(response)
+ assert(readyzResponse.action ===
Utils.getFormattedClassName(readyzResponse))
+ assert(readyzResponse.success)
+ assert(readyzResponse.message.isBlank)
+ }
+
+ test("SPARK-46368: readyz with SC_SERVICE_UNAVAILABLE") {
+ val masterUrl = startDummyServer(recoveryState = RecoveryState.STANDBY)
+ val response = new RestSubmissionClient(masterUrl).readyz()
+ val errorResponse = getReadyzResponse(response)
+ assert(errorResponse.action === Utils.getFormattedClassName(errorResponse))
+ assert(errorResponse.success == null)
+ assert(errorResponse.message.contains("Master is not ready."))
+ }
+
/* ---------------------------------------- *
| Aberrant client / server behavior |
* ---------------------------------------- */
@@ -448,8 +467,10 @@ class StandaloneRestSubmitSuite extends SparkFunSuite {
submitMessage: String = "driver is submitted",
killMessage: String = "driver is killed",
state: DriverState = FINISHED,
+ recoveryState: RecoveryState.Value = RecoveryState.ALIVE,
exception: Option[Exception] = None): String = {
- startServer(new DummyMaster(_, submitId, submitMessage, killMessage,
state, exception))
+ startServer(new DummyMaster(_, submitId, submitMessage, killMessage,
state, recoveryState,
+ exception))
}
/** Start a smarter dummy server that keeps track of submitted driver
states. */
@@ -543,6 +564,16 @@ class StandaloneRestSubmitSuite extends SparkFunSuite {
}
}
+ /** Return the response as a readyz response, or fail with error otherwise.
*/
+ private def getReadyzResponse(response: SubmitRestProtocolResponse)
+ : SubmitRestProtocolResponse = {
+ response match {
+ case k: ReadyzResponse => k
+ case e: ErrorResponse => e // This is a valid response for readyz
+ case r => fail(s"Expected readyz response. Actual: ${r.toJson}")
+ }
+ }
+
/** Return the response as a status response, or fail with error otherwise.
*/
private def getStatusResponse(response: SubmitRestProtocolResponse):
SubmissionStatusResponse = {
response match {
@@ -602,6 +633,7 @@ private class DummyMaster(
submitMessage: String = "submitted",
killMessage: String = "killed",
state: DriverState = FINISHED,
+ recoveryState: RecoveryState.Value = RecoveryState.ALIVE,
exception: Option[Exception] = None)
extends RpcEndpoint {
@@ -616,6 +648,8 @@ private class DummyMaster(
context.reply(DriverStatusResponse(found = true, Some(state), None,
None, exception))
case RequestClearCompletedDriversAndApps =>
context.reply(true)
+ case RequestReadyz =>
+ context.reply(recoveryState != RecoveryState.STANDBY)
}
}
@@ -661,6 +695,7 @@ private class SmarterMaster(override val rpcEnv: RpcEnv)
extends ThreadSafeRpcEn
* When handling a killAll request, the server returns an invalid JSON.
* When handling a status request, the server throws an internal exception.
* When handling a clear request, the server throws an internal exception.
+ * When handling a readyz request, the server throws an internal exception.
* The purpose of this class is to test that client handles these cases
gracefully.
*/
private class FaultyStandaloneRestServer(
@@ -676,6 +711,7 @@ private class FaultyStandaloneRestServer(
protected override val killAllRequestServlet = new InvalidKillAllServlet
protected override val statusRequestServlet = new ExplodingStatusServlet
protected override val clearRequestServlet = new ExplodingClearServlet
+ protected override val readyzRequestServlet = new ExplodingReadyzServlet
/** A faulty servlet that produces malformed responses. */
class MalformedSubmitServlet
@@ -725,4 +761,15 @@ private class FaultyStandaloneRestServer(
s
}
}
+
+ /** A faulty readyz servlet that explodes. */
+ class ExplodingReadyzServlet extends
StandaloneReadyzRequestServlet(masterEndpoint, masterConf) {
+ private def explode: Int = 1 / 0
+
+ protected override def handleReadyz(): ReadyzResponse = {
+ val s = super.handleReadyz()
+ explode.toString
+ s
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]