Repository: spark
Updated Branches:
refs/heads/master 1a3966472 -> d6fb485de
[SPARK-14423][YARN] Avoid same name files added to distributed cache again
## What changes were proposed in this pull request?
In the current implementation of assembly-free spark deployment, jars under
`assembly/target/scala-xxx/jars` will be uploaded to distributed cache by
default, there's a chance these jars' name will be conflicted with name of jars
specified in `--jars`, this will introduce exception when starting application:
```
client token: N/A
diagnostics: Application application_1459907402325_0004 failed 2 times
due to AM Container for appattempt_1459907402325_0004_000002 exited with
exitCode: -1000
For more detailed output, check application tracking
page:http://hw12100.local:8088/proxy/application_1459907402325_0004/Then, click
on links to logs of each attempt.
Diagnostics: Resource
hdfs://localhost:8020/user/sshao/.sparkStaging/application_1459907402325_0004/avro-mapred-1.7.7-hadoop2.jar
changed on src filesystem (expected 1459909780508, was 1459909782590
java.io.IOException: Resource
hdfs://localhost:8020/user/sshao/.sparkStaging/application_1459907402325_0004/avro-mapred-1.7.7-hadoop2.jar
changed on src filesystem (expected 1459909780508, was 1459909782590
at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:253)
at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:61)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:357)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:356)
at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:60)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
```
So here by checking the name of file to avoid same name files uploaded again.
## How was this patch tested?
Unit test and manual integrated test is done locally.
Author: jerryshao <[email protected]>
Closes #12203 from jerryshao/SPARK-14423.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d6fb485d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d6fb485d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d6fb485d
Branch: refs/heads/master
Commit: d6fb485de8b79054db08658d904a3148a04d4180
Parents: 1a39664
Author: jerryshao <[email protected]>
Authored: Mon Apr 18 10:13:38 2016 -0700
Committer: Marcelo Vanzin <[email protected]>
Committed: Mon Apr 18 10:13:38 2016 -0700
----------------------------------------------------------------------
.../org/apache/spark/deploy/yarn/Client.scala | 14 +++++++--
.../apache/spark/deploy/yarn/ClientSuite.scala | 32 +++++++++++++++++++-
2 files changed, 42 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/d6fb485d/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 04e91f8..7c168ed 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -364,6 +364,10 @@ private[spark] class Client(
// multiple times, YARN will fail to launch containers for the app with an
internal
// error.
val distributedUris = new HashSet[String]
+ // Used to keep track of URIs(files) added to the distribute cache have
the same name. If
+ // same name but different path files are added multiple time, YARN will
fail to launch
+ // containers for the app with an internal error.
+ val distributedNames = new HashSet[String]
YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(sparkConf, hadoopConf,
credentials)
YarnSparkHadoopUtil.get.obtainTokenForHBase(sparkConf, hadoopConf,
credentials)
@@ -376,11 +380,16 @@ private[spark] class Client(
def addDistributedUri(uri: URI): Boolean = {
val uriStr = uri.toString()
+ val fileName = new File(uri.getPath).getName
if (distributedUris.contains(uriStr)) {
- logWarning(s"Resource $uri added multiple times to distributed cache.")
+ logWarning(s"Same path resource $uri added multiple times to
distributed cache.")
+ false
+ } else if (distributedNames.contains(fileName)) {
+ logWarning(s"Same name resource $uri added multiple times to
distributed cache")
false
} else {
distributedUris += uriStr
+ distributedNames += fileName
true
}
}
@@ -519,8 +528,7 @@ private[spark] class Client(
).foreach { case (flist, resType, addToClasspath) =>
flist.foreach { file =>
val (_, localizedPath) = distribute(file, resType = resType)
- require(localizedPath != null)
- if (addToClasspath) {
+ if (addToClasspath && localizedPath != null) {
cachedSecondaryJarLinks += localizedPath
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d6fb485d/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 74e268d..23050e8 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.deploy.yarn
-import java.io.{File, FileOutputStream}
+import java.io.{File, FileInputStream, FileOutputStream}
import java.net.URI
import java.util.Properties
@@ -285,6 +285,36 @@ class ClientSuite extends SparkFunSuite with Matchers with
BeforeAndAfterAll
classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
}
+ test("ignore same name jars") {
+ val libs = Utils.createTempDir()
+ val jarsDir = new File(libs, "jars")
+ assert(jarsDir.mkdir())
+ new FileOutputStream(new File(libs, "RELEASE")).close()
+ val userLibs = Utils.createTempDir()
+
+ val jar1 = TestUtils.createJarWithFiles(Map(), jarsDir)
+ val jar2 = TestUtils.createJarWithFiles(Map(), userLibs)
+ // Copy jar2 to jar3 with same name
+ val jar3 = {
+ val target = new File(userLibs, new File(jar1.toURI).getName)
+ val input = new FileInputStream(jar2.getPath)
+ val output = new FileOutputStream(target)
+ Utils.copyStream(input, output, closeStreams = true)
+ target.toURI.toURL
+ }
+
+ val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" ->
libs.getAbsolutePath))
+ .set(JARS_TO_DISTRIBUTE, Seq(jar2.getPath, jar3.getPath))
+
+ val client = createClient(sparkConf)
+ val tempDir = Utils.createTempDir()
+ client.prepareLocalResources(tempDir.getAbsolutePath(), Nil)
+
+ // Only jar2 will be added to SECONDARY_JARS, jar3 which has the same name
with jar1 will be
+ // ignored.
+ sparkConf.get(SECONDARY_JARS) should be (Some(Seq(new
File(jar2.toURI).getName)))
+ }
+
object Fixtures {
val knownDefYarnAppCP: Seq[String] =
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]