Repository: spark
Updated Branches:
  refs/heads/master 14a3bb3a0 -> 60043f224


[SS][MINOR] Fix flaky test in DataStreamReaderWriterSuite: temp checkpoint dir
should be deleted

## What changes were proposed in this pull request?

Stopping a query while it is still being initialized can throw an interrupt
exception, in which case the temporary checkpoint directory will not be deleted
and the test will fail. The fix is to wait for the query to make progress
(process all available data) before stopping it, so initialization is known to
have completed.
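The race described above is not specific to Spark: on the JVM, an interrupt delivered before a worker finishes initializing can skip cleanup that is only armed after initialization. A minimal standalone sketch (plain Java threads, not Spark code; all names here are hypothetical) of why waiting for initialization, as `processAllAvailable()` does, makes the cleanup deterministic:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class InitRaceSketch {
    // Returns true if the simulated temp-checkpoint cleanup ran.
    static boolean runQuery(boolean waitForInit) throws InterruptedException {
        CountDownLatch initialized = new CountDownLatch(1);
        AtomicBoolean checkpointDeleted = new AtomicBoolean(false);

        Thread query = new Thread(() -> {
            try {
                Thread.sleep(50);          // simulate slow query initialization
                initialized.countDown();   // cleanup is armed only after init
                Thread.sleep(Long.MAX_VALUE); // "run" until interrupted
            } catch (InterruptedException e) {
                // cleanup runs only if initialization had finished
                if (initialized.getCount() == 0) {
                    checkpointDeleted.set(true);
                }
            }
        });
        query.start();

        if (waitForInit) {
            initialized.await();  // analogous to query.processAllAvailable()
        }
        query.interrupt();        // analogous to query.stop()
        query.join();
        return checkpointDeleted.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("cleanup ran: " + runQuery(true));
    }
}
```

With `waitForInit = true` the interrupt always arrives after initialization, so cleanup runs every time; without the wait, the outcome depends on timing, which is exactly what made the original test flaky.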

Author: Tathagata Das <[email protected]>

Closes #18442 from tdas/DatastreamReaderWriterSuite-fix.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/60043f22
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/60043f22
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/60043f22

Branch: refs/heads/master
Commit: 60043f22458668ac7ecba94fa78953f23a6bdcec
Parents: 14a3bb3
Author: Tathagata Das <[email protected]>
Authored: Thu Jul 6 00:20:26 2017 -0700
Committer: Tathagata Das <[email protected]>
Committed: Thu Jul 6 00:20:26 2017 -0700

----------------------------------------------------------------------
 .../spark/sql/streaming/test/DataStreamReaderWriterSuite.scala      | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/60043f22/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index 3de0ae6..e8a6202 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -641,6 +641,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
   test("temp checkpoint dir should be deleted if a query is stopped without errors") {
     import testImplicits._
     val query = MemoryStream[Int].toDS.writeStream.format("console").start()
+    query.processAllAvailable()
     val checkpointDir = new Path(
       query.asInstanceOf[StreamingQueryWrapper].streamingQuery.resolvedCheckpointRoot)
     val fs = checkpointDir.getFileSystem(spark.sessionState.newHadoopConf())


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
