eyalbenivri closed pull request #5: Amaterasu 17 URL: https://github.com/apache/incubator-amaterasu/pull/5
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala b/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala index efc00fc..653c285 100755 --- a/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala +++ b/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala @@ -68,7 +68,7 @@ class ClusterConfig extends Logging { class Master { var cores: Int = 1 - var memoryMB: Int = 512 + var memoryMB: Int = 1024 def load(props: Properties): Unit = { if (props.containsKey("yarn.master.cores")) this.cores = props.getProperty("yarn.master.cores").asInstanceOf[Int] @@ -128,7 +128,7 @@ class ClusterConfig extends Logging { object Jobs { var cpus: Double = 1 - var mem: Long = 512 + var mem: Long = 1024 var repoSize: Long = 1024 def load(props: Properties): Unit = { diff --git a/common/src/main/scala/org/apache/amaterasu/common/utils/FileUtils.scala b/common/src/main/scala/org/apache/amaterasu/common/utils/FileUtils.scala new file mode 100644 index 0000000..8c000cc --- /dev/null +++ b/common/src/main/scala/org/apache/amaterasu/common/utils/FileUtils.scala @@ -0,0 +1,12 @@ +package org.apache.amaterasu.common.utils + +import java.io.File + +object FileUtils { + + def getAllFiles(dir: File): Array[File] = { + val these = dir.listFiles + these ++ these.filter(_.isDirectory).flatMap(getAllFiles) + } + +} diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ProvidersFactory.scala b/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ProvidersFactory.scala index 42faf71..eec0106 100644 --- 
a/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ProvidersFactory.scala +++ b/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ProvidersFactory.scala @@ -46,7 +46,8 @@ object ProvidersFactory { outStream: ByteArrayOutputStream, notifier: Notifier, executorId: String, - propFile:String = null): ProvidersFactory = { + hostName: String, + propFile: String = null): ProvidersFactory = { val result = new ProvidersFactory() val reflections = new Reflections(getClass.getClassLoader) @@ -56,7 +57,7 @@ object ProvidersFactory { val provider = Manifest.classType(r).runtimeClass.newInstance.asInstanceOf[RunnersProvider] - provider.init(data, jobId, outStream, notifier, executorId, propFile) + provider.init(data, jobId, outStream, notifier, executorId, propFile, hostName) notifier.info(s"a provider for group ${provider.getGroupIdentifier} was created") (provider.getGroupIdentifier, provider) }).toMap diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala index 7e56742..ce3b2ba 100644 --- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala +++ b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala @@ -52,7 +52,8 @@ class SparkRunnersProvider extends RunnersProvider with Logging { outStream: ByteArrayOutputStream, notifier: Notifier, executorId: String, - propFile: String): Unit = { + propFile: String, + hostName: String): Unit = { val config = ClusterConfig(new FileInputStream(propFile)) shellLoger = ProcessLogger( @@ -76,7 +77,7 @@ class SparkRunnersProvider extends RunnersProvider with Logging { val sparkAppName = s"job_${jobId}_executor_$executorId" SparkRunnerHelper.notifier = notifier - val spark = 
SparkRunnerHelper.createSpark(data.env, sparkAppName, jars, conf, executorEnv, propFile) + val spark = SparkRunnerHelper.createSpark(data.env, sparkAppName, jars, conf, executorEnv, propFile, hostName) lazy val sparkScalaRunner = SparkScalaRunner(data.env, jobId, spark, outStream, notifier, jars) sparkScalaRunner.initializeAmaContext(data.env) diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/ActionsExecutor.scala b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/ActionsExecutor.scala index 7152ff6..969eb0b 100755 --- a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/ActionsExecutor.scala +++ b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/ActionsExecutor.scala @@ -78,15 +78,18 @@ class ActionsExecutor extends Executor with Logging { this.executorDriver = driver val data = mapper.readValue(new ByteArrayInputStream(executorInfo.getData.toByteArray), classOf[ExecData]) + // this is used to resolve the spark driver address + val hostName = slaveInfo.getHostname notifier = new MesosNotifier(driver) notifier.info(s"Executor ${executorInfo.getExecutorId.getValue} registered") val outStream = new ByteArrayOutputStream() - providersFactory = ProvidersFactory(data, jobId, outStream, notifier, executorInfo.getExecutorId.getValue) + providersFactory = ProvidersFactory(data, jobId, outStream, notifier, executorInfo.getExecutorId.getValue, hostName) } override def launchTask(driver: ExecutorDriver, taskInfo: TaskInfo): Unit = { + notifier.info(s"launching task: ${taskInfo.getTaskId.getValue}") log.debug(s"launching task: $taskInfo") val status = TaskStatus.newBuilder diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala index 3ad2fda..05637cb 100644 --- 
a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala +++ b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala @@ -1,7 +1,7 @@ package org.apache.amaterasu.executor.yarn.executors import java.io.ByteArrayOutputStream -import java.net.URLDecoder +import java.net.{InetAddress, URLDecoder} import scala.collection.JavaConverters._ import com.fasterxml.jackson.databind.ObjectMapper @@ -9,6 +9,7 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.apache.amaterasu.common.dataobjects.{ExecData, TaskData} import org.apache.amaterasu.common.logging.Logging import org.apache.amaterasu.executor.common.executors.ProvidersFactory +import org.apache.hadoop.net.NetUtils import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.spark.SparkContext @@ -58,7 +59,9 @@ object ActionsExecutorLauncher extends App with Logging { case _ => urlses(cl.getParent) } + val hostName = InetAddress.getLocalHost.getHostName + log.info(s"Hostname resolved to: $hostName") val mapper = new ObjectMapper() mapper.registerModule(DefaultScalaModule) @@ -89,6 +92,6 @@ object ActionsExecutorLauncher extends App with Logging { val notifier = new YarnNotifier(new YarnConfiguration()) log.info("Setup notifier") - actionsExecutor.providersFactory = ProvidersFactory(execData, jobId, baos, notifier, taskIdAndContainerId, propFile = "./amaterasu.properties") + actionsExecutor.providersFactory = ProvidersFactory(execData, jobId, baos, notifier, taskIdAndContainerId, hostName, propFile = "./amaterasu.properties") actionsExecutor.execute() } diff --git a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala index 537bde8..ba6a3e1 100644 --- a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala +++ 
b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala @@ -22,6 +22,7 @@ import org.apache.amaterasu.common.configuration.ClusterConfig import org.apache.amaterasu.common.execution.actions.Notifier import org.apache.amaterasu.common.logging.Logging import org.apache.amaterasu.common.runtime.Environment +import org.apache.amaterasu.common.utils.FileUtils import org.apache.spark.repl.amaterasu.AmaSparkILoop import org.apache.spark.sql.SparkSession import org.apache.spark.util.Utils @@ -103,12 +104,14 @@ object SparkRunnerHelper extends Logging { interpreter = result } - def getAllFiles(dir: File): Array[File] = { - val these = dir.listFiles - these ++ these.filter(_.isDirectory).flatMap(getAllFiles) - } - def createSpark(env: Environment, sparkAppName: String, jars: Seq[String], sparkConf: Option[Map[String, Any]], executorEnv: Option[Map[String, Any]], propFile: String): SparkSession = { + def createSpark(env: Environment, + sparkAppName: String, + jars: Seq[String], + sparkConf: Option[Map[String, Any]], + executorEnv: Option[Map[String, Any]], + propFile: String, + hostName: String): SparkSession = { val config = if (propFile != null) { import java.io.FileInputStream @@ -119,12 +122,12 @@ object SparkRunnerHelper extends Logging { Thread.currentThread().setContextClassLoader(getClass.getClassLoader) - val pyfiles = getAllFiles(new File("miniconda/pkgs")).filter(f => f.getName.endsWith(".py") || + val pyfiles = FileUtils.getAllFiles(new File("miniconda/pkgs")).filter(f => f.getName.endsWith(".py") || f.getName.endsWith(".egg") || f.getName.endsWith(".zip")) conf.setAppName(sparkAppName) - .set("spark.driver.host", getNode) + .set("spark.driver.host", hostName) .set("spark.submit.deployMode", "client") .set("spark.hadoop.validateOutputSpecs", "false") .set("spark.logConf", "true") @@ -149,7 +152,7 @@ object SparkRunnerHelper extends Logging { .set("spark.master", "yarn") .set("spark.executor.instances", "1") // TODO: change 
this - .set("spark.yarn.jars", s"${config.spark.home}/jars/*") + .set("spark.yarn.jars", s"spark/jars/*") .set("spark.executor.memory", "1g") .set("spark.dynamicAllocation.enabled", "false") //.set("spark.shuffle.service.enabled", "true") diff --git a/leader/build.gradle b/leader/build.gradle index 2bbeb75..9ecd17c 100644 --- a/leader/build.gradle +++ b/leader/build.gradle @@ -21,8 +21,8 @@ plugins { id 'java' } -sourceCompatibility = 1.7 -targetCompatibility = 1.7 +sourceCompatibility = 1.8 +targetCompatibility = 1.8 repositories { maven { @@ -33,8 +33,11 @@ repositories { dependencies { compile 'org.scala-lang:scala-library:2.11.8' + compile group: 'org.scala-lang', name: 'scala-reflect', version: '2.11.8' +// compile group: 'org.scala-lang', name: 'scala-compiler', version: '2.11.8' compile project(':common') + compile project(':amaterasu-sdk') compile group: 'com.github.scopt', name: 'scopt_2.11', version: '3.3.0' compile group: 'com.github.nscala-time', name: 'nscala-time_2.11', version: '2.2.0' @@ -57,6 +60,7 @@ dependencies { compile group: 'org.jsoup', name: 'jsoup', version: '1.10.2' compile group: 'org.scala-lang.modules', name: 'scala-async_2.11', version: '0.9.6' compile group: 'org.jsoup', name: 'jsoup', version: '1.10.2' + compile group: 'org.reflections', name: 'reflections', version: '0.9.11' testCompile project(':common') testCompile "gradle.plugin.com.github.maiflai:gradle-scalatest:0.14" diff --git a/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java b/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java index bcf890d..e085d6e 100644 --- a/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java +++ b/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java @@ -18,6 +18,8 @@ import org.apache.amaterasu.common.configuration.ClusterConfig; +import org.apache.amaterasu.leader.execution.frameworks.FrameworkProvidersFactory; +import org.apache.amaterasu.sdk.FrameworkSetupProvider; import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -108,7 +110,7 @@ public void run(JobOpts opts, String[] args) throws Exception { List<String> commands = Collections.singletonList( "env AMA_NODE=" + System.getenv("AMA_NODE") + " " + - "$JAVA_HOME/bin/java" + + "$JAVA_HOME/bin/java" + " -Dscala.usejavacp=false" + " -Xmx1G" + " org.apache.amaterasu.leader.yarn.ApplicationMaster " + @@ -127,16 +129,33 @@ public void run(JobOpts opts, String[] args) throws Exception { try { if (!fs.exists(jarPathQualified)) { File home = new File(opts.home); + fs.mkdirs(jarPathQualified); for (File f : home.listFiles()) { - fs.mkdirs(jarPathQualified); fs.copyFromLocalFile(false, true, new Path(f.getAbsolutePath()), jarPathQualified); } + + // setup frameworks + FrameworkProvidersFactory frameworkFactory = FrameworkProvidersFactory.apply(config); + for (String group : frameworkFactory.groups()) { + FrameworkSetupProvider framework = frameworkFactory.getFramework(group); + + //creating a group folder + Path frameworkPath = Path.mergePaths(jarPathQualified, new Path("/" + framework.getGroupIdentifier())); + System.out.println("===> " + frameworkPath.toString()); + + fs.mkdirs(frameworkPath); + for (File file : framework.getGroupResources()) { + if (file.exists()) + fs.copyFromLocalFile(false, true, new Path(file.getAbsolutePath()), frameworkPath); + } + } } } catch (IOException e) { LOGGER.error("Error uploading ama folder to HDFS.", e); System.exit(3); } + // get version of build String version = config.version(); @@ -228,6 +247,10 @@ public void run(JobOpts opts, String[] args) throws Exception { LOGGER.info("Application {} finished with state {}-{} at {}", appId, appState, appReport.getFinalApplicationStatus(), appReport.getFinishTime()); } + private static void copyDirRecursive(){ + + } + private boolean isAppFinished(YarnApplicationState appState) { return appState == YarnApplicationState.FINISHED || appState == 
YarnApplicationState.KILLED || diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/dsl/JobParser.scala b/leader/src/main/scala/org/apache/amaterasu/leader/dsl/JobParser.scala index b9637ce..8ef1c7a 100755 --- a/leader/src/main/scala/org/apache/amaterasu/leader/dsl/JobParser.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/dsl/JobParser.scala @@ -27,6 +27,7 @@ import org.apache.amaterasu.leader.execution.JobManager import org.apache.curator.framework.CuratorFramework import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.io.Source /** @@ -103,6 +104,12 @@ object JobParser { attempts ) + //updating the list of frameworks setup + manager.frameworks.getOrElseUpdate(action.data.groupId, + new mutable.HashSet[String]()) + .add(action.data.typeId) + + if (manager.head == null) manager.head = action @@ -125,6 +132,11 @@ object JobParser { action.data.errorActionId = errorAction.data.id manager.registerAction(errorAction) + + //updating the list of frameworks setup + manager.frameworks.getOrElseUpdate(errorAction.data.groupId, + new mutable.HashSet[String]()) + .add(errorAction.data.typeId) } parseActions(actions.tail, manager, actionsQueue, attempts, action) @@ -150,7 +162,6 @@ object JobParser { client, attempts ) - } def parseErrorAction( diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/execution/JobManager.scala b/leader/src/main/scala/org/apache/amaterasu/leader/execution/JobManager.scala index adb5e8f..38f4b7c 100755 --- a/leader/src/main/scala/org/apache/amaterasu/leader/execution/JobManager.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/execution/JobManager.scala @@ -25,6 +25,7 @@ import org.apache.amaterasu.leader.execution.actions.Action import org.apache.curator.framework.CuratorFramework import scala.collection.concurrent.TrieMap +import scala.collection.mutable /** * The JobManager manages the lifecycle of a job. 
It queues new actions for execution, @@ -33,16 +34,15 @@ import scala.collection.concurrent.TrieMap */ class JobManager extends Logging { - var name: String = null - var jobId: String = null - var client: CuratorFramework = null - var head: Action = null - - val jobReport = new StringBuilder + var name: String = _ + var jobId: String = _ + var client: CuratorFramework = _ + var head: Action = _ // TODO: this is not private due to tests, fix this!!! val registeredActions = new TrieMap[String, Action] - private var executionQueue: BlockingQueue[ActionData] = null + val frameworks = new TrieMap[String, mutable.HashSet[String]] + private var executionQueue: BlockingQueue[ActionData] = _ /** * The start method initiates the job execution by executing the first action. @@ -50,15 +50,6 @@ class JobManager extends Logging { */ def start(): Unit = { - jobReport.append( - s""" - | ****************************************************************** - | * started job with id: $jobId * - | ****************************************************************** - | * * - """.stripMargin - ) - jobReport.append("\n") head.execute() } @@ -77,7 +68,7 @@ class JobManager extends Logging { val nextAction: ActionData = executionQueue.poll() if (nextAction != null) { - registeredActions.get(nextAction.id).get.announceStart + registeredActions(nextAction.id).announceStart } nextAction @@ -85,10 +76,9 @@ class JobManager extends Logging { def reQueueAction(actionId: String): Unit = { - jobReport.append(s" *+-> action: $actionId re-queued for execution *\n") - val action = registeredActions.get(actionId).get + val action = registeredActions(actionId) executionQueue.put(action.data) - registeredActions.get(actionId).get.announceQueued + registeredActions(actionId).announceQueued } @@ -110,7 +100,6 @@ class JobManager extends Logging { */ def actionComplete(actionId: String): Unit = { - jobReport.append(s" *+-> action: $actionId completed *\n") val action = registeredActions.get(actionId).get 
action.announceComplete action.data.nextActionIds.foreach(id => @@ -130,12 +119,6 @@ class JobManager extends Logging { def actionFailed(actionId: String, message: String): Unit = { log.warn(message) - jobReport.append( - s""" *+-> action: $actionId failed with message: * - | $message - """.stripMargin - ) - jobReport.append("\n") val action = registeredActions.get(actionId).get val id = action.handleFailure(message) @@ -160,7 +143,6 @@ class JobManager extends Logging { */ def actionStarted(actionId: String): Unit = { - jobReport.append(s" *+-> action: $actionId started *\n") val action = registeredActions.get(actionId).get action.announceStart diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/execution/frameworks/FrameworkProvidersFactory.scala b/leader/src/main/scala/org/apache/amaterasu/leader/execution/frameworks/FrameworkProvidersFactory.scala new file mode 100644 index 0000000..67d07a8 --- /dev/null +++ b/leader/src/main/scala/org/apache/amaterasu/leader/execution/frameworks/FrameworkProvidersFactory.scala @@ -0,0 +1,45 @@ +package org.apache.amaterasu.leader.execution.frameworks + +import java.net.{URL, URLClassLoader} + +import org.apache.amaterasu.common.configuration.ClusterConfig +import org.apache.amaterasu.common.logging.Logging +import org.apache.amaterasu.sdk.FrameworkSetupProvider +import org.reflections.Reflections + +import scala.collection.JavaConversions._ + +class FrameworkProvidersFactory { + var providers: Map[String, FrameworkSetupProvider] = _ + + def groups: Array[String] = { + providers.keys.toArray + } + + def getFramework(groupId: String): FrameworkSetupProvider = { + providers(groupId) + } +} + +object FrameworkProvidersFactory extends Logging { + + def apply(config: ClusterConfig): FrameworkProvidersFactory = { + + val result = new FrameworkProvidersFactory() + + val reflections = new Reflections(getClass.getClassLoader) + val runnerTypes = reflections.getSubTypesOf(classOf[FrameworkSetupProvider]).toSet + + 
result.providers = runnerTypes.map(r => { + + val provider = Manifest.classType(r).runtimeClass.newInstance.asInstanceOf[FrameworkSetupProvider] + + provider.init(config) + log.info(s"a provider for group ${provider.getGroupIdentifier} was created") + (provider.getGroupIdentifier, provider) + + }).toMap + + result + } +} \ No newline at end of file diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/frameworks/spark/SparkSetupProvider.scala b/leader/src/main/scala/org/apache/amaterasu/leader/frameworks/spark/SparkSetupProvider.scala new file mode 100644 index 0000000..8515102 --- /dev/null +++ b/leader/src/main/scala/org/apache/amaterasu/leader/frameworks/spark/SparkSetupProvider.scala @@ -0,0 +1,35 @@ +package org.apache.amaterasu.leader.frameworks.spark + +import java.io.File + +import org.apache.amaterasu.common.configuration.ClusterConfig +import org.apache.amaterasu.common.utils.FileUtils +import org.apache.amaterasu.sdk.FrameworkSetupProvider + +import scala.collection.mutable + +class SparkSetupProvider extends FrameworkSetupProvider { + + private var conf: ClusterConfig = _ + private val runnersResources = mutable.Map[String,Array[File]]() + + override def init(conf: ClusterConfig): Unit = { + this.conf = conf + + runnersResources += "scala" -> Array.empty[File] + runnersResources += "sql" -> Array.empty[File] + //TODO: Nadav needs to setup conda here + runnersResources += "python" -> Array.empty[File] + } + + override def getGroupIdentifier: String = "spark" + + override def getGroupResources: Array[File] = { + new File(conf.spark.home).listFiles + } + + override def getRunnerResources(runnerId: String): Array[File] = { + runnersResources(runnerId) + } + +} \ No newline at end of file diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala index 16bcec2..9844d07 100644 --- 
a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala @@ -26,11 +26,12 @@ import com.google.gson.Gson import org.apache.amaterasu.common.configuration.ClusterConfig import org.apache.amaterasu.common.dataobjects.ActionData import org.apache.amaterasu.common.logging.Logging +import org.apache.amaterasu.leader.execution.frameworks.FrameworkProvidersFactory import org.apache.amaterasu.leader.execution.{JobLoader, JobManager} import org.apache.amaterasu.leader.utilities.{Args, DataLoader} import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory} import org.apache.curator.retry.ExponentialBackoffRetry -import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path} import org.apache.hadoop.io.DataOutputBuffer import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.api.ApplicationConstants @@ -44,7 +45,7 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ -import scala.collection.concurrent +import scala.collection.{concurrent, mutable} import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future import scala.util.{Failure, Success} @@ -151,7 +152,7 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging { // Resource requirements for worker containers // TODO: this should be per task based on the framework config this.capability = Records.newRecord(classOf[Resource]) - this.capability.setMemory(Math.min(config.taskMem, 512)) + this.capability.setMemory(Math.min(config.taskMem, 1024)) this.capability.setVirtualCores(1) while (!jobManager.outOfActions) { @@ -210,9 +211,9 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging { val ctx = Records.newRecord(classOf[ContainerLaunchContext]) val 
commands: List[String] = List( "/bin/bash ./miniconda.sh -b -p $PWD/miniconda && ", - s"/bin/bash ${config.spark.home}/bin/load-spark-env.sh && ", - s"java -cp ${config.spark.home}/jars/*:executor.jar:${config.spark.home}/conf/:${config.YARN.hadoopHomeDir}/conf/ " + - "-Xmx512M " + + s"/bin/bash spark/bin/load-spark-env.sh && ", + s"java -cp spark/jars/*:executor.jar:${config.spark.home}/conf/:${config.YARN.hadoopHomeDir}/conf/ " + + "-Xmx1G " + "-Dscala.usejavacp=true " + "-Dhdp.version=2.6.1.0-129 " + "org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher " + @@ -226,15 +227,25 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging { ctx.setCommands(commands) ctx.setTokens(allTokens) - ctx.setLocalResources(Map[String, LocalResource]( + + var resources = mutable.Map[String, LocalResource]( "executor.jar" -> executorJar, "amaterasu.properties" -> propFile, + // TODO: Nadav/Eyal all of these should move to the executor resource setup "miniconda.sh" -> setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/dist/Miniconda2-latest-Linux-x86_64.sh"))), "codegen.py" -> setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/dist/codegen.py"))), "runtime.py" -> setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/dist/runtime.py"))), "spark-version-info.properties" -> setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/dist/spark-version-info.properties"))), - "spark_intp.py" -> setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/dist/spark_intp.py"))) - )) + "spark_intp.py" -> setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/dist/spark_intp.py")))) + + val frameworkFactory = FrameworkProvidersFactory(config) + val framework = frameworkFactory.getFramework(actionData.groupId) + + //adding the framework and executor resources + setupResources(framework.getGroupIdentifier, resources, framework.getGroupIdentifier) + setupResources(s"${framework.getGroupIdentifier}/${actionData.typeId}", 
resources, s"${framework.getGroupIdentifier}-${actionData.typeId}") + + ctx.setLocalResources(resources) ctx.setEnvironment(Map[String, String]( "HADOOP_CONF_DIR" -> s"${config.YARN.hadoopHomeDir}/conf/", @@ -261,6 +272,22 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging { } } + private def setupResources(frameworkPath: String, countainerResources: mutable.Map[String, LocalResource], resourcesPath: String): Unit = { + + val sourcePath = Path.mergePaths(jarPath, new Path(s"/$resourcesPath")) + + if (fs.exists(sourcePath)) { + + val files = fs.listFiles(sourcePath, true) + + while (files.hasNext) { + val res = files.next() + val containerPath = res.getPath.toUri.getPath.replace("/apps/amaterasu/", "") + countainerResources.put(containerPath, setLocalResourceFromPath(res.getPath)) + } + } + } + def stopApplication(finalApplicationStatus: FinalApplicationStatus, appMessage: String): Unit = { import java.io.IOException diff --git a/leader/src/main/scripts/ama-start-yarn.sh b/leader/src/main/scripts/ama-start-yarn.sh index 8c0abce..153eba8 100755 --- a/leader/src/main/scripts/ama-start-yarn.sh +++ b/leader/src/main/scripts/ama-start-yarn.sh @@ -119,9 +119,9 @@ if [ ! 
-f ${BASEDIR}/dist/Miniconda2-latest-Linux-x86_64.sh ]; then fi eval "hdfs dfs -rm -R -skipTrash /apps/amaterasu" -eval "hdfs dfs -mkdir /apps/amaterasu/" -eval "hdfs dfs -chmod -R 777 /apps/amaterasu/" -eval "hdfs dfs -copyFromLocal ${BASEDIR}/* /apps/amaterasu/" +#eval "hdfs dfs -mkdir /apps/amaterasu/" +#eval "hdfs dfs -chmod -R 777 /apps/amaterasu/" +#eval "hdfs dfs -copyFromLocal ${BASEDIR}/* /apps/amaterasu/" eval $CMD | grep "===>" kill $SERVER_PID diff --git a/leader/src/main/scripts/log4j.properties b/leader/src/main/scripts/log4j.properties index a9d592f..6b40036 100644 --- a/leader/src/main/scripts/log4j.properties +++ b/leader/src/main/scripts/log4j.properties @@ -5,4 +5,6 @@ log4j.rootLogger=DEBUG, stdout, file log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n \ No newline at end of file +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n + +log4j.logger.org.reflections=OFF \ No newline at end of file diff --git a/sdk/src/main/java/org/apache/amaterasu/sdk/FrameworkSetupProvider.java b/sdk/src/main/java/org/apache/amaterasu/sdk/FrameworkSetupProvider.java new file mode 100644 index 0000000..dc31e4f --- /dev/null +++ b/sdk/src/main/java/org/apache/amaterasu/sdk/FrameworkSetupProvider.java @@ -0,0 +1,17 @@ +package org.apache.amaterasu.sdk; + +import org.apache.amaterasu.common.configuration.ClusterConfig; + +import java.io.File; + +public interface FrameworkSetupProvider { + + void init(ClusterConfig conf); + + String getGroupIdentifier(); + + File[] getGroupResources(); + + File[] getRunnerResources(String runnerId); + +} \ No newline at end of file diff --git a/sdk/src/main/java/org/apache/amaterasu/sdk/RunnersProvider.java b/sdk/src/main/java/org/apache/amaterasu/sdk/RunnersProvider.java index 35b9e69..01fe266 
100644 --- a/sdk/src/main/java/org/apache/amaterasu/sdk/RunnersProvider.java +++ b/sdk/src/main/java/org/apache/amaterasu/sdk/RunnersProvider.java @@ -33,7 +33,8 @@ void init(ExecData data, ByteArrayOutputStream outStream, Notifier notifier, String executorId, - String propFile); + String propFile, + String hostName); String getGroupIdentifier(); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services