Repository: spark Updated Branches: refs/heads/master b8a08f25c -> 23af2d79a
[SPARK-20025][CORE] Ignore SPARK_LOCAL* env, while deploying via cluster mode. ## What changes were proposed in this pull request? In a bare metal system with No DNS setup, spark may be configured with SPARK_LOCAL* for IP and host properties. During a driver failover, in cluster deployment mode. SPARK_LOCAL* should be ignored while restarting on another node and should be picked up from target system's local environment. ## How was this patch tested? Distributed deployment against a spark standalone cluster of 6 Workers. Tested by killing JVM's running driver and verified the restarted JVMs have right configurations on them. Author: Prashant Sharma <[email protected]> Author: Prashant Sharma <[email protected]> Closes #17357 from ScrapCodes/driver-failover-fix. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/23af2d79 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/23af2d79 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/23af2d79 Branch: refs/heads/master Commit: 23af2d79ad9a3c83936485ee57513b39193a446b Parents: b8a08f2 Author: Prashant Sharma <[email protected]> Authored: Tue Oct 10 20:48:42 2017 +0800 Committer: Wenchen Fan <[email protected]> Committed: Tue Oct 10 20:48:42 2017 +0800 ---------------------------------------------------------------------- core/src/main/scala/org/apache/spark/deploy/Client.scala | 6 +++--- .../org/apache/spark/deploy/rest/StandaloneRestServer.scala | 4 +++- .../org/apache/spark/deploy/worker/DriverWrapper.scala | 9 ++++++--- 3 files changed, 12 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/23af2d79/core/src/main/scala/org/apache/spark/deploy/Client.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index bf60932..7acb5c5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -93,19 +93,19 @@ private class ClientEndpoint( driverArgs.cores, driverArgs.supervise, command) - ayncSendToMasterAndForwardReply[SubmitDriverResponse]( + asyncSendToMasterAndForwardReply[SubmitDriverResponse]( RequestSubmitDriver(driverDescription)) case "kill" => val driverId = driverArgs.driverId - ayncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId)) + asyncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId)) } } /** * Send the message to master and forward the reply to self asynchronously. */ - private def ayncSendToMasterAndForwardReply[T: ClassTag](message: Any): Unit = { + private def asyncSendToMasterAndForwardReply[T: ClassTag](message: Any): Unit = { for (masterEndpoint <- masterEndpoints) { masterEndpoint.ask[T](message).onComplete { case Success(v) => self.send(v) http://git-wip-us.apache.org/repos/asf/spark/blob/23af2d79/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala index 0164084..22b65ab 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala @@ -139,7 +139,9 @@ private[rest] class StandaloneSubmitRequestServlet( val driverExtraLibraryPath = sparkProperties.get("spark.driver.extraLibraryPath") val superviseDriver = sparkProperties.get("spark.driver.supervise") val appArgs = request.appArgs - val environmentVariables = request.environmentVariables + // Filter SPARK_LOCAL_(IP|HOSTNAME) environment variables from being set on the remote system. + val environmentVariables = + request.environmentVariables.filterNot(x => x._1.matches("SPARK_LOCAL_(IP|HOSTNAME)")) // Construct driver description val conf = new SparkConf(false) http://git-wip-us.apache.org/repos/asf/spark/blob/23af2d79/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala index c167119..b19c990 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala @@ -23,6 +23,7 @@ import org.apache.commons.lang3.StringUtils import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.deploy.{DependencyUtils, SparkHadoopUtil, SparkSubmit} +import org.apache.spark.internal.Logging import org.apache.spark.rpc.RpcEnv import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils} @@ -30,7 +31,7 @@ import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, U * Utility object for launching driver programs such that they share fate with the Worker process. * This is used in standalone cluster mode only. */ -object DriverWrapper { +object DriverWrapper extends Logging { def main(args: Array[String]) { args.toList match { /* @@ -41,8 +42,10 @@ object DriverWrapper { */ case workerUrl :: userJar :: mainClass :: extraArgs => val conf = new SparkConf() - val rpcEnv = RpcEnv.create("Driver", - Utils.localHostName(), 0, conf, new SecurityManager(conf)) + val host: String = Utils.localHostName() + val port: Int = sys.props.getOrElse("spark.driver.port", "0").toInt + val rpcEnv = RpcEnv.create("Driver", host, port, conf, new SecurityManager(conf)) + logInfo(s"Driver address: ${rpcEnv.address}") rpcEnv.setupEndpoint("workerWatcher", new WorkerWatcher(rpcEnv, workerUrl)) val currentLoader = Thread.currentThread.getContextClassLoader --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
