Repository: spark
Updated Branches:
  refs/heads/master 65c653fb4 -> 002f9c169


[SPARK-24794][CORE] Driver launched through rest should use all masters

## What changes were proposed in this pull request?

In standalone cluster mode, a driver can be launched with supervise mode
enabled. The StandaloneRestServer class uses the host and port of the current
master as the spark.master property when launching the driver
(even when running in HA mode). It also ignores the
spark.master property passed as part of the request.

As a result, if the Spark masters switch for any reason and the driver is
killed unexpectedly and relaunched, it tries to connect to the master
specified as -Dspark.master in the driver command. That master is now in
STANDBY mode, so after multiple failed attempts the SparkContext kills itself
(even though the secondary master is alive and healthy).

This change picks the spark.master property from the request and uses it to
launch the driver process. As a result, the driver process has both
masters in its -Dspark.master property. Even if the masters switch, SparkContext
can still connect to the ALIVE master and work correctly.
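
The rewriting described above can be illustrated with a minimal, standalone
sketch. It mirrors the logic in the diff but is not the code in
StandaloneRestServer: the object name MasterRewrite, the helper
rewriteMasters, and the host names in the usage note are hypothetical and
exist only for illustration.

```scala
// Sketch only: MasterRewrite and rewriteMasters are hypothetical names,
// not part of Spark. They restate the idea of the change in isolation.
object MasterRewrite {
  /**
   * @param requested       the "spark.master" value sent with the REST request, if any
   * @param activeMasterUrl URL of the master currently handling the request
   * @param masterPort      the port of the active master
   * @param restPort        the REST submission port (6066 is the default used in the diff)
   * @return the requested master list with REST ports swapped for the master
   *         port, or the active master URL when no list was supplied
   */
  def rewriteMasters(
      requested: Option[String],
      activeMasterUrl: String,
      masterPort: Int,
      restPort: Int = 6066): String =
    requested
      .map(_.replace(s":$restPort", s":$masterPort"))
      .getOrElse(activeMasterUrl)
}
```

For example, with a requested value of "spark://host1:6066,host2:6066", an
active master at "spark://host1:7077", and master port 7077, the helper
returns "spark://host1:7077,host2:7077". With no requested value it falls
back to the active master URL. Because this is a plain string replacement,
every occurrence of the REST port suffix in the requested value is rewritten
to the same master port.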

## How was this patch tested?
This patch was manually tested on a standalone cluster running 2.2.1. It was
rebased on current master and all tests were executed. I have added a unit test
for this change (but since I am new, I hope I have covered everything).

Closes #21816 from bsikander/rest_driver_fix.

Authored-by: Behroz Sikander <[email protected]>
Signed-off-by: Sean Owen <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/002f9c16
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/002f9c16
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/002f9c16

Branch: refs/heads/master
Commit: 002f9c169eb20b0d71b6d0296595f343c7f5bab2
Parents: 65c653f
Author: Behroz Sikander <[email protected]>
Authored: Thu Oct 25 08:36:44 2018 -0500
Committer: Sean Owen <[email protected]>
Committed: Thu Oct 25 08:36:44 2018 -0500

----------------------------------------------------------------------
 .../deploy/rest/StandaloneRestServer.scala      | 12 +++++++++++-
 .../deploy/rest/StandaloneRestSubmitSuite.scala | 20 ++++++++++++++++++++
 2 files changed, 31 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/002f9c16/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
index 22b65ab..afa1a5f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
@@ -138,6 +138,16 @@ private[rest] class StandaloneSubmitRequestServlet(
     val driverExtraClassPath = sparkProperties.get("spark.driver.extraClassPath")
     val driverExtraLibraryPath = sparkProperties.get("spark.driver.extraLibraryPath")
     val superviseDriver = sparkProperties.get("spark.driver.supervise")
+    // The semantics of "spark.master" and the masterUrl are different. While the
+    // property "spark.master" could contain all registered masters, masterUrl
+    // contains only the active master. To make sure a Spark driver can recover
+    // in a multi-master setup, we use the "spark.master" property while submitting
+    // the driver.
+    val masters = sparkProperties.get("spark.master")
+    val (_, masterPort) = Utils.extractHostPortFromSparkUrl(masterUrl)
+    val masterRestPort = this.conf.getInt("spark.master.rest.port", 6066)
+    val updatedMasters = masters.map(
+      _.replace(s":$masterRestPort", s":$masterPort")).getOrElse(masterUrl)
     val appArgs = request.appArgs
     // Filter SPARK_LOCAL_(IP|HOSTNAME) environment variables from being set on the remote system.
     val environmentVariables =
@@ -146,7 +156,7 @@ private[rest] class StandaloneSubmitRequestServlet(
     // Construct driver description
     val conf = new SparkConf(false)
       .setAll(sparkProperties)
-      .set("spark.master", masterUrl)
+      .set("spark.master", updatedMasters)
     val extraClassPath = driverExtraClassPath.toSeq.flatMap(_.split(File.pathSeparator))
     val extraLibraryPath = driverExtraLibraryPath.toSeq.flatMap(_.split(File.pathSeparator))
     val extraJavaOpts = driverExtraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty)

http://git-wip-us.apache.org/repos/asf/spark/blob/002f9c16/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
index 54c168a..4839c84 100644
--- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
@@ -83,6 +83,26 @@ class StandaloneRestSubmitSuite extends SparkFunSuite with BeforeAndAfterEach {
     assert(submitResponse.success)
   }
 
+  test("create submission with multiple masters") {
+    val submittedDriverId = "your-driver-id"
+    val submitMessage = "my driver is submitted"
+    val masterUrl = startDummyServer(submitId = submittedDriverId, submitMessage = submitMessage)
+    val conf = new SparkConf(loadDefaults = false)
+    val RANDOM_PORT = 9000
+    val allMasters = s"$masterUrl,${Utils.localHostName()}:$RANDOM_PORT"
+    conf.set("spark.master", allMasters)
+    conf.set("spark.app.name", "dreamer")
+    val appArgs = Array("one", "two", "six")
+    // main method calls this
+    val response = new RestSubmissionClientApp().run("app-resource", "main-class", appArgs, conf)
+    val submitResponse = getSubmitResponse(response)
+    assert(submitResponse.action === Utils.getFormattedClassName(submitResponse))
+    assert(submitResponse.serverSparkVersion === SPARK_VERSION)
+    assert(submitResponse.message === submitMessage)
+    assert(submitResponse.submissionId === submittedDriverId)
+    assert(submitResponse.success)
+  }
+
   test("create submission from main method") {
     val submittedDriverId = "your-driver-id"
     val submitMessage = "my driver is submitted"

