Repository: spark
Updated Branches:
  refs/heads/master 7db81ac8a -> 1462b1766


[SPARK-24861][SS][TEST] create corrected temp directories in RateSourceSuite

## What changes were proposed in this pull request?

`RateSourceSuite` may leave garbage files under `sql/core/dummy`. We should use
a proper temp directory instead.

## How was this patch tested?

test only

Author: Wenchen Fan <[email protected]>

Closes #21817 from cloud-fan/minor.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1462b176
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1462b176
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1462b176

Branch: refs/heads/master
Commit: 1462b17666729cd6c9e8dfa2a1fe9c2020d3f25b
Parents: 7db81ac
Author: Wenchen Fan <[email protected]>
Authored: Fri Jul 20 13:40:26 2018 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Fri Jul 20 13:40:26 2018 +0800

----------------------------------------------------------------------
 .../sources/RateStreamProviderSuite.scala       | 127 ++++++++++---------
 1 file changed, 67 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1462b176/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
index bf72e5c..9115a38 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
@@ -17,15 +17,13 @@
 
 package org.apache.spark.sql.execution.streaming.sources
 
-import java.nio.file.Files
 import java.util.Optional
 import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.errors.TreeNodeException
+import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous._
@@ -54,12 +52,15 @@ class RateSourceSuite extends StreamTest {
   }
 
   test("microbatch in registry") {
-    DataSource.lookupDataSource("rate", spark.sqlContext.conf).newInstance() 
match {
-      case ds: MicroBatchReadSupport =>
-        val reader = ds.createMicroBatchReader(Optional.empty(), "dummy", 
DataSourceOptions.empty())
-        assert(reader.isInstanceOf[RateStreamMicroBatchReader])
-      case _ =>
-        throw new IllegalStateException("Could not find read support for rate")
+    withTempDir { temp =>
+      DataSource.lookupDataSource("rate", spark.sqlContext.conf).newInstance() 
match {
+        case ds: MicroBatchReadSupport =>
+          val reader = ds.createMicroBatchReader(
+            Optional.empty(), temp.getCanonicalPath, DataSourceOptions.empty())
+          assert(reader.isInstanceOf[RateStreamMicroBatchReader])
+        case _ =>
+          throw new IllegalStateException("Could not find read support for 
rate")
+      }
     }
   }
 
@@ -108,69 +109,75 @@ class RateSourceSuite extends StreamTest {
   }
 
   test("microbatch - set offset") {
-    val temp = Files.createTempDirectory("dummy").toString
-    val reader = new RateStreamMicroBatchReader(DataSourceOptions.empty(), 
temp)
-    val startOffset = LongOffset(0L)
-    val endOffset = LongOffset(1L)
-    reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset))
-    assert(reader.getStartOffset() == startOffset)
-    assert(reader.getEndOffset() == endOffset)
+    withTempDir { temp =>
+      val reader = new RateStreamMicroBatchReader(DataSourceOptions.empty(), 
temp.getCanonicalPath)
+      val startOffset = LongOffset(0L)
+      val endOffset = LongOffset(1L)
+      reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset))
+      assert(reader.getStartOffset() == startOffset)
+      assert(reader.getEndOffset() == endOffset)
+    }
   }
 
   test("microbatch - infer offsets") {
-    val tempFolder = Files.createTempDirectory("dummy").toString
-    val reader = new RateStreamMicroBatchReader(
-      new DataSourceOptions(
-        Map("numPartitions" -> "1", "rowsPerSecond" -> "100", "useManualClock" 
-> "true").asJava),
-      tempFolder)
-    reader.clock.asInstanceOf[ManualClock].advance(100000)
-    reader.setOffsetRange(Optional.empty(), Optional.empty())
-    reader.getStartOffset() match {
-      case r: LongOffset => assert(r.offset === 0L)
-      case _ => throw new IllegalStateException("unexpected offset type")
-    }
-    reader.getEndOffset() match {
-      case r: LongOffset => assert(r.offset >= 100)
-      case _ => throw new IllegalStateException("unexpected offset type")
+    withTempDir { temp =>
+      val reader = new RateStreamMicroBatchReader(
+        new DataSourceOptions(
+          Map("numPartitions" -> "1", "rowsPerSecond" -> "100", 
"useManualClock" -> "true").asJava),
+        temp.getCanonicalPath)
+      reader.clock.asInstanceOf[ManualClock].advance(100000)
+      reader.setOffsetRange(Optional.empty(), Optional.empty())
+      reader.getStartOffset() match {
+        case r: LongOffset => assert(r.offset === 0L)
+        case _ => throw new IllegalStateException("unexpected offset type")
+      }
+      reader.getEndOffset() match {
+        case r: LongOffset => assert(r.offset >= 100)
+        case _ => throw new IllegalStateException("unexpected offset type")
+      }
     }
   }
 
   test("microbatch - predetermined batch size") {
-    val temp = Files.createTempDirectory("dummy").toString
-    val reader = new RateStreamMicroBatchReader(
-      new DataSourceOptions(Map("numPartitions" -> "1", "rowsPerSecond" -> 
"20").asJava), temp)
-    val startOffset = LongOffset(0L)
-    val endOffset = LongOffset(1L)
-    reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset))
-    val tasks = reader.planInputPartitions()
-    assert(tasks.size == 1)
-    val dataReader = tasks.get(0).createPartitionReader()
-    val data = ArrayBuffer[Row]()
-    while (dataReader.next()) {
-      data.append(dataReader.get())
+    withTempDir { temp =>
+      val reader = new RateStreamMicroBatchReader(
+        new DataSourceOptions(Map("numPartitions" -> "1", "rowsPerSecond" -> 
"20").asJava),
+        temp.getCanonicalPath)
+      val startOffset = LongOffset(0L)
+      val endOffset = LongOffset(1L)
+      reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset))
+      val tasks = reader.planInputPartitions()
+      assert(tasks.size == 1)
+      val dataReader = tasks.get(0).createPartitionReader()
+      val data = ArrayBuffer[Row]()
+      while (dataReader.next()) {
+        data.append(dataReader.get())
+      }
+      assert(data.size === 20)
     }
