Repository: spark
Updated Branches:
  refs/heads/master 208fff3ac -> 3b3cc7600


[SPARK-14062][YARN] Fix log4j and upload metrics.properties automatically with 
distributed cache

## What changes were proposed in this pull request?

1. Currently, the log4j file that is shipped through the distributed cache is 
only added to the AM's classpath, not the executors'. This was introduced in 
#9118, and it breaks the original intent of that PR, so this change adds the 
log4j file to the classpath of both the AM and the executors.
2. Automatically upload metrics.properties to the distributed cache, so that 
it can be used implicitly by the remote driver and the executors.

## How was this patch tested?

Unit tests and integration tests were run.

Author: jerryshao <[email protected]>

Closes #11885 from jerryshao/SPARK-14062.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3b3cc760
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3b3cc760
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3b3cc760

Branch: refs/heads/master
Commit: 3b3cc76004438a942ecea752db39f3a904a52462
Parents: 208fff3
Author: jerryshao <[email protected]>
Authored: Thu Mar 31 10:27:33 2016 -0700
Committer: Marcelo Vanzin <[email protected]>
Committed: Thu Mar 31 10:27:33 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/yarn/Client.scala   | 71 ++++++++------------
 .../spark/deploy/yarn/ExecutorRunnable.scala    |  3 +-
 .../apache/spark/deploy/yarn/ClientSuite.scala  |  7 +-
 3 files changed, 31 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3b3cc760/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 7b29c1a..f0f13a1 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -351,14 +351,6 @@ private[spark] class Client(
 
     val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
 
-    val oldLog4jConf = Option(System.getenv("SPARK_LOG4J_CONF"))
-    if (oldLog4jConf.isDefined) {
-      logWarning(
-        "SPARK_LOG4J_CONF detected in the system environment. This variable 
has been " +
-          "deprecated. Please refer to the \"Launching Spark on YARN\" 
documentation " +
-          "for alternatives.")
-    }
-
     def addDistributedUri(uri: URI): Boolean = {
       val uriStr = uri.toString()
       if (distributedUris.contains(uriStr)) {
@@ -479,25 +471,16 @@ private[spark] class Client(
     }
 
     /**
-     * Copy a few resources to the distributed cache if their scheme is not 
"local".
+     * Copy user jar to the distributed cache if their scheme is not "local".
      * Otherwise, set the corresponding key in our SparkConf to handle it 
downstream.
-     * Each resource is represented by a 3-tuple of:
-     *   (1) destination resource name,
-     *   (2) local path to the resource,
-     *   (3) Spark property key to set if the scheme is not local
      */
-    List(
-      (APP_JAR_NAME, args.userJar, APP_JAR),
-      ("log4j.properties", oldLog4jConf.orNull, null)
-    ).foreach { case (destName, path, confKey) =>
-      if (path != null && !path.trim().isEmpty()) {
-        val (isLocal, localizedPath) = distribute(path, destName = 
Some(destName))
-        if (isLocal && confKey != null) {
-          require(localizedPath != null, s"Path $path already distributed.")
-          // If the resource is intended for local use only, handle this 
downstream
-          // by setting the appropriate property
-          sparkConf.set(confKey, localizedPath)
-        }
+    Option(args.userJar).filter(_.trim.nonEmpty).foreach { jar =>
+      val (isLocal, localizedPath) = distribute(jar, destName = 
Some(APP_JAR_NAME))
+      if (isLocal) {
+        require(localizedPath != null, s"Path $jar already distributed")
+        // If the resource is intended for local use only, handle this 
downstream
+        // by setting the appropriate property
+        sparkConf.set(APP_JAR, localizedPath)
       }
     }
 
@@ -541,11 +524,10 @@ private[spark] class Client(
       distribute(f, targetDir = targetDir)
     }
 
-    // Distribute an archive with Hadoop and Spark configuration for the AM.
+    // Distribute an archive with Hadoop and Spark configuration for the AM 
and executors.
     val (_, confLocalizedPath) = 
distribute(createConfArchive().toURI().getPath(),
       resType = LocalResourceType.ARCHIVE,
-      destName = Some(LOCALIZED_CONF_DIR),
-      appMasterOnly = true)
+      destName = Some(LOCALIZED_CONF_DIR))
     require(confLocalizedPath != null)
 
     localResources
@@ -554,10 +536,10 @@ private[spark] class Client(
   /**
    * Create an archive with the config files for distribution.
    *
-   * These are only used by the AM, since executors will use the configuration 
object broadcast by
-   * the driver. The files are zipped and added to the job as an archive, so 
that YARN will explode
-   * it when distributing to the AM. This directory is then added to the 
classpath of the AM
-   * process, just to make sure that everybody is using the same default 
config.
+   * These will be used by AM and executors. The files are zipped and added to 
the job as an
+   * archive, so that YARN will explode it when distributing to AM and 
executors. This directory
+   * is then added to the classpath of AM and executor process, just to make 
sure that everybody
+   * is using the same default config.
    *
    * This follows the order of precedence set by the startup scripts, in which 
HADOOP_CONF_DIR
    * shows up in the classpath before YARN_CONF_DIR.
@@ -576,11 +558,14 @@ private[spark] class Client(
     // required when user changes log4j.properties directly to set the log 
configurations. If
     // configuration file is provided through --files then executors will be 
taking configurations
     // from --files instead of $SPARK_CONF_DIR/log4j.properties.
-    val log4jFileName = "log4j.properties"
-    
Option(Utils.getContextOrSparkClassLoader.getResource(log4jFileName)).foreach { 
url =>
-      if (url.getProtocol == "file") {
-        hadoopConfFiles(log4jFileName) = new File(url.getPath)
-      }
+
+    // Also uploading metrics.properties to distributed cache if exists in 
classpath.
+    // If user specify this file using --files then executors will use the one
+    // from --files instead.
+    for { prop <- Seq("log4j.properties", "metrics.properties")
+          url <- Option(Utils.getContextOrSparkClassLoader.getResource(prop))
+          if url.getProtocol == "file" } {
+      hadoopConfFiles(prop) = new File(url.getPath)
     }
 
     Seq("HADOOP_CONF_DIR", "YARN_CONF_DIR").foreach { envKey =>
@@ -659,7 +644,7 @@ private[spark] class Client(
       pySparkArchives: Seq[String]): HashMap[String, String] = {
     logInfo("Setting up the launch environment for our AM container")
     val env = new HashMap[String, String]()
-    populateClasspath(args, yarnConf, sparkConf, env, true, 
sparkConf.get(DRIVER_CLASS_PATH))
+    populateClasspath(args, yarnConf, sparkConf, env, 
sparkConf.get(DRIVER_CLASS_PATH))
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_STAGING_DIR") = stagingDir
     env("SPARK_USER") = 
UserGroupInformation.getCurrentUser().getShortUserName()
@@ -1236,18 +1221,16 @@ object Client extends Logging {
       conf: Configuration,
       sparkConf: SparkConf,
       env: HashMap[String, String],
-      isAM: Boolean,
       extraClassPath: Option[String] = None): Unit = {
     extraClassPath.foreach { cp =>
       addClasspathEntry(getClusterPath(sparkConf, cp), env)
     }
+
     addClasspathEntry(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), 
env)
 
-    if (isAM) {
-      addClasspathEntry(
-        YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + 
Path.SEPARATOR +
-          LOCALIZED_CONF_DIR, env)
-    }
+    addClasspathEntry(
+      YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR +
+        LOCALIZED_CONF_DIR, env)
 
     if (sparkConf.get(USER_CLASS_PATH_FIRST)) {
       // in order to properly add the app jar when user classpath is first

http://git-wip-us.apache.org/repos/asf/spark/blob/3b3cc760/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index f956a4d..7b55d78 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -289,8 +289,7 @@ private[yarn] class ExecutorRunnable(
 
   private def prepareEnvironment(container: Container): HashMap[String, 
String] = {
     val env = new HashMap[String, String]()
-    Client.populateClasspath(null, yarnConf, sparkConf, env, false,
-      sparkConf.get(EXECUTOR_CLASS_PATH))
+    Client.populateClasspath(null, yarnConf, sparkConf, env, 
sparkConf.get(EXECUTOR_CLASS_PATH))
 
     sparkConf.getExecutorEnv.foreach { case (key, value) =>
       // This assumes each executor environment variable set here is a path

http://git-wip-us.apache.org/repos/asf/spark/blob/3b3cc760/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala 
b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index e3613a9..64723c3 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -121,7 +121,7 @@ class ClientSuite extends SparkFunSuite with Matchers with 
BeforeAndAfterAll
     val env = new MutableHashMap[String, String]()
     val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), 
sparkConf)
 
-    populateClasspath(args, conf, sparkConf, env, true)
+    populateClasspath(args, conf, sparkConf, env)
 
     val cp = env("CLASSPATH").split(":|;|<CPS>")
     s"$SPARK,$USER,$ADDED".split(",").foreach({ entry =>
@@ -178,8 +178,7 @@ class ClientSuite extends SparkFunSuite with Matchers with 
BeforeAndAfterAll
       "/remotePath/1:/remotePath/2")
 
     val env = new MutableHashMap[String, String]()
-    populateClasspath(null, conf, sparkConf, env, false,
-      extraClassPath = Some("/localPath/my1.jar"))
+    populateClasspath(null, conf, sparkConf, env, extraClassPath = 
Some("/localPath/my1.jar"))
     val cp = classpath(env)
     cp should contain ("/remotePath/spark.jar")
     cp should contain ("/remotePath/my1.jar")
@@ -356,7 +355,7 @@ class ClientSuite extends SparkFunSuite with Matchers with 
BeforeAndAfterAll
 
   private def classpath(client: Client): Array[String] = {
     val env = new MutableHashMap[String, String]()
-    populateClasspath(null, client.hadoopConf, client.sparkConf, env, false)
+    populateClasspath(null, client.hadoopConf, client.sparkConf, env)
     classpath(env)
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to