Repository: spark
Updated Branches:
  refs/heads/master 2f6e88fec -> 327ac83f5


http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index f5e912b..901a724 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -295,31 +295,30 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
   private val workerConf = new SparkConf()
 
   def testOffsetBytes(isCompressed: Boolean): Unit = {
-    val tmpDir2 = Utils.createTempDir()
-    val suffix = getSuffix(isCompressed)
-    val f1Path = tmpDir2 + "/f1" + suffix
-    writeLogFile(f1Path, "1\n2\n3\n4\n5\n6\n7\n8\n9\n".getBytes(StandardCharsets.UTF_8))
-    val f1Length = Utils.getFileLength(new File(f1Path), workerConf)
+    withTempDir { tmpDir2 =>
+      val suffix = getSuffix(isCompressed)
+      val f1Path = tmpDir2 + "/f1" + suffix
+      writeLogFile(f1Path, "1\n2\n3\n4\n5\n6\n7\n8\n9\n".getBytes(StandardCharsets.UTF_8))
+      val f1Length = Utils.getFileLength(new File(f1Path), workerConf)
 
-    // Read first few bytes
-    assert(Utils.offsetBytes(f1Path, f1Length, 0, 5) === "1\n2\n3")
+      // Read first few bytes
+      assert(Utils.offsetBytes(f1Path, f1Length, 0, 5) === "1\n2\n3")
 
-    // Read some middle bytes
-    assert(Utils.offsetBytes(f1Path, f1Length, 4, 11) === "3\n4\n5\n6")
+      // Read some middle bytes
+      assert(Utils.offsetBytes(f1Path, f1Length, 4, 11) === "3\n4\n5\n6")
 
-    // Read last few bytes
-    assert(Utils.offsetBytes(f1Path, f1Length, 12, 18) === "7\n8\n9\n")
+      // Read last few bytes
+      assert(Utils.offsetBytes(f1Path, f1Length, 12, 18) === "7\n8\n9\n")
 
-    // Read some nonexistent bytes in the beginning
-    assert(Utils.offsetBytes(f1Path, f1Length, -5, 5) === "1\n2\n3")
+      // Read some nonexistent bytes in the beginning
+      assert(Utils.offsetBytes(f1Path, f1Length, -5, 5) === "1\n2\n3")
 
-    // Read some nonexistent bytes at the end
-    assert(Utils.offsetBytes(f1Path, f1Length, 12, 22) === "7\n8\n9\n")
+      // Read some nonexistent bytes at the end
+      assert(Utils.offsetBytes(f1Path, f1Length, 12, 22) === "7\n8\n9\n")
 
-    // Read some nonexistent bytes on both ends
-    assert(Utils.offsetBytes(f1Path, f1Length, -3, 25) === "1\n2\n3\n4\n5\n6\n7\n8\n9\n")
-
-    Utils.deleteRecursively(tmpDir2)
+      // Read some nonexistent bytes on both ends
+      assert(Utils.offsetBytes(f1Path, f1Length, -3, 25) === "1\n2\n3\n4\n5\n6\n7\n8\n9\n")
+    }
   }
 
   test("reading offset bytes of a file") {
@@ -331,41 +330,41 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
   }
 
   def testOffsetBytesMultipleFiles(isCompressed: Boolean): Unit = {
-    val tmpDir = Utils.createTempDir()
-    val suffix = getSuffix(isCompressed)
-    val files = (1 to 3).map(i => new File(tmpDir, i.toString + suffix)) :+ new File(tmpDir, "4")
-    writeLogFile(files(0).getAbsolutePath, "0123456789".getBytes(StandardCharsets.UTF_8))
-    writeLogFile(files(1).getAbsolutePath, "abcdefghij".getBytes(StandardCharsets.UTF_8))
-    writeLogFile(files(2).getAbsolutePath, "ABCDEFGHIJ".getBytes(StandardCharsets.UTF_8))
-    writeLogFile(files(3).getAbsolutePath, "9876543210".getBytes(StandardCharsets.UTF_8))
-    val fileLengths = files.map(Utils.getFileLength(_, workerConf))
-
-    // Read first few bytes in the 1st file
-    assert(Utils.offsetBytes(files, fileLengths, 0, 5) === "01234")
+    withTempDir { tmpDir =>
+      val suffix = getSuffix(isCompressed)
+      val files = (1 to 3).map(i =>
+        new File(tmpDir, i.toString + suffix)) :+ new File(tmpDir, "4")
+      writeLogFile(files(0).getAbsolutePath, "0123456789".getBytes(StandardCharsets.UTF_8))
+      writeLogFile(files(1).getAbsolutePath, "abcdefghij".getBytes(StandardCharsets.UTF_8))
+      writeLogFile(files(2).getAbsolutePath, "ABCDEFGHIJ".getBytes(StandardCharsets.UTF_8))
+      writeLogFile(files(3).getAbsolutePath, "9876543210".getBytes(StandardCharsets.UTF_8))
+      val fileLengths = files.map(Utils.getFileLength(_, workerConf))
 
-    // Read bytes within the 1st file
-    assert(Utils.offsetBytes(files, fileLengths, 5, 8) === "567")
+      // Read first few bytes in the 1st file
+      assert(Utils.offsetBytes(files, fileLengths, 0, 5) === "01234")
 
-    // Read bytes across 1st and 2nd file
-    assert(Utils.offsetBytes(files, fileLengths, 8, 18) === "89abcdefgh")
+      // Read bytes within the 1st file
+      assert(Utils.offsetBytes(files, fileLengths, 5, 8) === "567")
 
-    // Read bytes across 1st, 2nd and 3rd file
-    assert(Utils.offsetBytes(files, fileLengths, 5, 24) === "56789abcdefghijABCD")
+      // Read bytes across 1st and 2nd file
+      assert(Utils.offsetBytes(files, fileLengths, 8, 18) === "89abcdefgh")
 
-    // Read bytes across 3rd and 4th file
-    assert(Utils.offsetBytes(files, fileLengths, 25, 35) === "FGHIJ98765")
+      // Read bytes across 1st, 2nd and 3rd file
+      assert(Utils.offsetBytes(files, fileLengths, 5, 24) === "56789abcdefghijABCD")
 
-    // Read some nonexistent bytes in the beginning
-    assert(Utils.offsetBytes(files, fileLengths, -5, 18) === "0123456789abcdefgh")
+      // Read bytes across 3rd and 4th file
+      assert(Utils.offsetBytes(files, fileLengths, 25, 35) === "FGHIJ98765")
 
-    // Read some nonexistent bytes at the end
-    assert(Utils.offsetBytes(files, fileLengths, 18, 45) === "ijABCDEFGHIJ9876543210")
+      // Read some nonexistent bytes in the beginning
+      assert(Utils.offsetBytes(files, fileLengths, -5, 18) === "0123456789abcdefgh")
 
-    // Read some nonexistent bytes on both ends
-    assert(Utils.offsetBytes(files, fileLengths, -5, 45) ===
-      "0123456789abcdefghijABCDEFGHIJ9876543210")
+      // Read some nonexistent bytes at the end
+      assert(Utils.offsetBytes(files, fileLengths, 18, 45) === "ijABCDEFGHIJ9876543210")
 
-    Utils.deleteRecursively(tmpDir)
+      // Read some nonexistent bytes on both ends
+      assert(Utils.offsetBytes(files, fileLengths, -5, 45) ===
+        "0123456789abcdefghijABCDEFGHIJ9876543210")
+    }
   }
 
   test("reading offset bytes across multiple files") {
@@ -427,27 +426,28 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
 
   test("doesDirectoryContainFilesNewerThan") {
     // create some temporary directories and files
-    val parent: File = Utils.createTempDir()
-    // The parent directory has two child directories
-    val child1: File = Utils.createTempDir(parent.getCanonicalPath)
-    val child2: File = Utils.createTempDir(parent.getCanonicalPath)
-    val child3: File = Utils.createTempDir(child1.getCanonicalPath)
-    // set the last modified time of child1 to 30 secs old
-    child1.setLastModified(System.currentTimeMillis() - (1000 * 30))
-
-    // although child1 is old, child2 is still new so return true
-    assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
-
-    child2.setLastModified(System.currentTimeMillis - (1000 * 30))
-    assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
-
-    parent.setLastModified(System.currentTimeMillis - (1000 * 30))
-    // although parent and its immediate children are new, child3 is still old
-    // we expect a full recursive search for new files.
-    assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
-
-    child3.setLastModified(System.currentTimeMillis - (1000 * 30))
-    assert(!Utils.doesDirectoryContainAnyNewFiles(parent, 5))
+    withTempDir { parent =>
+      // The parent directory has two child directories
+      val child1: File = Utils.createTempDir(parent.getCanonicalPath)
+      val child2: File = Utils.createTempDir(parent.getCanonicalPath)
+      val child3: File = Utils.createTempDir(child1.getCanonicalPath)
+      // set the last modified time of child1 to 30 secs old
+      child1.setLastModified(System.currentTimeMillis() - (1000 * 30))
+
+      // although child1 is old, child2 is still new so return true
+      assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
+
+      child2.setLastModified(System.currentTimeMillis - (1000 * 30))
+      assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
+
+      parent.setLastModified(System.currentTimeMillis - (1000 * 30))
+      // although parent and its immediate children are new, child3 is still old
+      // we expect a full recursive search for new files.
+      assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
+
+      child3.setLastModified(System.currentTimeMillis - (1000 * 30))
+      assert(!Utils.doesDirectoryContainAnyNewFiles(parent, 5))
+    }
   }
 
   test("resolveURI") {
@@ -608,9 +608,8 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
   }
 
   test("loading properties from file") {
-    val tmpDir = Utils.createTempDir()
-    val outFile = File.createTempFile("test-load-spark-properties", "test", tmpDir)
-    try {
+    withTempDir { tmpDir =>
+      val outFile = File.createTempFile("test-load-spark-properties", "test", tmpDir)
       System.setProperty("spark.test.fileNameLoadB", "2")
       Files.write("spark.test.fileNameLoadA true\n" +
         "spark.test.fileNameLoadB 1\n", outFile, StandardCharsets.UTF_8)
@@ -621,8 +620,6 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
       val sparkConf = new SparkConf
       assert(sparkConf.getBoolean("spark.test.fileNameLoadA", false) === true)
       assert(sparkConf.getInt("spark.test.fileNameLoadB", 1) === 2)
-    } finally {
-      Utils.deleteRecursively(tmpDir)
     }
   }
 
@@ -638,52 +635,53 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
   }
 
   test("fetch hcfs dir") {
-    val tempDir = Utils.createTempDir()
-    val sourceDir = new File(tempDir, "source-dir")
-    sourceDir.mkdir()
-    val innerSourceDir = Utils.createTempDir(root = sourceDir.getPath)
-    val sourceFile = File.createTempFile("someprefix", "somesuffix", innerSourceDir)
-    val targetDir = new File(tempDir, "target-dir")
-    Files.write("some text", sourceFile, StandardCharsets.UTF_8)
-
-    val path =
-      if (Utils.isWindows) {
-        new Path("file:/" + sourceDir.getAbsolutePath.replace("\\", "/"))
-      } else {
-        new Path("file://" + sourceDir.getAbsolutePath)
-      }
-    val conf = new Configuration()
-    val fs = Utils.getHadoopFileSystem(path.toString, conf)
+    withTempDir { tempDir =>
+      val sourceDir = new File(tempDir, "source-dir")
+      sourceDir.mkdir()
+      val innerSourceDir = Utils.createTempDir(root = sourceDir.getPath)
+      val sourceFile = File.createTempFile("someprefix", "somesuffix", innerSourceDir)
+      val targetDir = new File(tempDir, "target-dir")
+      Files.write("some text", sourceFile, StandardCharsets.UTF_8)
+
+      val path =
+        if (Utils.isWindows) {
+          new Path("file:/" + sourceDir.getAbsolutePath.replace("\\", "/"))
+        } else {
+          new Path("file://" + sourceDir.getAbsolutePath)
+        }
+      val conf = new Configuration()
+      val fs = Utils.getHadoopFileSystem(path.toString, conf)
 
-    assert(!targetDir.isDirectory())
-    Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
-    assert(targetDir.isDirectory())
+      assert(!targetDir.isDirectory())
+      Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
+      assert(targetDir.isDirectory())
 
-    // Copy again to make sure it doesn't error if the dir already exists.
-    Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
+      // Copy again to make sure it doesn't error if the dir already exists.
+      Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
 
-    val destDir = new File(targetDir, sourceDir.getName())
-    assert(destDir.isDirectory())
+      val destDir = new File(targetDir, sourceDir.getName())
+      assert(destDir.isDirectory())
 
-    val destInnerDir = new File(destDir, innerSourceDir.getName)
-    assert(destInnerDir.isDirectory())
+      val destInnerDir = new File(destDir, innerSourceDir.getName)
+      assert(destInnerDir.isDirectory())
 
-    val destInnerFile = new File(destInnerDir, sourceFile.getName)
-    assert(destInnerFile.isFile())
+      val destInnerFile = new File(destInnerDir, sourceFile.getName)
+      assert(destInnerFile.isFile())
 
-    val filePath =
-      if (Utils.isWindows) {
-        new Path("file:/" + sourceFile.getAbsolutePath.replace("\\", "/"))
-      } else {
-        new Path("file://" + sourceFile.getAbsolutePath)
-      }
-    val testFileDir = new File(tempDir, "test-filename")
-    val testFileName = "testFName"
-    val testFilefs = Utils.getHadoopFileSystem(filePath.toString, conf)
-    Utils.fetchHcfsFile(filePath, testFileDir, testFilefs, new SparkConf(),
-                        conf, false, Some(testFileName))
-    val newFileName = new File(testFileDir, testFileName)
-    assert(newFileName.isFile())
+      val filePath =
+        if (Utils.isWindows) {
+          new Path("file:/" + sourceFile.getAbsolutePath.replace("\\", "/"))
+        } else {
+          new Path("file://" + sourceFile.getAbsolutePath)
+        }
+      val testFileDir = new File(tempDir, "test-filename")
+      val testFileName = "testFName"
+      val testFilefs = Utils.getHadoopFileSystem(filePath.toString, conf)
+      Utils.fetchHcfsFile(filePath, testFileDir, testFilefs, new SparkConf(),
+        conf, false, Some(testFileName))
+      val newFileName = new File(testFileDir, testFileName)
+      assert(newFileName.isFile())
+    }
   }
 
   test("shutdown hook manager") {

http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index 2341949..85963ec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -67,6 +67,17 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with
   }
 
   /**
+   * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
+   * returns.
+   */
+  protected override def withTempDir(f: File => Unit): Unit = {
+    super.withTempDir { dir =>
+      f(dir)
+      waitForTasksToFinish()
+    }
+  }
+
+  /**
    * A helper function for turning off/on codegen.
    */
   protected def testWithWholeStageCodegenOnAndOff(testName: String)(f: String => Unit): Unit = {
@@ -143,43 +154,6 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with
       test(name) { runOnThread() }
     }
   }