-    assert(data.size === 20)
   }
 
   test("microbatch - data read") {
-    val temp = Files.createTempDirectory("dummy").toString
-    val reader = new RateStreamMicroBatchReader(
-      new DataSourceOptions(Map("numPartitions" -> "11", "rowsPerSecond" -> 
"33").asJava), temp)
-    val startOffset = LongOffset(0L)
-    val endOffset = LongOffset(1L)
-    reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset))
-    val tasks = reader.planInputPartitions()
-    assert(tasks.size == 11)
-
-    val readData = tasks.asScala
-      .map(_.createPartitionReader())
-      .flatMap { reader =>
-        val buf = scala.collection.mutable.ListBuffer[Row]()
-        while (reader.next()) buf.append(reader.get())
-        buf
-      }
+    withTempDir { temp =>
+      val reader = new RateStreamMicroBatchReader(
+        new DataSourceOptions(Map("numPartitions" -> "11", "rowsPerSecond" -> 
"33").asJava),
+        temp.getCanonicalPath)
+      val startOffset = LongOffset(0L)
+      val endOffset = LongOffset(1L)
+      reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset))
+      val tasks = reader.planInputPartitions()
+      assert(tasks.size == 11)
+
+      val readData = tasks.asScala
+        .map(_.createPartitionReader())
+        .flatMap { reader =>
+          val buf = scala.collection.mutable.ListBuffer[Row]()
+          while (reader.next()) buf.append(reader.get())
+          buf
+        }
 
-    assert(readData.map(_.getLong(1)).sorted == Range(0, 33))
+      assert(readData.map(_.getLong(1)).sorted == Range(0, 33))
+    }
   }
 
   test("valueAtSecond") {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to