Repository: spark
Updated Branches:
  refs/heads/master 65c653fb4 -> 002f9c169
[SPARK-24794][CORE] Driver launched through rest should use all masters

## What changes were proposed in this pull request?

In standalone cluster mode, one can launch a driver with supervise mode enabled. The StandaloneRestServer class uses the host and port of the current master as the spark.master property when launching the driver (even when running in HA mode). It also ignores the spark.master property passed as part of the request.

Because of this, if the Spark masters switch for some reason and the driver is then killed unexpectedly and relaunched, it tries to connect to the master given as -Dspark.master in the driver command. That master is now in STANDBY mode, so after several retries the SparkContext kills itself, even though the secondary master is alive and healthy.

This change takes the spark.master property from the request and uses it to launch the driver process. As a result, the driver process has both masters in its -Dspark.master property, and even if the masters switch, the SparkContext can still connect to the ALIVE master and work correctly.

## How was this patch tested?

This patch was manually tested on a standalone cluster running 2.2.1. It was then rebased on the current master and all tests were executed. I have added a unit test for this change (since I am new, I hope I have covered everything).

Closes #21816 from bsikander/rest_driver_fix.
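The recovery argument above depends on the driver's spark.master value listing every master in the standalone `spark://host1:port1,host2:port2` form. The Scala sketch below models only the outcome of that argument, under stated assumptions: `splitMasters` and `firstAlive` are hypothetical helpers (not Spark APIs), the host names are made up, and the set of alive masters is a stand-in for whatever the real standalone client does when it contacts the listed masters.

```scala
object MultiMasterSketch {
  // Hypothetical helper: split a standalone master URL such as
  // "spark://host1:7077,host2:7077" into one spark:// URL per master.
  def splitMasters(sparkMaster: String): Seq[String] =
    sparkMaster.stripPrefix("spark://").split(",").toSeq.map(hostPort => "spark://" + hostPort.trim)

  // Hypothetical stand-in for "contact the listed masters and keep the one that is ALIVE".
  def firstAlive(masters: Seq[String], isAlive: String => Boolean): Option[String] =
    masters.find(isAlive)

  def main(args: Array[String]): Unit = {
    // Assumed failover state: master1 has dropped to STANDBY, master2 is now ALIVE.
    val alive = Set("spark://master2:7077")

    // Before the fix: the driver only knows the master that was active at submission time.
    println(firstAlive(splitMasters("spark://master1:7077"), alive.contains))
    // None

    // After the fix: the driver knows both masters and can reach the ALIVE one.
    println(firstAlive(splitMasters("spark://master1:7077,master2:7077"), alive.contains))
    // Some(spark://master2:7077)
  }
}
```

The point of the sketch is the contrast between the two calls: with only the formerly active master in -Dspark.master there is nothing left to fall back to, which matches the SparkContext giving up as described above.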
Authored-by: Behroz Sikander <[email protected]>
Signed-off-by: Sean Owen <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/002f9c16
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/002f9c16
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/002f9c16

Branch: refs/heads/master
Commit: 002f9c169eb20b0d71b6d0296595f343c7f5bab2
Parents: 65c653f
Author: Behroz Sikander <[email protected]>
Authored: Thu Oct 25 08:36:44 2018 -0500
Committer: Sean Owen <[email protected]>
Committed: Thu Oct 25 08:36:44 2018 -0500

----------------------------------------------------------------------
 .../deploy/rest/StandaloneRestServer.scala      | 12 +++++++++++-
 .../deploy/rest/StandaloneRestSubmitSuite.scala | 20 ++++++++++++++++++++
 2 files changed, 31 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/002f9c16/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
index 22b65ab..afa1a5f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
@@ -138,6 +138,16 @@ private[rest] class StandaloneSubmitRequestServlet(
     val driverExtraClassPath = sparkProperties.get("spark.driver.extraClassPath")
     val driverExtraLibraryPath = sparkProperties.get("spark.driver.extraLibraryPath")
     val superviseDriver = sparkProperties.get("spark.driver.supervise")
+    // The semantics of "spark.master" and the masterUrl are different. While the
+    // property "spark.master" could contain all registered masters, masterUrl
+    // contains only the active master. To make sure a Spark driver can recover
+    // in a multi-master setup, we use the "spark.master" property while submitting
+    // the driver.
+    val masters = sparkProperties.get("spark.master")
+    val (_, masterPort) = Utils.extractHostPortFromSparkUrl(masterUrl)
+    val masterRestPort = this.conf.getInt("spark.master.rest.port", 6066)
+    val updatedMasters = masters.map(
+      _.replace(s":$masterRestPort", s":$masterPort")).getOrElse(masterUrl)
     val appArgs = request.appArgs
     // Filter SPARK_LOCAL_(IP|HOSTNAME) environment variables from being set on the remote system.
     val environmentVariables =
@@ -146,7 +156,7 @@ private[rest] class StandaloneSubmitRequestServlet(
     // Construct driver description
     val conf = new SparkConf(false)
       .setAll(sparkProperties)
-      .set("spark.master", masterUrl)
+      .set("spark.master", updatedMasters)
     val extraClassPath = driverExtraClassPath.toSeq.flatMap(_.split(File.pathSeparator))
     val extraLibraryPath = driverExtraLibraryPath.toSeq.flatMap(_.split(File.pathSeparator))
     val extraJavaOpts = driverExtraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty)


http://git-wip-us.apache.org/repos/asf/spark/blob/002f9c16/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
index 54c168a..4839c84 100644
--- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
@@ -83,6 +83,26 @@ class StandaloneRestSubmitSuite extends SparkFunSuite with BeforeAndAfterEach {
     assert(submitResponse.success)
   }
 
+  test("create submission with multiple masters") {
+    val submittedDriverId = "your-driver-id"
+    val submitMessage = "my driver is submitted"
+    val masterUrl = startDummyServer(submitId = submittedDriverId, submitMessage = submitMessage)
+    val conf = new SparkConf(loadDefaults = false)
+    val RANDOM_PORT = 9000
+    val allMasters = s"$masterUrl,${Utils.localHostName()}:$RANDOM_PORT"
+    conf.set("spark.master", allMasters)
+    conf.set("spark.app.name", "dreamer")
+    val appArgs = Array("one", "two", "six")
+    // main method calls this
+    val response = new RestSubmissionClientApp().run("app-resource", "main-class", appArgs, conf)
+    val submitResponse = getSubmitResponse(response)
+    assert(submitResponse.action === Utils.getFormattedClassName(submitResponse))
+    assert(submitResponse.serverSparkVersion === SPARK_VERSION)
+    assert(submitResponse.message === submitMessage)
+    assert(submitResponse.submissionId === submittedDriverId)
+    assert(submitResponse.success)
+  }
+
   test("create submission from main method") {
     val submittedDriverId = "your-driver-id"
     val submitMessage = "my driver is submitted"
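The port substitution added to StandaloneSubmitRequestServlet can be tried in isolation. The sketch below copies the `masters.map(...).getOrElse(masterUrl)` expression from the diff into a hypothetical `updatedMasters` function; it takes the active master's RPC port as a plain parameter instead of calling `Utils.extractHostPortFromSparkUrl`, and the host names and the 7077 RPC port are illustrative assumptions (6066 is the `spark.master.rest.port` default used in the patch).

```scala
object MasterPortRewriteSketch {
  // Same expression as the patch: swap the REST port in the request's spark.master for the
  // active master's RPC port, or fall back to the active master URL when spark.master is absent.
  def updatedMasters(
      requestMaster: Option[String],
      masterUrl: String,
      masterRestPort: Int,
      masterPort: Int): String =
    requestMaster.map(_.replace(s":$masterRestPort", s":$masterPort")).getOrElse(masterUrl)

  def main(args: Array[String]): Unit = {
    val activeMaster = "spark://master1:7077" // assumed RPC URL of the active master

    // Request whose spark.master lists both masters by their REST port.
    println(updatedMasters(Some("spark://master1:6066,master2:6066"), activeMaster, 6066, 7077))
    // spark://master1:7077,master2:7077

    // Request without spark.master: same result as the code before the patch.
    println(updatedMasters(None, activeMaster, 6066, 7077))
    // spark://master1:7077
  }
}
```

Because the rewrite is a plain substring `replace`, it implicitly assumes every master uses the same REST and RPC ports as the active one; an entry listed with a different REST port would be passed through to the driver unchanged.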