-}
-
-/**
- * Helper trait that can be extended by all external SQL test suites.
- *
- * This allows subclasses to plugin a custom `SQLContext`.
- * To use implicit methods, import `testImplicits._` instead of through the `SQLContext`.
- *
- * Subclasses should *not* create `SQLContext`s in the test suite constructor, which is
- * prone to leaving multiple overlapping [[org.apache.spark.SparkContext]]s in the same JVM.
- */
-private[sql] trait SQLTestUtilsBase
-  extends Eventually
-  with BeforeAndAfterAll
-  with SQLTestData
-  with PlanTestBase { self: Suite =>
-
-  protected def sparkContext = spark.sparkContext
-
-  // Shorthand for running a query using our SQLContext
-  protected lazy val sql = spark.sql _
-
-  /**
-   * A helper object for importing SQL implicits.
-   *
-   * Note that the alternative of importing `spark.implicits._` is not possible here.
-   * This is because we create the `SQLContext` immediately before the first test is run,
-   * but the implicits import is needed in the constructor.
-   */
-  protected object testImplicits extends SQLImplicits {
-    protected override def _sqlContext: SQLContext = self.spark.sqlContext
-  }
-
-  protected override def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
-    SparkSession.setActiveSession(spark)
-    super.withSQLConf(pairs: _*)(f)
-  }
 
   /**
    * Copy file in jar's resource to a temp file, then pass it to `f`.
@@ -207,21 +181,6 @@ private[sql] trait SQLTestUtilsBase
   }
 
   /**
-   * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
-   * returns.
-   *
-   * @todo Probably this method should be moved to a more general place
-   */
-  protected def withTempDir(f: File => Unit): Unit = {
-    val dir = Utils.createTempDir().getCanonicalFile
-    try f(dir) finally {
-      // wait for all tasks to finish before deleting files
-      waitForTasksToFinish()
-      Utils.deleteRecursively(dir)
-    }
-  }
-
-  /**
   * Creates the specified number of temporary directories, which is then passed to `f` and will be
   * deleted after `f` returns.
    */
