Repository: spark Updated Branches: refs/heads/master 3cfbeb70b -> e78540282
[SPARK-14304][SQL][TESTS] Fix tests that don't create temp files in the `java.io.tmpdir` folder ## What changes were proposed in this pull request? If I press `CTRL-C` when running these tests, the temp files will be left in the `sql/core` folder and I need to delete them manually. It's annoying. This PR just moves the temp files to the `java.io.tmpdir` folder and adds a name prefix for them. ## How was this patch tested? Existing Jenkins tests Author: Shixiong Zhu <[email protected]> Closes #12093 from zsxwing/temp-file. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e7854028 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e7854028 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e7854028 Branch: refs/heads/master Commit: e785402826dcd984d9312470464714ba6c908a49 Parents: 3cfbeb7 Author: Shixiong Zhu <[email protected]> Authored: Thu Mar 31 12:17:25 2016 -0700 Committer: Andrew Or <[email protected]> Committed: Thu Mar 31 12:17:25 2016 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/sql/StreamTest.scala | 2 +- .../streaming/ContinuousQueryManagerSuite.scala | 3 ++- .../streaming/DataFrameReaderWriterSuite.scala | 3 ++- .../sql/streaming/FileStreamSinkSuite.scala | 4 +-- .../sql/streaming/FileStreamSourceSuite.scala | 26 ++++++++++---------- .../spark/sql/streaming/FileStressSuite.scala | 8 +++--- 6 files changed, 24 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/e7854028/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala index 4ca7394..b5be7ef 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala @@ -265,7 +265,7 @@ trait StreamTest extends QueryTest with Timeouts { } val testThread = Thread.currentThread() - val metadataRoot = Utils.createTempDir("streaming.metadata").getCanonicalPath + val metadataRoot = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath try { startedTest.foreach { action => http://git-wip-us.apache.org/repos/asf/spark/blob/e7854028/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala index 54ce98d..29bd3e0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala @@ -236,7 +236,8 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with @volatile var query: StreamExecution = null try { val df = ds.toDF - val metadataRoot = Utils.createTempDir("streaming.metadata").getCanonicalPath + val metadataRoot = + Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath query = sqlContext .streams .startQuery( http://git-wip-us.apache.org/repos/asf/spark/blob/e7854028/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala index c1bab9b..102473d 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala @@ -69,7 +69,8 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider { class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with BeforeAndAfter { import testImplicits._ - private def newMetadataDir = Utils.createTempDir("streaming.metadata").getCanonicalPath + private def newMetadataDir = + Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath after { sqlContext.streams.active.foreach(_.stop()) http://git-wip-us.apache.org/repos/asf/spark/blob/e7854028/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala index 7f31611..8cf5ded 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala @@ -29,8 +29,8 @@ class FileStreamSinkSuite extends StreamTest with SharedSQLContext { val inputData = MemoryStream[Int] val df = inputData.toDF() - val outputDir = Utils.createTempDir("stream.output").getCanonicalPath - val checkpointDir = Utils.createTempDir("stream.checkpoint").getCanonicalPath + val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath + val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath val query = df.write http://git-wip-us.apache.org/repos/asf/spark/blob/e7854028/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---------------------------------------------------------------------- diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 89de15a..054f5c9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -202,8 +202,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { } test("read from text files") { - val src = Utils.createTempDir("streaming.src") - val tmp = Utils.createTempDir("streaming.tmp") + val src = Utils.createTempDir(namePrefix = "streaming.src") + val tmp = Utils.createTempDir(namePrefix = "streaming.tmp") val textSource = createFileStreamSource("text", src.getCanonicalPath) val filtered = textSource.toDF().filter($"value" contains "keep") @@ -224,8 +224,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { } test("read from json files") { - val src = Utils.createTempDir("streaming.src") - val tmp = Utils.createTempDir("streaming.tmp") + val src = Utils.createTempDir(namePrefix = "streaming.src") + val tmp = Utils.createTempDir(namePrefix = "streaming.tmp") val textSource = createFileStreamSource("json", src.getCanonicalPath, Some(valueSchema)) val filtered = textSource.toDF().filter($"value" contains "keep") @@ -258,8 +258,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { } test("read from json files with inferring schema") { - val src = Utils.createTempDir("streaming.src") - val tmp = Utils.createTempDir("streaming.tmp") + val src = Utils.createTempDir(namePrefix = "streaming.src") + val tmp = Utils.createTempDir(namePrefix = "streaming.tmp") // Add a file so that we can infer its schema stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}") @@ -279,8 +279,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { } 
test("read from parquet files") { - val src = Utils.createTempDir("streaming.src") - val tmp = Utils.createTempDir("streaming.tmp") + val src = Utils.createTempDir(namePrefix = "streaming.src") + val tmp = Utils.createTempDir(namePrefix = "streaming.tmp") val fileSource = createFileStreamSource("parquet", src.getCanonicalPath, Some(valueSchema)) val filtered = fileSource.toDF().filter($"value" contains "keep") @@ -301,7 +301,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { } test("file stream source without schema") { - val src = Utils.createTempDir("streaming.src") + val src = Utils.createTempDir(namePrefix = "streaming.src") // Only "text" doesn't need a schema createFileStreamSource("text", src.getCanonicalPath) @@ -318,8 +318,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { } test("fault tolerance") { - val src = Utils.createTempDir("streaming.src") - val tmp = Utils.createTempDir("streaming.tmp") + val src = Utils.createTempDir(namePrefix = "streaming.src") + val tmp = Utils.createTempDir(namePrefix = "streaming.tmp") val textSource = createFileStreamSource("text", src.getCanonicalPath) val filtered = textSource.toDF().filter($"value" contains "keep") @@ -346,8 +346,8 @@ class FileStreamSourceStressTestSuite extends FileStreamSourceTest with SharedSQ import testImplicits._ test("file source stress test") { - val src = Utils.createTempDir("streaming.src") - val tmp = Utils.createTempDir("streaming.tmp") + val src = Utils.createTempDir(namePrefix = "streaming.src") + val tmp = Utils.createTempDir(namePrefix = "streaming.tmp") val textSource = createFileStreamSource("text", src.getCanonicalPath) val ds = textSource.toDS[String]().map(_.toInt + 1) http://git-wip-us.apache.org/repos/asf/spark/blob/e7854028/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala ---------------------------------------------------------------------- diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala index 5a1bfb3..3916430 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala @@ -43,10 +43,10 @@ class FileStressSuite extends StreamTest with SharedSQLContext { test("fault tolerance stress test") { val numRecords = 10000 - val inputDir = Utils.createTempDir("stream.input").getCanonicalPath - val stagingDir = Utils.createTempDir("stream.staging").getCanonicalPath - val outputDir = Utils.createTempDir("stream.output").getCanonicalPath - val checkpoint = Utils.createTempDir("stream.checkpoint").getCanonicalPath + val inputDir = Utils.createTempDir(namePrefix = "stream.input").getCanonicalPath + val stagingDir = Utils.createTempDir(namePrefix = "stream.staging").getCanonicalPath + val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath + val checkpoint = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath @volatile var continue = true --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
