This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 811b7e3 [SPARK-38080][TESTS][SS] Flaky test:
StreamingQueryManagerSuite: 'awaitAnyTermination with timeout and
resetTerminated'
811b7e3 is described below
commit 811b7e35f0c0d88c83d1ab3f2fef86463b3ae714
Author: Shixiong Zhu <[email protected]>
AuthorDate: Tue Feb 1 23:27:20 2022 -0800
[SPARK-38080][TESTS][SS] Flaky test: StreamingQueryManagerSuite:
'awaitAnyTermination with timeout and resetTerminated'
### What changes were proposed in this pull request?
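Fix a flaky test by calling `stop()` on the failed query once `isActive`
becomes `false`, so that the query thread has exited and
`StreamingQueryManager` has received the error before
`awaitAnyTermination` is checked.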
### Why are the changes needed?
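`StreamingQueryManagerSuite: 'awaitAnyTermination with timeout and
resetTerminated'` is a flaky test: `isActive` can become `false` before
`StreamingQueryManager` has received the error from the failed query.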
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
- The flaky test can be reproduced by adding a `Thread.sleep(100)` in
https://github.com/apache/spark/blob/v3.2.1/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L346
- Verified the fix using the above reproduction.
Closes #35372 from zsxwing/SPARK-38080.
Authored-by: Shixiong Zhu <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../apache/spark/sql/streaming/StreamingQueryManagerSuite.scala | 8 ++++++++
1 file changed, 8 insertions(+)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index 91d6d77..cc66ce8 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -201,6 +201,10 @@ class StreamingQueryManagerSuite extends StreamTest {
// After that query is stopped, awaitAnyTerm should throw exception
eventually(Timeout(streamingTimeout)) { require(!q3.isActive) } // wait
for query to stop
+ // When `isActive` becomes `false`, `StreamingQueryManager` may not
receive the error yet.
+ // Hence, call `stop` to wait until the thread of `q3` exits so that we
can ensure
+ // `StreamingQueryManager` has already received the error.
+ q3.stop()
testAwaitAnyTermination(
ExpectException[SparkException],
awaitTimeout = 100.milliseconds,
@@ -217,6 +221,10 @@ class StreamingQueryManagerSuite extends StreamTest {
require(!q4.isActive)
val q5 = stopRandomQueryAsync(10.milliseconds, withError = true)
eventually(Timeout(streamingTimeout)) { require(!q5.isActive) }
+ // When `isActive` becomes `false`, `StreamingQueryManager` may not
receive the error yet.
+ // Hence, call `stop` to wait until the thread of `q5` exits so that we
can ensure
+ // `StreamingQueryManager` has already received the error.
+ q5.stop()
// After q5 terminates with exception, awaitAnyTerm should start
throwing exception
testAwaitAnyTermination(ExpectException[SparkException], awaitTimeout =
2.seconds)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]