This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f6a044c [SPARK-37239][YARN][TESTS][FOLLOWUP] Add UT to cover
`Client.prepareLocalResources` with custom `STAGING_FILE_REPLICATION`
f6a044c is described below
commit f6a044cf8cd83e6b3b30e515acbac0ec81607463
Author: yangjie01 <[email protected]>
AuthorDate: Tue Nov 9 08:07:38 2021 -0800
[SPARK-37239][YARN][TESTS][FOLLOWUP] Add UT to cover
`Client.prepareLocalResources` with custom `STAGING_FILE_REPLICATION`
### What changes were proposed in this pull request?
This PR adds a new UT to cover the
`o.a.s.deploy.yarn.Client.prepareLocalResources` method with a custom
`STAGING_FILE_REPLICATION` configuration, and changes other related UTs to
explicitly verify that the `replication` passed into the `copyFileToRemote`
method is `None`.
### Why are the changes needed?
Add new UT.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass Jenkins or GitHub Actions.
Closes #34531 from LuciferYang/SPARK-37239-followup.
Authored-by: yangjie01 <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../org/apache/spark/deploy/yarn/ClientSuite.scala | 35 +++++++++++++++++-----
1 file changed, 27 insertions(+), 8 deletions(-)
diff --git
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 58e49c9..a8815dc 100644
---
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -132,7 +132,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
.set("spark.yarn.dist.jars", ADDED)
val client = createClient(sparkConf, args = Array("--jar", USER))
doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
- any(classOf[Path]), any(), any(classOf[MutableHashMap[URI, Path]]),
anyBoolean(), any())
+ any(classOf[Path]), meq(None), any(classOf[MutableHashMap[URI, Path]]),
anyBoolean(), any())
val tempDir = Utils.createTempDir()
try {
@@ -308,12 +308,12 @@ class ClientSuite extends SparkFunSuite with Matchers {
assert(sparkConf.get(SPARK_JARS) ===
Some(Seq(s"local:${jar4.getPath()}",
s"local:${single.getAbsolutePath()}/*")))
- verify(client).copyFileToRemote(any(classOf[Path]), meq(new
Path(jar1.toURI())), any(),
- any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())
- verify(client).copyFileToRemote(any(classOf[Path]), meq(new
Path(jar2.toURI())), any(),
- any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())
- verify(client).copyFileToRemote(any(classOf[Path]), meq(new
Path(jar3.toURI())), any(),
- any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())
+ verify(client).copyFileToRemote(any(classOf[Path]), meq(new
Path(jar1.toURI())),
+ meq(None), any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())
+ verify(client).copyFileToRemote(any(classOf[Path]), meq(new
Path(jar2.toURI())),
+ meq(None), any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())
+ verify(client).copyFileToRemote(any(classOf[Path]), meq(new
Path(jar3.toURI())),
+ meq(None), any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())
val cp = classpath(client)
cp should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
@@ -330,7 +330,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
val client = createClient(sparkConf)
client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil)
- verify(client).copyFileToRemote(any(classOf[Path]), meq(new
Path(archive.toURI())), any(),
+ verify(client).copyFileToRemote(any(classOf[Path]), meq(new
Path(archive.toURI())), meq(None),
any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())
classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
@@ -340,6 +340,25 @@ class ClientSuite extends SparkFunSuite with Matchers {
}
}
+ test("SPARK-37239: distribute jars archive with set
STAGING_FILE_REPLICATION") {
+ val temp = Utils.createTempDir()
+ val archive = TestUtils.createJarWithFiles(Map(), temp)
+ val replication = 5
+
+ val sparkConf = new SparkConf()
+ .set(SPARK_ARCHIVE, archive.getPath())
+ .set(STAGING_FILE_REPLICATION, replication)
+ val client = createClient(sparkConf)
+ client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil)
+
+ // It is difficult to assert the result of `setReplication` in UT because
this method in
+ // `RawLocalFileSystem` always return true and not change the value of
`replication`.
+ // So we can only assert the call of `client.copyFileToRemote` has passed
in a non `None`.
+ verify(client).copyFileToRemote(any(classOf[Path]), meq(new
Path(archive.toURI())),
+ meq(Some(replication.toShort)), any(classOf[MutableHashMap[URI, Path]]),
anyBoolean(), any())
+ classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
+ }
+
test("distribute archive multiple times") {
val libs = Utils.createTempDir()
// Create jars dir and RELEASE file to avoid IllegalStateException.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]