@@ -233,6 +192,43 @@ private[sql] trait SQLTestUtilsBase
       files.foreach(Utils.deleteRecursively)
     }
   }
+}
+
+/**
+ * Helper trait that can be extended by all external SQL test suites.
+ *
+ * This allows subclasses to plugin a custom `SQLContext`.
+ * To use implicit methods, import `testImplicits._` instead of through the 
`SQLContext`.
+ *
+ * Subclasses should *not* create `SQLContext`s in the test suite constructor, 
which is
+ * prone to leaving multiple overlapping [[org.apache.spark.SparkContext]]s in 
the same JVM.
+ */
+private[sql] trait SQLTestUtilsBase
+  extends Eventually
+  with BeforeAndAfterAll
+  with SQLTestData
+  with PlanTestBase { self: Suite =>
+
+  protected def sparkContext = spark.sparkContext
+
+  // Shorthand for running a query using our SQLContext
+  protected lazy val sql = spark.sql _
+
+  /**
+   * A helper object for importing SQL implicits.
+   *
+   * Note that the alternative of importing `spark.implicits._` is not 
possible here.
+   * This is because we create the `SQLContext` immediately before the first 
test is run,
+   * but the implicits import is needed in the constructor.
+   */
+  protected object testImplicits extends SQLImplicits {
+    protected override def _sqlContext: SQLContext = self.spark.sqlContext
+  }
+
+  protected override def withSQLConf(pairs: (String, String)*)(f: => Unit): 
Unit = {
+    SparkSession.setActiveSession(spark)
+    super.withSQLConf(pairs: _*)(f)
+  }
 
   /**
    * Drops functions after calling `f`. A function is represented by 
(functionName, isTemporary).

http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index dc96ec4..218bd18 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -56,15 +56,6 @@ class VersionsSuite extends SparkFunSuite with Logging {
   import HiveClientBuilder.buildClient
 
   /**
-   * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
-   * returns.
-   */
-  protected def withTempDir(f: File => Unit): Unit = {
-    val dir = Utils.createTempDir().getCanonicalFile
-    try f(dir) finally Utils.deleteRecursively(dir)
-  }
-
-  /**
    * Drops table `tableName` after calling `f`.
    */
   protected def withTable(tableNames: String*)(f: => Unit): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index ada494e..6a0f523 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -557,16 +557,4 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging {
       verifyOutput[W](output.toSeq, expectedOutput, useSet)
     }
   }
-
-  /**
-   * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
-   * returns.
-   * (originally from `SqlTestUtils`.)
-   * @todo Probably this method should be moved to a more general place
-   */
-  protected def withTempDir(f: File => Unit): Unit = {
-    val dir = Utils.createTempDir().getCanonicalFile
-    try f(dir) finally Utils.deleteRecursively(dir)
-  }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to