Repository: spark
Updated Branches:
  refs/heads/master 1a3966472 -> d6fb485de


[SPARK-14423][YARN] Avoid same name files added to distributed cache again

## What changes were proposed in this pull request?

In the current implementation of assembly-free Spark deployment, jars under
`assembly/target/scala-xxx/jars` are uploaded to the distributed cache by
default. The names of these jars may conflict with the names of jars specified
via `--jars`, which causes the following exception when starting an application:

```
client token: N/A
         diagnostics: Application application_1459907402325_0004 failed 2 times 
due to AM Container for appattempt_1459907402325_0004_000002 exited with  
exitCode: -1000
For more detailed output, check application tracking 
page:http://hw12100.local:8088/proxy/application_1459907402325_0004/Then, click 
on links to logs of each attempt.
Diagnostics: Resource 
hdfs://localhost:8020/user/sshao/.sparkStaging/application_1459907402325_0004/avro-mapred-1.7.7-hadoop2.jar
 changed on src filesystem (expected 1459909780508, was 1459909782590
java.io.IOException: Resource 
hdfs://localhost:8020/user/sshao/.sparkStaging/application_1459907402325_0004/avro-mapred-1.7.7-hadoop2.jar
 changed on src filesystem (expected 1459909780508, was 1459909782590
        at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:253)
        at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:61)
        at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
        at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:357)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
        at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:356)
        at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:60)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
```

This patch checks file names so that a file with the same name as one already added to the distributed cache is not uploaded again.

## How was this patch tested?

A unit test was added, and manual integration testing was done locally.

Author: jerryshao <[email protected]>

Closes #12203 from jerryshao/SPARK-14423.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d6fb485d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d6fb485d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d6fb485d

Branch: refs/heads/master
Commit: d6fb485de8b79054db08658d904a3148a04d4180
Parents: 1a39664
Author: jerryshao <[email protected]>
Authored: Mon Apr 18 10:13:38 2016 -0700
Committer: Marcelo Vanzin <[email protected]>
Committed: Mon Apr 18 10:13:38 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/yarn/Client.scala   | 14 +++++++--
 .../apache/spark/deploy/yarn/ClientSuite.scala  | 32 +++++++++++++++++++-
 2 files changed, 42 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d6fb485d/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 04e91f8..7c168ed 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -364,6 +364,10 @@ private[spark] class Client(
     // multiple times, YARN will fail to launch containers for the app with an 
internal
     // error.
     val distributedUris = new HashSet[String]
+    // Used to keep track of URIs(files) added to the distribute cache have 
the same name. If
+    // same name but different path files are added multiple time, YARN will 
fail to launch
+    // containers for the app with an internal error.
+    val distributedNames = new HashSet[String]
     YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(sparkConf, hadoopConf, 
credentials)
     YarnSparkHadoopUtil.get.obtainTokenForHBase(sparkConf, hadoopConf, 
credentials)
 
@@ -376,11 +380,16 @@ private[spark] class Client(
 
     def addDistributedUri(uri: URI): Boolean = {
       val uriStr = uri.toString()
+      val fileName = new File(uri.getPath).getName
       if (distributedUris.contains(uriStr)) {
-        logWarning(s"Resource $uri added multiple times to distributed cache.")
+        logWarning(s"Same path resource $uri added multiple times to 
distributed cache.")
+        false
+      } else if (distributedNames.contains(fileName)) {
+        logWarning(s"Same name resource $uri added multiple times to 
distributed cache")
         false
       } else {
         distributedUris += uriStr
+        distributedNames += fileName
         true
       }
     }
@@ -519,8 +528,7 @@ private[spark] class Client(
     ).foreach { case (flist, resType, addToClasspath) =>
       flist.foreach { file =>
         val (_, localizedPath) = distribute(file, resType = resType)
-        require(localizedPath != null)
-        if (addToClasspath) {
+        if (addToClasspath && localizedPath != null) {
           cachedSecondaryJarLinks += localizedPath
         }
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/d6fb485d/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala 
b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 74e268d..23050e8 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.io.{File, FileOutputStream}
+import java.io.{File, FileInputStream, FileOutputStream}
 import java.net.URI
 import java.util.Properties
 
@@ -285,6 +285,36 @@ class ClientSuite extends SparkFunSuite with Matchers with 
BeforeAndAfterAll
     classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
   }
 
+  test("ignore same name jars") {
+    val libs = Utils.createTempDir()
+    val jarsDir = new File(libs, "jars")
+    assert(jarsDir.mkdir())
+    new FileOutputStream(new File(libs, "RELEASE")).close()
+    val userLibs = Utils.createTempDir()
+
+    val jar1 = TestUtils.createJarWithFiles(Map(), jarsDir)
+    val jar2 = TestUtils.createJarWithFiles(Map(), userLibs)
+    // Copy jar2 to jar3 with same name
+    val jar3 = {
+      val target = new File(userLibs, new File(jar1.toURI).getName)
+      val input = new FileInputStream(jar2.getPath)
+      val output = new FileOutputStream(target)
+      Utils.copyStream(input, output, closeStreams = true)
+      target.toURI.toURL
+    }
+
+    val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> 
libs.getAbsolutePath))
+      .set(JARS_TO_DISTRIBUTE, Seq(jar2.getPath, jar3.getPath))
+
+    val client = createClient(sparkConf)
+    val tempDir = Utils.createTempDir()
+    client.prepareLocalResources(tempDir.getAbsolutePath(), Nil)
+
+    // Only jar2 will be added to SECONDARY_JARS, jar3 which has the same name 
with jar1 will be
+    // ignored.
+    sparkConf.get(SECONDARY_JARS) should be (Some(Seq(new 
File(jar2.toURI).getName)))
+  }
+
   object Fixtures {
 
     val knownDefYarnAppCP: Seq[String] =


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to