Repository: spark
Updated Branches:
  refs/heads/master 114324832 -> e094d0115


[SPARK-18803][TESTS] Fix JarEntry-related & path-related test failures and skip 
some tests by path length limitation on Windows

## What changes were proposed in this pull request?

This PR proposes to fix some tests being failed on Windows as below for several 
problems.

### Incorrect path handling

- FileSuite
  ```
  [info] - binary file input as byte array *** FAILED *** (500 milliseconds)
  [info]   
"file:/C:/projects/spark/target/tmp/spark-e7c3a3b8-0a4b-4a7f-9ebe-7c4883e48624/record-bytestream-00000.bin"
 did not contain 
"C:\projects\spark\target\tmp\spark-e7c3a3b8-0a4b-4a7f-9ebe-7c4883e48624\record-bytestream-00000.bin"
 (FileSuite.scala:258)
  [info]   org.scalatest.exceptions.TestFailedException:
  [info]   at 
org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:500)
  ...
  ```
  ```
  [info] - Get input files via old Hadoop API *** FAILED *** (1 second, 94 
milliseconds)
  [info]   
Set("/C:/projects/spark/target/tmp/spark-cf5b1f8b-c5ed-43e0-8d17-546ebbfa8200/output/part-00000",
 
"/C:/projects/spark/target/tmp/spark-cf5b1f8b-c5ed-43e0-8d17-546ebbfa8200/output/part-00001")
 did not equal 
Set("C:\projects\spark\target\tmp\spark-cf5b1f8b-c5ed-43e0-8d17-546ebbfa8200\output/part-00000",
 
"C:\projects\spark\target\tmp\spark-cf5b1f8b-c5ed-43e0-8d17-546ebbfa8200\output/part-00001")
 (FileSuite.scala:535)
  [info]   org.scalatest.exceptions.TestFailedException:
  [info]   at 
org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:500)
  ...
  ```

  ```
  [info] - Get input files via new Hadoop API *** FAILED *** (313 milliseconds)
  [info]   
Set("/C:/projects/spark/target/tmp/spark-12bc1540-1111-4df6-9c4d-79e0e614407c/output/part-00000",
 
"/C:/projects/spark/target/tmp/spark-12bc1540-1111-4df6-9c4d-79e0e614407c/output/part-00001")
 did not equal 
Set("C:\projects\spark\target\tmp\spark-12bc1540-1111-4df6-9c4d-79e0e614407c\output/part-00000",
 
"C:\projects\spark\target\tmp\spark-12bc1540-1111-4df6-9c4d-79e0e614407c\output/part-00001")
 (FileSuite.scala:549)
  [info]   org.scalatest.exceptions.TestFailedException:
  ...
  ```

- TaskResultGetterSuite

  ```
  [info] - handling results larger than max RPC message size *** FAILED *** (1 
second, 579 milliseconds)
  [info]   1 did not equal 0 Expect result to be removed from the block 
manager. (TaskResultGetterSuite.scala:129)
  [info]   org.scalatest.exceptions.TestFailedException:
  [info]   ...
  [info]   Cause: java.net.URISyntaxException: Illegal character in path at 
index 12: 
string:///C:\projects\spark\target\tmp\spark-93c485af-68da-440f-a907-aac7acd5fc25\repro\MyException.java
  [info]   at java.net.URI$Parser.fail(URI.java:2848)
  [info]   at java.net.URI$Parser.checkChars(URI.java:3021)
  ...
  ```
  ```
  [info] - failed task deserialized with the correct classloader (SPARK-11195) 
*** FAILED *** (0 milliseconds)
  [info]   java.lang.IllegalArgumentException: Illegal character in path at 
index 12: 
string:///C:\projects\spark\target\tmp\spark-93c485af-68da-440f-a907-aac7acd5fc25\repro\MyException.java
  [info]   at java.net.URI.create(URI.java:852)
  ...
  ```

- SparkSubmitSuite

  ```
  [info]   java.lang.IllegalArgumentException: Illegal character in path at 
index 12: 
string:///C:\projects\spark\target\tmp\1481210831381-0\870903339\MyLib.java
  [info]   at java.net.URI.create(URI.java:852)
  [info]   at 
org.apache.spark.TestUtils$.org$apache$spark$TestUtils$$createURI(TestUtils.scala:112)
  ...
  ```

