Repository: spark
Updated Branches:
  refs/heads/master b8a08f25c -> 23af2d79a


[SPARK-20025][CORE] Ignore SPARK_LOCAL* env while deploying via cluster mode.

## What changes were proposed in this pull request?

On a bare-metal system with no DNS setup, Spark may be configured with 
SPARK_LOCAL* environment variables for its IP and host properties.
During a driver failover in cluster deployment mode, SPARK_LOCAL* should be 
ignored when the driver is restarted on another node; the values should instead 
be picked up from the target node's local environment.

## How was this patch tested?
Distributed deployment against a Spark standalone cluster of 6 workers. Tested 
by killing the JVMs running the driver and verifying that the restarted JVMs 
had the right configuration.

Author: Prashant Sharma <[email protected]>
Author: Prashant Sharma <[email protected]>

Closes #17357 from ScrapCodes/driver-failover-fix.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/23af2d79
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/23af2d79
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/23af2d79

Branch: refs/heads/master
Commit: 23af2d79ad9a3c83936485ee57513b39193a446b
Parents: b8a08f2
Author: Prashant Sharma <[email protected]>
Authored: Tue Oct 10 20:48:42 2017 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Tue Oct 10 20:48:42 2017 +0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/deploy/Client.scala    | 6 +++---
 .../org/apache/spark/deploy/rest/StandaloneRestServer.scala | 4 +++-
 .../org/apache/spark/deploy/worker/DriverWrapper.scala      | 9 ++++++---
 3 files changed, 12 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/23af2d79/core/src/main/scala/org/apache/spark/deploy/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index bf60932..7acb5c5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -93,19 +93,19 @@ private class ClientEndpoint(
           driverArgs.cores,
           driverArgs.supervise,
           command)
-        ayncSendToMasterAndForwardReply[SubmitDriverResponse](
+        asyncSendToMasterAndForwardReply[SubmitDriverResponse](
           RequestSubmitDriver(driverDescription))
 
       case "kill" =>
         val driverId = driverArgs.driverId
-        ayncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
+        asyncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
     }
   }
 
   /**
    * Send the message to master and forward the reply to self asynchronously.
    */
-  private def ayncSendToMasterAndForwardReply[T: ClassTag](message: Any): Unit = {
+  private def asyncSendToMasterAndForwardReply[T: ClassTag](message: Any): Unit = {
     for (masterEndpoint <- masterEndpoints) {
       masterEndpoint.ask[T](message).onComplete {
         case Success(v) => self.send(v)

http://git-wip-us.apache.org/repos/asf/spark/blob/23af2d79/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
index 0164084..22b65ab 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
@@ -139,7 +139,9 @@ private[rest] class StandaloneSubmitRequestServlet(
     val driverExtraLibraryPath = sparkProperties.get("spark.driver.extraLibraryPath")
     val superviseDriver = sparkProperties.get("spark.driver.supervise")
     val appArgs = request.appArgs
-    val environmentVariables = request.environmentVariables
+    // Filter SPARK_LOCAL_(IP|HOSTNAME) environment variables from being set on the remote system.
+    val environmentVariables =
+      request.environmentVariables.filterNot(x => x._1.matches("SPARK_LOCAL_(IP|HOSTNAME)"))
 
     // Construct driver description
     val conf = new SparkConf(false)

http://git-wip-us.apache.org/repos/asf/spark/blob/23af2d79/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
index c167119..b19c990 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -23,6 +23,7 @@ import org.apache.commons.lang3.StringUtils
 
 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.{DependencyUtils, SparkHadoopUtil, SparkSubmit}
+import org.apache.spark.internal.Logging
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
 
@@ -30,7 +31,7 @@ import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, U
  * Utility object for launching driver programs such that they share fate with the Worker process.
  * This is used in standalone cluster mode only.
  */
-object DriverWrapper {
+object DriverWrapper extends Logging {
   def main(args: Array[String]) {
     args.toList match {
       /*
@@ -41,8 +42,10 @@ object DriverWrapper {
        */
       case workerUrl :: userJar :: mainClass :: extraArgs =>
         val conf = new SparkConf()
-        val rpcEnv = RpcEnv.create("Driver",
-          Utils.localHostName(), 0, conf, new SecurityManager(conf))
+        val host: String = Utils.localHostName()
+        val port: Int = sys.props.getOrElse("spark.driver.port", "0").toInt
+        val rpcEnv = RpcEnv.create("Driver", host, port, conf, new SecurityManager(conf))
+        logInfo(s"Driver address: ${rpcEnv.address}")
         rpcEnv.setupEndpoint("workerWatcher", new WorkerWatcher(rpcEnv, workerUrl))
 
         val currentLoader = Thread.currentThread.getContextClassLoader
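
A brief note on the DriverWrapper change above: instead of always binding the driver's RpcEnv to an ephemeral port, the wrapper now honours spark.driver.port when it is set as a system property. A minimal sketch of that lookup (standard Scala sys.props; nothing beyond what the hunk already shows):

    // Use spark.driver.port if present as a system property; otherwise "0",
    // which lets the OS assign an ephemeral port for the driver's RpcEnv.
    val port: Int = sys.props.getOrElse("spark.driver.port", "0").toInt

The new logInfo line then reports the address the RpcEnv actually bound to, which is useful when the port was OS-assigned.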


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
