Repository: spark Updated Branches: refs/heads/master 2f6e88fec -> 327ac83f5
http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index f5e912b..901a724 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -295,31 +295,30 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { private val workerConf = new SparkConf() def testOffsetBytes(isCompressed: Boolean): Unit = { - val tmpDir2 = Utils.createTempDir() - val suffix = getSuffix(isCompressed) - val f1Path = tmpDir2 + "/f1" + suffix - writeLogFile(f1Path, "1\n2\n3\n4\n5\n6\n7\n8\n9\n".getBytes(StandardCharsets.UTF_8)) - val f1Length = Utils.getFileLength(new File(f1Path), workerConf) + withTempDir { tmpDir2 => + val suffix = getSuffix(isCompressed) + val f1Path = tmpDir2 + "/f1" + suffix + writeLogFile(f1Path, "1\n2\n3\n4\n5\n6\n7\n8\n9\n".getBytes(StandardCharsets.UTF_8)) + val f1Length = Utils.getFileLength(new File(f1Path), workerConf) - // Read first few bytes - assert(Utils.offsetBytes(f1Path, f1Length, 0, 5) === "1\n2\n3") + // Read first few bytes + assert(Utils.offsetBytes(f1Path, f1Length, 0, 5) === "1\n2\n3") - // Read some middle bytes - assert(Utils.offsetBytes(f1Path, f1Length, 4, 11) === "3\n4\n5\n6") + // Read some middle bytes + assert(Utils.offsetBytes(f1Path, f1Length, 4, 11) === "3\n4\n5\n6") - // Read last few bytes - assert(Utils.offsetBytes(f1Path, f1Length, 12, 18) === "7\n8\n9\n") + // Read last few bytes + assert(Utils.offsetBytes(f1Path, f1Length, 12, 18) === "7\n8\n9\n") - // Read some nonexistent bytes in the beginning - assert(Utils.offsetBytes(f1Path, f1Length, -5, 5) === "1\n2\n3") + // Read some nonexistent bytes in the beginning + assert(Utils.offsetBytes(f1Path, f1Length, -5, 5) 
=== "1\n2\n3") - // Read some nonexistent bytes at the end - assert(Utils.offsetBytes(f1Path, f1Length, 12, 22) === "7\n8\n9\n") + // Read some nonexistent bytes at the end + assert(Utils.offsetBytes(f1Path, f1Length, 12, 22) === "7\n8\n9\n") - // Read some nonexistent bytes on both ends - assert(Utils.offsetBytes(f1Path, f1Length, -3, 25) === "1\n2\n3\n4\n5\n6\n7\n8\n9\n") - - Utils.deleteRecursively(tmpDir2) + // Read some nonexistent bytes on both ends + assert(Utils.offsetBytes(f1Path, f1Length, -3, 25) === "1\n2\n3\n4\n5\n6\n7\n8\n9\n") + } } test("reading offset bytes of a file") { @@ -331,41 +330,41 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { } def testOffsetBytesMultipleFiles(isCompressed: Boolean): Unit = { - val tmpDir = Utils.createTempDir() - val suffix = getSuffix(isCompressed) - val files = (1 to 3).map(i => new File(tmpDir, i.toString + suffix)) :+ new File(tmpDir, "4") - writeLogFile(files(0).getAbsolutePath, "0123456789".getBytes(StandardCharsets.UTF_8)) - writeLogFile(files(1).getAbsolutePath, "abcdefghij".getBytes(StandardCharsets.UTF_8)) - writeLogFile(files(2).getAbsolutePath, "ABCDEFGHIJ".getBytes(StandardCharsets.UTF_8)) - writeLogFile(files(3).getAbsolutePath, "9876543210".getBytes(StandardCharsets.UTF_8)) - val fileLengths = files.map(Utils.getFileLength(_, workerConf)) - - // Read first few bytes in the 1st file - assert(Utils.offsetBytes(files, fileLengths, 0, 5) === "01234") + withTempDir { tmpDir => + val suffix = getSuffix(isCompressed) + val files = (1 to 3).map(i => + new File(tmpDir, i.toString + suffix)) :+ new File(tmpDir, "4") + writeLogFile(files(0).getAbsolutePath, "0123456789".getBytes(StandardCharsets.UTF_8)) + writeLogFile(files(1).getAbsolutePath, "abcdefghij".getBytes(StandardCharsets.UTF_8)) + writeLogFile(files(2).getAbsolutePath, "ABCDEFGHIJ".getBytes(StandardCharsets.UTF_8)) + writeLogFile(files(3).getAbsolutePath, "9876543210".getBytes(StandardCharsets.UTF_8)) + val fileLengths 
= files.map(Utils.getFileLength(_, workerConf)) - // Read bytes within the 1st file - assert(Utils.offsetBytes(files, fileLengths, 5, 8) === "567") + // Read first few bytes in the 1st file + assert(Utils.offsetBytes(files, fileLengths, 0, 5) === "01234") - // Read bytes across 1st and 2nd file - assert(Utils.offsetBytes(files, fileLengths, 8, 18) === "89abcdefgh") + // Read bytes within the 1st file + assert(Utils.offsetBytes(files, fileLengths, 5, 8) === "567") - // Read bytes across 1st, 2nd and 3rd file - assert(Utils.offsetBytes(files, fileLengths, 5, 24) === "56789abcdefghijABCD") + // Read bytes across 1st and 2nd file + assert(Utils.offsetBytes(files, fileLengths, 8, 18) === "89abcdefgh") - // Read bytes across 3rd and 4th file - assert(Utils.offsetBytes(files, fileLengths, 25, 35) === "FGHIJ98765") + // Read bytes across 1st, 2nd and 3rd file + assert(Utils.offsetBytes(files, fileLengths, 5, 24) === "56789abcdefghijABCD") - // Read some nonexistent bytes in the beginning - assert(Utils.offsetBytes(files, fileLengths, -5, 18) === "0123456789abcdefgh") + // Read bytes across 3rd and 4th file + assert(Utils.offsetBytes(files, fileLengths, 25, 35) === "FGHIJ98765") - // Read some nonexistent bytes at the end - assert(Utils.offsetBytes(files, fileLengths, 18, 45) === "ijABCDEFGHIJ9876543210") + // Read some nonexistent bytes in the beginning + assert(Utils.offsetBytes(files, fileLengths, -5, 18) === "0123456789abcdefgh") - // Read some nonexistent bytes on both ends - assert(Utils.offsetBytes(files, fileLengths, -5, 45) === - "0123456789abcdefghijABCDEFGHIJ9876543210") + // Read some nonexistent bytes at the end + assert(Utils.offsetBytes(files, fileLengths, 18, 45) === "ijABCDEFGHIJ9876543210") - Utils.deleteRecursively(tmpDir) + // Read some nonexistent bytes on both ends + assert(Utils.offsetBytes(files, fileLengths, -5, 45) === + "0123456789abcdefghijABCDEFGHIJ9876543210") + } } test("reading offset bytes across multiple files") { @@ -427,27 +426,28 @@ 
class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { test("doesDirectoryContainFilesNewerThan") { // create some temporary directories and files - val parent: File = Utils.createTempDir() - // The parent directory has two child directories - val child1: File = Utils.createTempDir(parent.getCanonicalPath) - val child2: File = Utils.createTempDir(parent.getCanonicalPath) - val child3: File = Utils.createTempDir(child1.getCanonicalPath) - // set the last modified time of child1 to 30 secs old - child1.setLastModified(System.currentTimeMillis() - (1000 * 30)) - - // although child1 is old, child2 is still new so return true - assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5)) - - child2.setLastModified(System.currentTimeMillis - (1000 * 30)) - assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5)) - - parent.setLastModified(System.currentTimeMillis - (1000 * 30)) - // although parent and its immediate children are new, child3 is still old - // we expect a full recursive search for new files. 
- assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5)) - - child3.setLastModified(System.currentTimeMillis - (1000 * 30)) - assert(!Utils.doesDirectoryContainAnyNewFiles(parent, 5)) + withTempDir { parent => + // The parent directory has two child directories + val child1: File = Utils.createTempDir(parent.getCanonicalPath) + val child2: File = Utils.createTempDir(parent.getCanonicalPath) + val child3: File = Utils.createTempDir(child1.getCanonicalPath) + // set the last modified time of child1 to 30 secs old + child1.setLastModified(System.currentTimeMillis() - (1000 * 30)) + + // although child1 is old, child2 is still new so return true + assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5)) + + child2.setLastModified(System.currentTimeMillis - (1000 * 30)) + assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5)) + + parent.setLastModified(System.currentTimeMillis - (1000 * 30)) + // although parent and its immediate children are new, child3 is still old + // we expect a full recursive search for new files. 
+ assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5)) + + child3.setLastModified(System.currentTimeMillis - (1000 * 30)) + assert(!Utils.doesDirectoryContainAnyNewFiles(parent, 5)) + } } test("resolveURI") { @@ -608,9 +608,8 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { } test("loading properties from file") { - val tmpDir = Utils.createTempDir() - val outFile = File.createTempFile("test-load-spark-properties", "test", tmpDir) - try { + withTempDir { tmpDir => + val outFile = File.createTempFile("test-load-spark-properties", "test", tmpDir) System.setProperty("spark.test.fileNameLoadB", "2") Files.write("spark.test.fileNameLoadA true\n" + "spark.test.fileNameLoadB 1\n", outFile, StandardCharsets.UTF_8) @@ -621,8 +620,6 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { val sparkConf = new SparkConf assert(sparkConf.getBoolean("spark.test.fileNameLoadA", false) === true) assert(sparkConf.getInt("spark.test.fileNameLoadB", 1) === 2) - } finally { - Utils.deleteRecursively(tmpDir) } } @@ -638,52 +635,53 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { } test("fetch hcfs dir") { - val tempDir = Utils.createTempDir() - val sourceDir = new File(tempDir, "source-dir") - sourceDir.mkdir() - val innerSourceDir = Utils.createTempDir(root = sourceDir.getPath) - val sourceFile = File.createTempFile("someprefix", "somesuffix", innerSourceDir) - val targetDir = new File(tempDir, "target-dir") - Files.write("some text", sourceFile, StandardCharsets.UTF_8) - - val path = - if (Utils.isWindows) { - new Path("file:/" + sourceDir.getAbsolutePath.replace("\\", "/")) - } else { - new Path("file://" + sourceDir.getAbsolutePath) - } - val conf = new Configuration() - val fs = Utils.getHadoopFileSystem(path.toString, conf) + withTempDir { tempDir => + val sourceDir = new File(tempDir, "source-dir") + sourceDir.mkdir() + val innerSourceDir = Utils.createTempDir(root = 
sourceDir.getPath) + val sourceFile = File.createTempFile("someprefix", "somesuffix", innerSourceDir) + val targetDir = new File(tempDir, "target-dir") + Files.write("some text", sourceFile, StandardCharsets.UTF_8) + + val path = + if (Utils.isWindows) { + new Path("file:/" + sourceDir.getAbsolutePath.replace("\\", "/")) + } else { + new Path("file://" + sourceDir.getAbsolutePath) + } + val conf = new Configuration() + val fs = Utils.getHadoopFileSystem(path.toString, conf) - assert(!targetDir.isDirectory()) - Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false) - assert(targetDir.isDirectory()) + assert(!targetDir.isDirectory()) + Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false) + assert(targetDir.isDirectory()) - // Copy again to make sure it doesn't error if the dir already exists. - Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false) + // Copy again to make sure it doesn't error if the dir already exists. + Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false) - val destDir = new File(targetDir, sourceDir.getName()) - assert(destDir.isDirectory()) + val destDir = new File(targetDir, sourceDir.getName()) + assert(destDir.isDirectory()) - val destInnerDir = new File(destDir, innerSourceDir.getName) - assert(destInnerDir.isDirectory()) + val destInnerDir = new File(destDir, innerSourceDir.getName) + assert(destInnerDir.isDirectory()) - val destInnerFile = new File(destInnerDir, sourceFile.getName) - assert(destInnerFile.isFile()) + val destInnerFile = new File(destInnerDir, sourceFile.getName) + assert(destInnerFile.isFile()) - val filePath = - if (Utils.isWindows) { - new Path("file:/" + sourceFile.getAbsolutePath.replace("\\", "/")) - } else { - new Path("file://" + sourceFile.getAbsolutePath) - } - val testFileDir = new File(tempDir, "test-filename") - val testFileName = "testFName" - val testFilefs = Utils.getHadoopFileSystem(filePath.toString, conf) - Utils.fetchHcfsFile(filePath, 
testFileDir, testFilefs, new SparkConf(), - conf, false, Some(testFileName)) - val newFileName = new File(testFileDir, testFileName) - assert(newFileName.isFile()) + val filePath = + if (Utils.isWindows) { + new Path("file:/" + sourceFile.getAbsolutePath.replace("\\", "/")) + } else { + new Path("file://" + sourceFile.getAbsolutePath) + } + val testFileDir = new File(tempDir, "test-filename") + val testFileName = "testFName" + val testFilefs = Utils.getHadoopFileSystem(filePath.toString, conf) + Utils.fetchHcfsFile(filePath, testFileDir, testFilefs, new SparkConf(), + conf, false, Some(testFileName)) + val newFileName = new File(testFileDir, testFileName) + assert(newFileName.isFile()) + } } test("shutdown hook manager") { http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala index 2341949..85963ec 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -67,6 +67,17 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with } /** + * Creates a temporary directory, which is then passed to `f` and will be deleted after `f` + * returns. + */ + protected override def withTempDir(f: File => Unit): Unit = { + super.withTempDir { dir => + f(dir) + waitForTasksToFinish() + } + } + + /** * A helper function for turning off/on codegen. */ protected def testWithWholeStageCodegenOnAndOff(testName: String)(f: String => Unit): Unit = { @@ -143,43 +154,6 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with test(name) { runOnThread() } } } -} - -/** - * Helper trait that can be extended by all external SQL test suites. 
- * - * This allows subclasses to plugin a custom `SQLContext`. - * To use implicit methods, import `testImplicits._` instead of through the `SQLContext`. - * - * Subclasses should *not* create `SQLContext`s in the test suite constructor, which is - * prone to leaving multiple overlapping [[org.apache.spark.SparkContext]]s in the same JVM. - */ -private[sql] trait SQLTestUtilsBase - extends Eventually - with BeforeAndAfterAll - with SQLTestData - with PlanTestBase { self: Suite => - - protected def sparkContext = spark.sparkContext - - // Shorthand for running a query using our SQLContext - protected lazy val sql = spark.sql _ - - /** - * A helper object for importing SQL implicits. - * - * Note that the alternative of importing `spark.implicits._` is not possible here. - * This is because we create the `SQLContext` immediately before the first test is run, - * but the implicits import is needed in the constructor. - */ - protected object testImplicits extends SQLImplicits { - protected override def _sqlContext: SQLContext = self.spark.sqlContext - } - - protected override def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = { - SparkSession.setActiveSession(spark) - super.withSQLConf(pairs: _*)(f) - } /** * Copy file in jar's resource to a temp file, then pass it to `f`. @@ -207,21 +181,6 @@ private[sql] trait SQLTestUtilsBase } /** - * Creates a temporary directory, which is then passed to `f` and will be deleted after `f` - * returns. - * - * @todo Probably this method should be moved to a more general place - */ - protected def withTempDir(f: File => Unit): Unit = { - val dir = Utils.createTempDir().getCanonicalFile - try f(dir) finally { - // wait for all tasks to finish before deleting files - waitForTasksToFinish() - Utils.deleteRecursively(dir) - } - } - - /** * Creates the specified number of temporary directories, which is then passed to `f` and will be * deleted after `f` returns. 
*/ @@ -233,6 +192,43 @@ private[sql] trait SQLTestUtilsBase files.foreach(Utils.deleteRecursively) } } +} + +/** + * Helper trait that can be extended by all external SQL test suites. + * + * This allows subclasses to plugin a custom `SQLContext`. + * To use implicit methods, import `testImplicits._` instead of through the `SQLContext`. + * + * Subclasses should *not* create `SQLContext`s in the test suite constructor, which is + * prone to leaving multiple overlapping [[org.apache.spark.SparkContext]]s in the same JVM. + */ +private[sql] trait SQLTestUtilsBase + extends Eventually + with BeforeAndAfterAll + with SQLTestData + with PlanTestBase { self: Suite => + + protected def sparkContext = spark.sparkContext + + // Shorthand for running a query using our SQLContext + protected lazy val sql = spark.sql _ + + /** + * A helper object for importing SQL implicits. + * + * Note that the alternative of importing `spark.implicits._` is not possible here. + * This is because we create the `SQLContext` immediately before the first test is run, + * but the implicits import is needed in the constructor. + */ + protected object testImplicits extends SQLImplicits { + protected override def _sqlContext: SQLContext = self.spark.sqlContext + } + + protected override def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = { + SparkSession.setActiveSession(spark) + super.withSQLConf(pairs: _*)(f) + } /** * Drops functions after calling `f`. A function is represented by (functionName, isTemporary). 
http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index dc96ec4..218bd18 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -56,15 +56,6 @@ class VersionsSuite extends SparkFunSuite with Logging { import HiveClientBuilder.buildClient /** - * Creates a temporary directory, which is then passed to `f` and will be deleted after `f` - * returns. - */ - protected def withTempDir(f: File => Unit): Unit = { - val dir = Utils.createTempDir().getCanonicalFile - try f(dir) finally Utils.deleteRecursively(dir) - } - - /** * Drops table `tableName` after calling `f`. */ protected def withTable(tableNames: String*)(f: => Unit): Unit = { http://git-wip-us.apache.org/repos/asf/spark/blob/327ac83f/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index ada494e..6a0f523 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -557,16 +557,4 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging { verifyOutput[W](output.toSeq, expectedOutput, useSet) } } - - /** - * Creates a temporary directory, which is then passed to `f` and will be deleted after `f` - * returns. - * (originally from `SqlTestUtils`.) 
- * @todo Probably this method should be moved to a more general place - */ - protected def withTempDir(f: File => Unit): Unit = { - val dir = Utils.createTempDir().getCanonicalFile - try f(dir) finally Utils.deleteRecursively(dir) - } - - } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org
