This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 811b7e3  [SPARK-38080][TESTS][SS] Flaky test: StreamingQueryManagerSuite: 'awaitAnyTermination with timeout and resetTerminated'
811b7e3 is described below

commit 811b7e35f0c0d88c83d1ab3f2fef86463b3ae714
Author: Shixiong Zhu <[email protected]>
AuthorDate: Tue Feb 1 23:27:20 2022 -0800

    [SPARK-38080][TESTS][SS] Flaky test: StreamingQueryManagerSuite: 'awaitAnyTermination with timeout and resetTerminated'
    
    ### What changes were proposed in this pull request?
    
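    Fix a flaky test by calling `stop()` on the failed query after `isActive` becomes `false`. `stop()` waits until the query's thread exits, which ensures `StreamingQueryManager` has already received the error before `awaitAnyTermination` is checked.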
    
    ### Why are the changes needed?
    
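    `StreamingQueryManagerSuite: 'awaitAnyTermination with timeout and resetTerminated'` is a flaky test: when a query's `isActive` becomes `false`, `StreamingQueryManager` may not have received the query's error yet, so the test can observe the manager before the failure has been recorded.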
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    - The flaky test can be reproduced by adding a `Thread.sleep(100)` in https://github.com/apache/spark/blob/v3.2.1/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L346
    - Using the above reproduction to verify the PR.
    
    Closes #35372 from zsxwing/SPARK-38080.
    
    Authored-by: Shixiong Zhu <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../apache/spark/sql/streaming/StreamingQueryManagerSuite.scala   | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index 91d6d77..cc66ce8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -201,6 +201,10 @@ class StreamingQueryManagerSuite extends StreamTest {
 
       // After that query is stopped, awaitAnyTerm should throw exception
       eventually(Timeout(streamingTimeout)) { require(!q3.isActive) } // wait for query to stop
+      // When `isActive` becomes `false`, `StreamingQueryManager` may not receive the error yet.
+      // Hence, call `stop` to wait until the thread of `q3` exits so that we can ensure
+      // `StreamingQueryManager` has already received the error.
+      q3.stop()
       testAwaitAnyTermination(
         ExpectException[SparkException],
         awaitTimeout = 100.milliseconds,
@@ -217,6 +221,10 @@ class StreamingQueryManagerSuite extends StreamTest {
       require(!q4.isActive)
       val q5 = stopRandomQueryAsync(10.milliseconds, withError = true)
       eventually(Timeout(streamingTimeout)) { require(!q5.isActive) }
+      // When `isActive` becomes `false`, `StreamingQueryManager` may not receive the error yet.
+      // Hence, call `stop` to wait until the thread of `q5` exits so that we can ensure
+      // `StreamingQueryManager` has already received the error.
+      q5.stop()
       // After q5 terminates with exception, awaitAnyTerm should start throwing exception
       testAwaitAnyTermination(ExpectException[SparkException], awaitTimeout = 2.seconds)
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to