roadan closed pull request #4: dynamically resolving spark driver host URL: https://github.com/apache/incubator-amaterasu/pull/4
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ProvidersFactory.scala b/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ProvidersFactory.scala index 42faf71..eec0106 100644 --- a/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ProvidersFactory.scala +++ b/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ProvidersFactory.scala @@ -46,7 +46,8 @@ object ProvidersFactory { outStream: ByteArrayOutputStream, notifier: Notifier, executorId: String, - propFile:String = null): ProvidersFactory = { + hostName: String, + propFile: String = null): ProvidersFactory = { val result = new ProvidersFactory() val reflections = new Reflections(getClass.getClassLoader) @@ -56,7 +57,7 @@ object ProvidersFactory { val provider = Manifest.classType(r).runtimeClass.newInstance.asInstanceOf[RunnersProvider] - provider.init(data, jobId, outStream, notifier, executorId, propFile) + provider.init(data, jobId, outStream, notifier, executorId, propFile, hostName) notifier.info(s"a provider for group ${provider.getGroupIdentifier} was created") (provider.getGroupIdentifier, provider) }).toMap diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala index 7e56742..ce3b2ba 100644 --- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala +++ b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala @@ -52,7 +52,8 @@ class 
SparkRunnersProvider extends RunnersProvider with Logging { outStream: ByteArrayOutputStream, notifier: Notifier, executorId: String, - propFile: String): Unit = { + propFile: String, + hostName: String): Unit = { val config = ClusterConfig(new FileInputStream(propFile)) shellLoger = ProcessLogger( @@ -76,7 +77,7 @@ class SparkRunnersProvider extends RunnersProvider with Logging { val sparkAppName = s"job_${jobId}_executor_$executorId" SparkRunnerHelper.notifier = notifier - val spark = SparkRunnerHelper.createSpark(data.env, sparkAppName, jars, conf, executorEnv, propFile) + val spark = SparkRunnerHelper.createSpark(data.env, sparkAppName, jars, conf, executorEnv, propFile, hostName) lazy val sparkScalaRunner = SparkScalaRunner(data.env, jobId, spark, outStream, notifier, jars) sparkScalaRunner.initializeAmaContext(data.env) diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/ActionsExecutor.scala b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/ActionsExecutor.scala index 7152ff6..969eb0b 100755 --- a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/ActionsExecutor.scala +++ b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/ActionsExecutor.scala @@ -78,15 +78,18 @@ class ActionsExecutor extends Executor with Logging { this.executorDriver = driver val data = mapper.readValue(new ByteArrayInputStream(executorInfo.getData.toByteArray), classOf[ExecData]) + // this is used to resolve the spark drier address + val hostName = slaveInfo.getHostname notifier = new MesosNotifier(driver) notifier.info(s"Executor ${executorInfo.getExecutorId.getValue} registered") val outStream = new ByteArrayOutputStream() - providersFactory = ProvidersFactory(data, jobId, outStream, notifier, executorInfo.getExecutorId.getValue) + providersFactory = ProvidersFactory(data, jobId, outStream, notifier, executorInfo.getExecutorId.getValue, hostName) } override def launchTask(driver: 
ExecutorDriver, taskInfo: TaskInfo): Unit = { + notifier.info(s"launching task: ${taskInfo.getTaskId.getValue}") log.debug(s"launching task: $taskInfo") val status = TaskStatus.newBuilder diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala index 3ad2fda..05637cb 100644 --- a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala +++ b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala @@ -1,7 +1,7 @@ package org.apache.amaterasu.executor.yarn.executors import java.io.ByteArrayOutputStream -import java.net.URLDecoder +import java.net.{InetAddress, URLDecoder} import scala.collection.JavaConverters._ import com.fasterxml.jackson.databind.ObjectMapper @@ -9,6 +9,7 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.apache.amaterasu.common.dataobjects.{ExecData, TaskData} import org.apache.amaterasu.common.logging.Logging import org.apache.amaterasu.executor.common.executors.ProvidersFactory +import org.apache.hadoop.net.NetUtils import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.spark.SparkContext @@ -58,7 +59,9 @@ object ActionsExecutorLauncher extends App with Logging { case _ => urlses(cl.getParent) } + val hostName = InetAddress.getLocalHost.getHostName + log.info(s"Hostname resolved to: $hostName") val mapper = new ObjectMapper() mapper.registerModule(DefaultScalaModule) @@ -89,6 +92,6 @@ object ActionsExecutorLauncher extends App with Logging { val notifier = new YarnNotifier(new YarnConfiguration()) log.info("Setup notifier") - actionsExecutor.providersFactory = ProvidersFactory(execData, jobId, baos, notifier, taskIdAndContainerId, propFile = "./amaterasu.properties") + actionsExecutor.providersFactory = ProvidersFactory(execData, jobId, baos, notifier, taskIdAndContainerId, hostName, propFile = 
"./amaterasu.properties") actionsExecutor.execute() } diff --git a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala index 537bde8..bad7c83 100644 --- a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala +++ b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala @@ -108,7 +108,13 @@ object SparkRunnerHelper extends Logging { these ++ these.filter(_.isDirectory).flatMap(getAllFiles) } - def createSpark(env: Environment, sparkAppName: String, jars: Seq[String], sparkConf: Option[Map[String, Any]], executorEnv: Option[Map[String, Any]], propFile: String): SparkSession = { + def createSpark(env: Environment, + sparkAppName: String, + jars: Seq[String], + sparkConf: Option[Map[String, Any]], + executorEnv: Option[Map[String, Any]], + propFile: String, + hostName: String): SparkSession = { val config = if (propFile != null) { import java.io.FileInputStream @@ -124,7 +130,7 @@ object SparkRunnerHelper extends Logging { f.getName.endsWith(".zip")) conf.setAppName(sparkAppName) - .set("spark.driver.host", getNode) + .set("spark.driver.host", hostName) .set("spark.submit.deployMode", "client") .set("spark.hadoop.validateOutputSpecs", "false") .set("spark.logConf", "true") diff --git a/sdk/src/main/java/org/apache/amaterasu/sdk/RunnersProvider.java b/sdk/src/main/java/org/apache/amaterasu/sdk/RunnersProvider.java index 35b9e69..01fe266 100644 --- a/sdk/src/main/java/org/apache/amaterasu/sdk/RunnersProvider.java +++ b/sdk/src/main/java/org/apache/amaterasu/sdk/RunnersProvider.java @@ -33,7 +33,8 @@ void init(ExecData data, ByteArrayOutputStream outStream, Notifier notifier, String executorId, - String propFile); + String propFile, + String hostName); String getGroupIdentifier(); ---------------------------------------------------------------- This is an 
automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services