This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 89c71f7c195d [SPARK-46368][CORE] Support `readyz` in REST Submission 
API
89c71f7c195d is described below

commit 89c71f7c195d86d489e45efc87688e0757a2158a
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Thu Jan 11 11:09:31 2024 -0800

    [SPARK-46368][CORE] Support `readyz` in REST Submission API
    
    ### What changes were proposed in this pull request?
    
    This PR aims to support `readyz` in the REST Submission API, where "ready" means 
`state != RecoveryState.STANDBY`.
    
    The `readyz` semantics are defined by the K8s documentation.
    - https://kubernetes.io/docs/reference/using-api/health-checks/
    
    > Machines that check the healthz/livez/readyz of the API server should 
rely on the HTTP status code.
    > A status code 200 indicates the API server is healthy/live/ready, 
depending on the called endpoint.
    
    ### Why are the changes needed?
    
    Previously, users could check the Spark Master pods' readiness like the 
following.
    ```
    readinessProbe:
      exec:
        command: ["sh", "-c", "! (curl -s 
http://localhost:6066/v1/submissions/status/none | grep -q STANDBY)"]
    ```
    
    After this PR, the `readyz` endpoint returns one of the following results.
    
    **200 OK**
    ```
    $ curl -vv http://localhost:6066/v1/submissions/readyz
    < HTTP/1.1 200 OK
    {
      "action" : "ReadyzResponse",
      "message" : "",
      "serverSparkVersion" : "4.0.0-SNAPSHOT",
      "success" : true
    }
    ```
    
    **503 Service Unavailable**
    ```
    $ curl -vv http://localhost:6066/v1/submissions/readyz
    ...
    < HTTP/1.1 503 Service Unavailable
    ...
    {
      "action" : "ErrorResponse",
      "message" : "Master is not ready.",
      "serverSparkVersion" : "4.0.0-SNAPSHOT"
    }
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, this is a new API.
    
    ### How was this patch tested?
    
    Pass the CIs.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #44692 from dongjoon-hyun/SPARK-HEALTHZ.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../org/apache/spark/deploy/DeployMessage.scala    |  2 +
 .../org/apache/spark/deploy/master/Master.scala    |  3 ++
 .../spark/deploy/rest/RestSubmissionClient.scala   | 35 ++++++++++++++++
 .../spark/deploy/rest/RestSubmissionServer.scala   | 30 ++++++++++++-
 .../spark/deploy/rest/StandaloneRestServer.scala   | 18 ++++++++
 .../deploy/rest/SubmitRestProtocolResponse.scala   | 10 +++++
 .../deploy/rest/StandaloneRestSubmitSuite.scala    | 49 +++++++++++++++++++++-
 7 files changed, 145 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala 
b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 4ccc0bd7cdc2..cb5996a5097d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -246,6 +246,8 @@ private[deploy] object DeployMessages {
 
   case object RequestClearCompletedDriversAndApps extends DeployMessage
 
+  case object RequestReadyz extends DeployMessage
+
   // Internal message in AppClient
 
   case object StopAppClient
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 696ce05bce48..a1dad0428a6e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -509,6 +509,9 @@ private[deploy] class Master(
         workers.toArray, apps.toArray, completedApps.toArray,
         drivers.toArray, completedDrivers.toArray, state))
 
+    case RequestReadyz =>
+      context.reply(state != RecoveryState.STANDBY)
+
     case BoundPortsRequest =>
       context.reply(BoundPortsResponse(address.port, webUi.boundPort, 
restServerBoundPort))
 
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala 
b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
index f1400a0c6a74..ea05b042bb12 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
@@ -193,6 +193,35 @@ private[spark] class RestSubmissionClient(master: String) 
extends Logging {
     response
   }
 
+  /** Check the readiness of Master. */
+  def readyz(): SubmitRestProtocolResponse = {
+    logInfo(s"Submitting a request to check the status of $master.")
+    var handled: Boolean = false
+    var response: SubmitRestProtocolResponse = new ErrorResponse
+    for (m <- masters if !handled) {
+      validateMaster(m)
+      val url = getReadyzUrl(m)
+      try {
+        response = get(url)
+        response match {
+          case k: ReadyzResponse =>
+            if (!Utils.responseFromBackup(k.message)) {
+              handleRestResponse(k)
+              handled = true
+            }
+          case unexpected =>
+            handleUnexpectedRestResponse(unexpected)
+        }
+      } catch {
+        case e: SubmitRestConnectionException =>
+          if (handleConnectionException(m)) {
+            throw new SubmitRestConnectionException("Unable to connect to 
server", e)
+          }
+      }
+    }
+    response
+  }
+
   /** Request the status of a submission from the server. */
   def requestSubmissionStatus(
       submissionId: String,
@@ -370,6 +399,12 @@ private[spark] class RestSubmissionClient(master: String) 
extends Logging {
     new URL(s"$baseUrl/clear")
   }
 
+  /** Return the REST URL for requesting the readyz API. */
+  private def getReadyzUrl(master: String): URL = {
+    val baseUrl = getBaseUrl(master)
+    new URL(s"$baseUrl/readyz")
+  }
+
   /** Return the REST URL for requesting the status of an existing submission. 
*/
   private def getStatusUrl(master: String, submissionId: String): URL = {
     val baseUrl = getBaseUrl(master)
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala 
b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
index 28197fd0a556..4e871393f0be 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
@@ -57,6 +57,7 @@ private[spark] abstract class RestSubmissionServer(
   protected val killAllRequestServlet: KillAllRequestServlet
   protected val statusRequestServlet: StatusRequestServlet
   protected val clearRequestServlet: ClearRequestServlet
+  protected val readyzRequestServlet: ReadyzRequestServlet
 
   private var _server: Option[Server] = None
 
@@ -68,6 +69,7 @@ private[spark] abstract class RestSubmissionServer(
     s"$baseContext/killall/*" -> killAllRequestServlet,
     s"$baseContext/status/*" -> statusRequestServlet,
     s"$baseContext/clear/*" -> clearRequestServlet,
+    s"$baseContext/readyz/*" -> readyzRequestServlet,
     "/*" -> new ErrorServlet // default handler
   )
 
@@ -268,6 +270,31 @@ private[rest] abstract class ClearRequestServlet extends 
RestServlet {
   protected def handleClear(): ClearResponse
 }
 
+/**
+ * A servlet for handling readyz requests passed to the 
[[RestSubmissionServer]].
+ */
+private[rest] abstract class ReadyzRequestServlet extends RestServlet {
+
+  /**
+   * Return the status of master is ready or not.
+   */
+  protected override def doGet(
+      request: HttpServletRequest,
+      response: HttpServletResponse): Unit = {
+    val readyzResponse = handleReadyz()
+    val responseMessage = if (readyzResponse.success) {
+      response.setStatus(HttpServletResponse.SC_OK)
+      readyzResponse
+    } else {
+      response.setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE)
+      handleError("Master is not ready.")
+    }
+    sendResponse(responseMessage, response)
+  }
+
+  protected def handleReadyz(): ReadyzResponse
+}
+
 /**
  * A servlet for handling status requests passed to the 
[[RestSubmissionServer]].
  */
@@ -352,7 +379,8 @@ private class ErrorServlet extends RestServlet {
           "Missing the /submissions prefix."
         case `serverVersion` :: "submissions" :: tail =>
           // http://host:port/correct-version/submissions/*
-          "Missing an action: please specify one of /create, /kill, /killall, 
/clear or /status."
+          "Missing an action: please specify one of /create, /kill, /killall, 
/clear, /status, " +
+            "or /readyz."
         case unknownVersion :: tail =>
           // http://host:port/unknown-version/*
           versionMismatch = true
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala 
b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
index fa1ac80d25d5..e53d3ecefeda 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
@@ -70,6 +70,8 @@ private[deploy] class StandaloneRestServer(
     new StandaloneStatusRequestServlet(masterEndpoint, masterConf)
   protected override val clearRequestServlet =
     new StandaloneClearRequestServlet(masterEndpoint, masterConf)
+  protected override val readyzRequestServlet =
+    new StandaloneReadyzRequestServlet(masterEndpoint, masterConf)
 }
 
 /**
@@ -146,6 +148,22 @@ private[rest] class 
StandaloneClearRequestServlet(masterEndpoint: RpcEndpointRef
   }
 }
 
+/**
+ * A servlet for handling readyz requests passed to the 
[[StandaloneRestServer]].
+ */
+private[rest] class StandaloneReadyzRequestServlet(masterEndpoint: 
RpcEndpointRef, conf: SparkConf)
+  extends ReadyzRequestServlet {
+
+  protected def handleReadyz(): ReadyzResponse = {
+    val success = masterEndpoint.askSync[Boolean](DeployMessages.RequestReadyz)
+    val r = new ReadyzResponse
+    r.serverSparkVersion = sparkVersion
+    r.message = ""
+    r.success = success
+    r
+  }
+}
+
 /**
  * A servlet for handling submit requests passed to the 
[[StandaloneRestServer]].
  */
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolResponse.scala
 
b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolResponse.scala
index b9e3b3028ac7..4c484419d3be 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolResponse.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolResponse.scala
@@ -75,6 +75,16 @@ private[spark] class ClearResponse extends 
SubmitRestProtocolResponse {
   }
 }
 
+/**
+ * A response to a readyz request in the REST application submission protocol.
+ */
+private[spark] class ReadyzResponse extends SubmitRestProtocolResponse {
+  protected override def doValidate(): Unit = {
+    super.doValidate()
+    assertFieldIsSet(success, "success")
+  }
+}
+
 /**
  * A response to a status request in the REST application submission protocol.
  */
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
index abe05a805584..04281dcd0b43 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark._
 import org.apache.spark.deploy.{SparkSubmit, SparkSubmitArguments}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.DriverState._
+import org.apache.spark.deploy.master.RecoveryState
 import org.apache.spark.rpc._
 import org.apache.spark.util.ArrayImplicits._
 import org.apache.spark.util.Utils
@@ -246,6 +247,24 @@ class StandaloneRestSubmitSuite extends SparkFunSuite {
     assert(killAllResponse.success)
   }
 
+  test("SPARK-46368: readyz with SC_OK") {
+    val masterUrl = startDummyServer()
+    val response = new RestSubmissionClient(masterUrl).readyz()
+    val readyzResponse = getReadyzResponse(response)
+    assert(readyzResponse.action === 
Utils.getFormattedClassName(readyzResponse))
+    assert(readyzResponse.success)
+    assert(readyzResponse.message.isBlank)
+  }
+
+  test("SPARK-46368: readyz with SC_SERVICE_UNAVAILABLE") {
+    val masterUrl = startDummyServer(recoveryState = RecoveryState.STANDBY)
+    val response = new RestSubmissionClient(masterUrl).readyz()
+    val errorResponse = getReadyzResponse(response)
+    assert(errorResponse.action === Utils.getFormattedClassName(errorResponse))
+    assert(errorResponse.success == null)
+    assert(errorResponse.message.contains("Master is not ready."))
+  }
+
   /* ---------------------------------------- *
    |     Aberrant client / server behavior    |
    * ---------------------------------------- */
@@ -448,8 +467,10 @@ class StandaloneRestSubmitSuite extends SparkFunSuite {
       submitMessage: String = "driver is submitted",
       killMessage: String = "driver is killed",
       state: DriverState = FINISHED,
+      recoveryState: RecoveryState.Value = RecoveryState.ALIVE,
       exception: Option[Exception] = None): String = {
-    startServer(new DummyMaster(_, submitId, submitMessage, killMessage, 
state, exception))
+    startServer(new DummyMaster(_, submitId, submitMessage, killMessage, 
state, recoveryState,
+      exception))
   }
 
   /** Start a smarter dummy server that keeps track of submitted driver 
states. */
@@ -543,6 +564,16 @@ class StandaloneRestSubmitSuite extends SparkFunSuite {
     }
   }
 
+  /** Return the response as a readyz response, or fail with error otherwise. 
*/
+  private def getReadyzResponse(response: SubmitRestProtocolResponse)
+    : SubmitRestProtocolResponse = {
+    response match {
+      case k: ReadyzResponse => k
+      case e: ErrorResponse => e // This is a valid response for readyz
+      case r => fail(s"Expected readyz response. Actual: ${r.toJson}")
+    }
+  }
+
   /** Return the response as a status response, or fail with error otherwise. 
*/
   private def getStatusResponse(response: SubmitRestProtocolResponse): 
SubmissionStatusResponse = {
     response match {
@@ -602,6 +633,7 @@ private class DummyMaster(
     submitMessage: String = "submitted",
     killMessage: String = "killed",
     state: DriverState = FINISHED,
+    recoveryState: RecoveryState.Value = RecoveryState.ALIVE,
     exception: Option[Exception] = None)
   extends RpcEndpoint {
 
@@ -616,6 +648,8 @@ private class DummyMaster(
       context.reply(DriverStatusResponse(found = true, Some(state), None, 
None, exception))
     case RequestClearCompletedDriversAndApps =>
       context.reply(true)
+    case RequestReadyz =>
+      context.reply(recoveryState != RecoveryState.STANDBY)
   }
 }
 
@@ -661,6 +695,7 @@ private class SmarterMaster(override val rpcEnv: RpcEnv) 
extends ThreadSafeRpcEn
  * When handling a killAll request, the server returns an invalid JSON.
  * When handling a status request, the server throws an internal exception.
  * When handling a clear request, the server throws an internal exception.
+ * When handling a readyz request, the server throws an internal exception.
  * The purpose of this class is to test that client handles these cases 
gracefully.
  */
 private class FaultyStandaloneRestServer(
@@ -676,6 +711,7 @@ private class FaultyStandaloneRestServer(
   protected override val killAllRequestServlet = new InvalidKillAllServlet
   protected override val statusRequestServlet = new ExplodingStatusServlet
   protected override val clearRequestServlet = new ExplodingClearServlet
+  protected override val readyzRequestServlet = new ExplodingReadyzServlet
 
   /** A faulty servlet that produces malformed responses. */
   class MalformedSubmitServlet
@@ -725,4 +761,15 @@ private class FaultyStandaloneRestServer(
       s
     }
   }
+
+  /** A faulty readyz servlet that explodes. */
+  class ExplodingReadyzServlet extends 
StandaloneReadyzRequestServlet(masterEndpoint, masterConf) {
+    private def explode: Int = 1 / 0
+
+    protected override def handleReadyz(): ReadyzResponse = {
+      val s = super.handleReadyz()
+      explode.toString
+      s
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to