### Incorrect separate for JarEntry

After the path fix from above, then `TaskResultGetterSuite` throws another 
exception as below:

```
[info] - failed task deserialized with the correct classloader (SPARK-11195) 
*** FAILED *** (907 milliseconds)
[info]   java.lang.ClassNotFoundException: repro.MyException
[info]   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
...
```

This is because `Paths.get` concatenates the given paths to an OS-specific path 
(Windows `\` and Linux `/`). However, for `JarEntry` we should comply ZIP 
specification meaning it should be always `/` according to ZIP specification.

See `4.4.17 file name: (Variable)` in 
https://pkware.cachefly.net/webdocs/casestudies/APPNOTE.TXT

### Long path problem on Windows

Some tests in `ShuffleSuite` via `ShuffleNettySuite` were skipped due to the 
same reason with SPARK-18718

## How was this patch tested?

Manually via AppVeyor.

**Before**

- `FileSuite`, `TaskResultGetterSuite`,`SparkSubmitSuite`
  https://ci.appveyor.com/project/spark-test/spark/build/164-tmp-windows-base 
(please grep each to check each)
- `ShuffleSuite`
  https://ci.appveyor.com/project/spark-test/spark/build/157-tmp-windows-base

**After**

- `FileSuite`
  https://ci.appveyor.com/project/spark-test/spark/build/166-FileSuite
- `TaskResultGetterSuite`
  
https://ci.appveyor.com/project/spark-test/spark/build/173-TaskResultGetterSuite
- `SparkSubmitSuite`
  https://ci.appveyor.com/project/spark-test/spark/build/167-SparkSubmitSuite
- `ShuffleSuite`
  https://ci.appveyor.com/project/spark-test/spark/build/176-ShuffleSuite

Author: hyukjinkwon <[email protected]>

Closes #16234 from HyukjinKwon/test-errors-windows.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e094d011
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e094d011
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e094d011

Branch: refs/heads/master
Commit: e094d011561bcd992a554c02c586d6bd9df67b32
Parents: 1143248
Author: hyukjinkwon <[email protected]>
Authored: Sat Dec 10 19:55:22 2016 +0000
Committer: Sean Owen <[email protected]>
Committed: Sat Dec 10 19:55:22 2016 +0000

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/TestUtils.scala  |  5 ++++-
 core/src/test/scala/org/apache/spark/FileSuite.scala  | 11 ++++++++---
 .../test/scala/org/apache/spark/ShuffleSuite.scala    | 14 +++++++++++++-
 .../scala/org/apache/spark/deploy/IvyTestUtils.scala  |  2 +-
 .../org/apache/spark/deploy/SparkSubmitSuite.scala    |  2 +-
 .../spark/scheduler/TaskResultGetterSuite.scala       |  2 +-
 .../sql/execution/joins/BroadcastJoinSuite.scala      |  2 +-
 7 files changed, 29 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e094d011/core/src/main/scala/org/apache/spark/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala 
b/core/src/main/scala/org/apache/spark/TestUtils.scala
index 2909191..b5b2014 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -93,7 +93,10 @@ private[spark] object TestUtils {
     val jarStream = new JarOutputStream(jarFileStream, new 
java.util.jar.Manifest())
 
     for (file <- files) {
-      val jarEntry = new JarEntry(Paths.get(directoryPrefix.getOrElse(""), 
file.getName).toString)
+      // The `name` for the argument in `JarEntry` should use / for its 
separator. This is
+      // ZIP specification.
+      val prefix = directoryPrefix.map(d => s"$d/").getOrElse("")
+      val jarEntry = new JarEntry(prefix + file.getName)
       jarStream.putNextEntry(jarEntry)
 
       val in = new FileInputStream(file)

http://git-wip-us.apache.org/repos/asf/spark/blob/e094d011/core/src/test/scala/org/apache/spark/FileSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala 
b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 89f0b1c..6538507 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -22,6 +22,7 @@ import java.util.zip.GZIPOutputStream
 
 import scala.io.Source
 
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io._
 import org.apache.hadoop.io.compress.DefaultCodec
 import org.apache.hadoop.mapred.{FileAlreadyExistsException, FileSplit, 
JobConf, TextInputFormat, TextOutputFormat}
@@ -255,7 +256,7 @@ class FileSuite extends SparkFunSuite with 
LocalSparkContext {
     val (infile: String, indata: PortableDataStream) = inRdd.collect.head
 
     // Make sure the name and array match
-    assert(infile.contains(outFileName)) // a prefix may get added
+    assert(infile.contains(outFile.toURI.getPath)) // a prefix may get added
     assert(indata.toArray === testOutput)
   }
 
@@ -532,7 +533,9 @@ class FileSuite extends SparkFunSuite with 
LocalSparkContext {
         .mapPartitionsWithInputSplit { (split, part) =>
           Iterator(split.asInstanceOf[FileSplit].getPath.toUri.getPath)
         }.collect()
-    assert(inputPaths.toSet === Set(s"$outDir/part-00000", 
s"$outDir/part-00001"))
+    val outPathOne = new Path(outDir, "part-00000").toUri.getPath
+    val outPathTwo = new Path(outDir, "part-00001").toUri.getPath
+    assert(inputPaths.toSet === Set(outPathOne, outPathTwo))
   }
 
   test("Get input files via new Hadoop API") {
@@ -546,7 +549,9 @@ class FileSuite extends SparkFunSuite with 
LocalSparkContext {
         .mapPartitionsWithInputSplit { (split, part) =>
           Iterator(split.asInstanceOf[NewFileSplit].getPath.toUri.getPath)
         }.collect()
-    assert(inputPaths.toSet === Set(s"$outDir/part-00000", 
s"$outDir/part-00001"))
+    val outPathOne = new Path(outDir, "part-00000").toUri.getPath
+    val outPathTwo = new Path(outDir, "part-00001").toUri.getPath
+    assert(inputPaths.toSet === Set(outPathOne, outPathTwo))
   }
 
   test("spark.files.ignoreCorruptFiles should work both HadoopRDD and 
NewHadoopRDD") {

http://git-wip-us.apache.org/repos/asf/spark/blob/e094d011/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala 
b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index a854f5bb..dc3a28e 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.scheduler.{MapStatus, MyRDD, 
SparkListener, SparkListene
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.shuffle.ShuffleWriter
 import org.apache.spark.storage.{ShuffleBlockId, ShuffleDataBlockId}
-import org.apache.spark.util.MutablePair
+import org.apache.spark.util.{MutablePair, Utils}
 
 abstract class ShuffleSuite extends SparkFunSuite with Matchers with 
LocalSparkContext {
 
@@ -51,7 +51,10 @@ abstract class ShuffleSuite extends SparkFunSuite with 
Matchers with LocalSparkC
     assert(valuesFor2.toList.sorted === List(1))
   }
 
+  // Some tests using `local-cluster` here are failed on Windows due to the 
failure of initiating
+  // executors by the path length limitation. See SPARK-18718.
   test("shuffle non-zero block size") {
+    assume(!Utils.isWindows)
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
     val NUM_BLOCKS = 3
 
@@ -77,6 +80,7 @@ abstract class ShuffleSuite extends SparkFunSuite with 
Matchers with LocalSparkC
   }
 
   test("shuffle serializer") {
+    assume(!Utils.isWindows)
     // Use a local cluster with 2 processes to make sure there are both local 
and remote blocks
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
     val a = sc.parallelize(1 to 10, 2)
@@ -93,6 +97,7 @@ abstract class ShuffleSuite extends SparkFunSuite with 
Matchers with LocalSparkC
   }
 
   test("zero sized blocks") {
+    assume(!Utils.isWindows)
     // Use a local cluster with 2 processes to make sure there are both local 
and remote blocks
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
 
@@ -120,6 +125,7 @@ abstract class ShuffleSuite extends SparkFunSuite with 
Matchers with LocalSparkC
   }
 
   test("zero sized blocks without kryo") {
+    assume(!Utils.isWindows)
     // Use a local cluster with 2 processes to make sure there are both local 
and remote blocks
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
 
@@ -145,6 +151,7 @@ abstract class ShuffleSuite extends SparkFunSuite with 
Matchers with LocalSparkC
   }
 
   test("shuffle on mutable pairs") {
+    assume(!Utils.isWindows)
     // Use a local cluster with 2 processes to make sure there are both local 
and remote blocks
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
     def p[T1, T2](_1: T1, _2: T2): MutablePair[T1, T2] = MutablePair(_1, _2)
@@ -157,6 +164,7 @@ abstract class ShuffleSuite extends SparkFunSuite with 
Matchers with LocalSparkC
   }
 
   test("sorting on mutable pairs") {
+    assume(!Utils.isWindows)
     // This is not in SortingSuite because of the local cluster setup.
     // Use a local cluster with 2 processes to make sure there are both local 
and remote blocks
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
@@ -172,6 +180,7 @@ abstract class ShuffleSuite extends SparkFunSuite with 
Matchers with LocalSparkC
   }
 
   test("cogroup using mutable pairs") {
+    assume(!Utils.isWindows)
     // Use a local cluster with 2 processes to make sure there are both local 
and remote blocks
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
     def p[T1, T2](_1: T1, _2: T2): MutablePair[T1, T2] = MutablePair(_1, _2)
@@ -199,6 +208,7 @@ abstract class ShuffleSuite extends SparkFunSuite with 
Matchers with LocalSparkC
   }
 
   test("subtract mutable pairs") {
+    assume(!Utils.isWindows)
     // Use a local cluster with 2 processes to make sure there are both local 
and remote blocks
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
     def p[T1, T2](_1: T1, _2: T2): MutablePair[T1, T2] = MutablePair(_1, _2)
@@ -213,6 +223,7 @@ abstract class ShuffleSuite extends SparkFunSuite with 
Matchers with LocalSparkC
   }
 
   test("sort with Java non serializable class - Kryo") {
+    assume(!Utils.isWindows)
     // Use a local cluster with 2 processes to make sure there are both local 
and remote blocks
     val myConf = conf.clone().set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
     sc = new SparkContext("local-cluster[2,1,1024]", "test", myConf)
@@ -227,6 +238,7 @@ abstract class ShuffleSuite extends SparkFunSuite with 
Matchers with LocalSparkC
   }
 
   test("sort with Java non serializable class - Java") {
+    assume(!Utils.isWindows)
     // Use a local cluster with 2 processes to make sure there are both local 
and remote blocks
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
     val a = sc.parallelize(1 to 10, 2)

http://git-wip-us.apache.org/repos/asf/spark/blob/e094d011/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala 
b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
index c9b3d65..f50cb38 100644
--- a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
@@ -142,7 +142,7 @@ private[deploy] object IvyTestUtils {
         |}
       """.stripMargin
     val sourceFile =
-      new JavaSourceFromString(new File(dir, className).getAbsolutePath, 
contents)
+      new JavaSourceFromString(new File(dir, className).toURI.getPath, 
contents)
     createCompiledClass(className, dir, sourceFile, Seq.empty)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e094d011/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 6268880..9417930 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -461,7 +461,7 @@ class SparkSubmitSuite
     val tempDir = Utils.createTempDir()
     val srcDir = new File(tempDir, "sparkrtest")
     srcDir.mkdirs()
-    val excSource = new JavaSourceFromString(new File(srcDir, 
"DummyClass").getAbsolutePath,
+    val excSource = new JavaSourceFromString(new File(srcDir, 
"DummyClass").toURI.getPath,
       """package sparkrtest;
         |
         |public class DummyClass implements java.io.Serializable {

http://git-wip-us.apache.org/repos/asf/spark/blob/e094d011/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index ee95e4f..c9e682f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -171,7 +171,7 @@ class TaskResultGetterSuite extends SparkFunSuite with 
BeforeAndAfter with Local
     val tempDir = Utils.createTempDir()
     val srcDir = new File(tempDir, "repro/")
     srcDir.mkdirs()
-    val excSource = new JavaSourceFromString(new File(srcDir, 
"MyException").getAbsolutePath,
+    val excSource = new JavaSourceFromString(new File(srcDir, 
"MyException").toURI.getPath,
       """package repro;
         |
         |public class MyException extends Exception {

http://git-wip-us.apache.org/repos/asf/spark/blob/e094d011/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
index 7c4f763..0783935 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
@@ -86,7 +86,7 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils {
     plan
   }
 
-  // This tests here are failed on Windows due to the failure of initiating 
executors
+  // The tests here are failed on Windows due to the failure of initiating 
executors
   // by the path length limitation. See SPARK-18718.
   test("unsafe broadcast hash join updates peak execution memory") {
     assume(!Utils.isWindows)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to