roadan closed pull request #4: dynamically resolving spark driver host
URL: https://github.com/apache/incubator-amaterasu/pull/4
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub does not otherwise display it for merged fork PRs:

diff --git 
a/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ProvidersFactory.scala
 
b/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ProvidersFactory.scala
index 42faf71..eec0106 100644
--- 
a/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ProvidersFactory.scala
+++ 
b/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ProvidersFactory.scala
@@ -46,7 +46,8 @@ object ProvidersFactory {
             outStream: ByteArrayOutputStream,
             notifier: Notifier,
             executorId: String,
-            propFile:String = null): ProvidersFactory = {
+            hostName: String,
+            propFile: String = null): ProvidersFactory = {
 
     val result = new ProvidersFactory()
     val reflections = new Reflections(getClass.getClassLoader)
@@ -56,7 +57,7 @@ object ProvidersFactory {
 
       val provider = 
Manifest.classType(r).runtimeClass.newInstance.asInstanceOf[RunnersProvider]
 
-      provider.init(data, jobId, outStream, notifier, executorId, propFile)
+      provider.init(data, jobId, outStream, notifier, executorId, propFile, 
hostName)
       notifier.info(s"a provider for group ${provider.getGroupIdentifier} was 
created")
       (provider.getGroupIdentifier, provider)
     }).toMap
diff --git 
a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala
 
b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala
index 7e56742..ce3b2ba 100644
--- 
a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala
+++ 
b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala
@@ -52,7 +52,8 @@ class SparkRunnersProvider extends RunnersProvider with 
Logging {
                     outStream: ByteArrayOutputStream,
                     notifier: Notifier,
                     executorId: String,
-                    propFile: String): Unit = {
+                    propFile: String,
+                    hostName: String): Unit = {
 
     val config = ClusterConfig(new FileInputStream(propFile))
     shellLoger = ProcessLogger(
@@ -76,7 +77,7 @@ class SparkRunnersProvider extends RunnersProvider with 
Logging {
     val sparkAppName = s"job_${jobId}_executor_$executorId"
 
     SparkRunnerHelper.notifier = notifier
-    val spark = SparkRunnerHelper.createSpark(data.env, sparkAppName, jars, 
conf, executorEnv, propFile)
+    val spark = SparkRunnerHelper.createSpark(data.env, sparkAppName, jars, 
conf, executorEnv, propFile, hostName)
 
     lazy val sparkScalaRunner = SparkScalaRunner(data.env, jobId, spark, 
outStream, notifier, jars)
     sparkScalaRunner.initializeAmaContext(data.env)
diff --git 
a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/ActionsExecutor.scala
 
b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/ActionsExecutor.scala
index 7152ff6..969eb0b 100755
--- 
a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/ActionsExecutor.scala
+++ 
b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/ActionsExecutor.scala
@@ -78,15 +78,18 @@ class ActionsExecutor extends Executor with Logging {
     this.executorDriver = driver
     val data = mapper.readValue(new 
ByteArrayInputStream(executorInfo.getData.toByteArray), classOf[ExecData])
 
+    // this is used to resolve the spark driver address
+    val hostName = slaveInfo.getHostname
     notifier = new MesosNotifier(driver)
     notifier.info(s"Executor ${executorInfo.getExecutorId.getValue} 
registered")
     val outStream = new ByteArrayOutputStream()
-    providersFactory = ProvidersFactory(data, jobId, outStream, notifier, 
executorInfo.getExecutorId.getValue)
+    providersFactory = ProvidersFactory(data, jobId, outStream, notifier, 
executorInfo.getExecutorId.getValue, hostName)
 
   }
 
   override def launchTask(driver: ExecutorDriver, taskInfo: TaskInfo): Unit = {
 
+
     notifier.info(s"launching task: ${taskInfo.getTaskId.getValue}")
     log.debug(s"launching task: $taskInfo")
     val status = TaskStatus.newBuilder
diff --git 
a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
 
b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
index 3ad2fda..05637cb 100644
--- 
a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
+++ 
b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
@@ -1,7 +1,7 @@
 package org.apache.amaterasu.executor.yarn.executors
 
 import java.io.ByteArrayOutputStream
-import java.net.URLDecoder
+import java.net.{InetAddress, URLDecoder}
 
 import scala.collection.JavaConverters._
 import com.fasterxml.jackson.databind.ObjectMapper
@@ -9,6 +9,7 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.apache.amaterasu.common.dataobjects.{ExecData, TaskData}
 import org.apache.amaterasu.common.logging.Logging
 import org.apache.amaterasu.executor.common.executors.ProvidersFactory
+import org.apache.hadoop.net.NetUtils
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.spark.SparkContext
 
@@ -58,7 +59,9 @@ object ActionsExecutorLauncher extends App with Logging {
     case _ => urlses(cl.getParent)
   }
 
+  val hostName = InetAddress.getLocalHost.getHostName
 
+  log.info(s"Hostname resolved to: $hostName")
   val mapper = new ObjectMapper()
   mapper.registerModule(DefaultScalaModule)
 
@@ -89,6 +92,6 @@ object ActionsExecutorLauncher extends App with Logging {
   val notifier = new YarnNotifier(new YarnConfiguration())
 
   log.info("Setup notifier")
-  actionsExecutor.providersFactory = ProvidersFactory(execData, jobId, baos, 
notifier, taskIdAndContainerId, propFile = "./amaterasu.properties")
+  actionsExecutor.providersFactory = ProvidersFactory(execData, jobId, baos, 
notifier, taskIdAndContainerId, hostName, propFile = "./amaterasu.properties")
   actionsExecutor.execute()
 }
diff --git 
a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala
 
b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala
index 537bde8..bad7c83 100644
--- 
a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala
+++ 
b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala
@@ -108,7 +108,13 @@ object SparkRunnerHelper extends Logging {
     these ++ these.filter(_.isDirectory).flatMap(getAllFiles)
   }
 
-  def createSpark(env: Environment, sparkAppName: String, jars: Seq[String], 
sparkConf: Option[Map[String, Any]], executorEnv: Option[Map[String, Any]], 
propFile: String): SparkSession = {
+  def createSpark(env: Environment,
+                  sparkAppName: String,
+                  jars: Seq[String],
+                  sparkConf: Option[Map[String, Any]],
+                  executorEnv: Option[Map[String, Any]],
+                  propFile: String,
+                  hostName: String): SparkSession = {
 
     val config = if (propFile != null) {
       import java.io.FileInputStream
@@ -124,7 +130,7 @@ object SparkRunnerHelper extends Logging {
       f.getName.endsWith(".zip"))
 
     conf.setAppName(sparkAppName)
-      .set("spark.driver.host", getNode)
+      .set("spark.driver.host", hostName)
       .set("spark.submit.deployMode", "client")
       .set("spark.hadoop.validateOutputSpecs", "false")
       .set("spark.logConf", "true")
diff --git a/sdk/src/main/java/org/apache/amaterasu/sdk/RunnersProvider.java 
b/sdk/src/main/java/org/apache/amaterasu/sdk/RunnersProvider.java
index 35b9e69..01fe266 100644
--- a/sdk/src/main/java/org/apache/amaterasu/sdk/RunnersProvider.java
+++ b/sdk/src/main/java/org/apache/amaterasu/sdk/RunnersProvider.java
@@ -33,7 +33,8 @@ void init(ExecData data,
               ByteArrayOutputStream outStream,
               Notifier notifier,
               String executorId,
-              String propFile);
+              String propFile,
+              String hostName);
 
     String getGroupIdentifier();
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to