This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new ef1f377a3938 [SPARK-48889][SS] testStream to unload state stores before finishing
ef1f377a3938 is described below
commit ef1f377a39381557172a142cdca915f5cad1be51
Author: Siying Dong <[email protected]>
AuthorDate: Wed Jul 17 12:26:59 2024 +0900
[SPARK-48889][SS] testStream to unload state stores before finishing
### What changes were proposed in this pull request?
At the end of each testStream() call, unload all state stores from the executor.
### Why are the changes needed?
Currently, when a test finishes, we neither unload the state stores nor disable the maintenance
task. As a result, the maintenance task can still run after the test and fail because the
checkpoint directory has already been deleted. This can cause the next test to fail.
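The failure mode described above can be modelled with a minimal, self-contained sketch. Everything in it is hypothetical. `ToyMaintenance` stands in for a state store's background maintenance task and does not use Spark's real `StateStore` API. The sketch shows that a periodic task still running after its directory is deleted observes failures, while stopping the task first avoids them. That stop step is the role `StateStore.stop()` plays in this change.

```scala
import java.nio.file.{Files, Path}
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a state store's background maintenance task.
// It is NOT Spark's StateStore API; it only models the timing problem.
final class ToyMaintenance(checkpointDir: Path) {
  private val scheduler: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor()

  // Number of runs that found the checkpoint directory missing.
  val failures = new AtomicInteger(0)

  def start(intervalMs: Long): Unit = {
    val task: Runnable = () => {
      // A real maintenance task would snapshot or clean up state files in the
      // directory; here we only count runs that find it already deleted.
      if (!Files.isDirectory(checkpointDir)) failures.incrementAndGet()
      ()
    }
    scheduler.scheduleAtFixedRate(task, 0L, intervalMs, TimeUnit.MILLISECONDS)
    ()
  }

  // Plays the role StateStore.stop() has in this change: cancel the periodic
  // task and wait until no run is still in progress.
  def stop(): Unit = {
    scheduler.shutdownNow()
    scheduler.awaitTermination(5, TimeUnit.SECONDS)
    ()
  }
}

object ToyMaintenanceDemo {
  // Returns how many maintenance runs failed during one simulated test.
  def runScenario(stopBeforeDelete: Boolean): Int = {
    val dir = Files.createTempDirectory("checkpoint")
    val maintenance = new ToyMaintenance(dir)
    maintenance.start(intervalMs = 5)
    if (stopBeforeDelete) maintenance.stop()
    Files.delete(dir) // like a temp-dir helper cleaning up after the test body
    Thread.sleep(200) // give a still-running task time to see the deletion
    maintenance.stop()
    maintenance.failures.get()
  }
}
```

`ToyMaintenanceDemo.runScenario(stopBeforeDelete = true)` should report no failures. With `stopBeforeDelete = false` it typically reports some, because the periodic task keeps running after the directory is gone. The exact count depends on thread scheduling.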
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests should pass.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #47339 from siying/SPARK-48889.
Authored-by: Siying Dong <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
(cherry picked from commit 3a245558be882ae94f507976e4e4fb8c1d9bf344)
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala | 7 +++++++
1 file changed, 7 insertions(+)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index cb7995abcd09..69e404a47383 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -809,6 +809,13 @@ trait StreamTest extends QueryTest with SharedSparkSession with TimeLimits with
case (key, None) => sparkSession.conf.unset(key)
}
sparkSession.streams.removeListener(listener)
+ // The state store is stopped here to unload all state stores and terminate all maintenance
+ // threads. It is necessary because the temp directory used by the checkpoint directory
+ // may be deleted soon after, and the maintenance thread may see unexpected error and
+ // cause unexpected behavior. Doing it after a test finishes might be too late because
+ // sometimes the checkpoint directory is under `withTempDir`, and in this case the temp
+ // directory is deleted before the test finishes.
+ StateStore.stop()
}
}
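The patch comment's point about `withTempDir` concerns ordering. The temporary directory is removed as soon as the test body returns, so any cleanup in a later hook comes too late. The sketch below models only that ordering. `withTempDirSketch` and `testStreamSketch` are hypothetical helpers, not Spark's real test utilities, and they assume the checkpoint directory comes from the temp-dir helper that encloses the streaming test.

```scala
import java.io.File
import java.nio.file.Files
import scala.collection.mutable.ArrayBuffer

// Hypothetical helpers that model only the cleanup ORDER from the patch
// comment; they are not Spark's withTempDir or testStream.
object OrderingDemo {
  private val log = ArrayBuffer.empty[String]

  // Like a withTempDir-style helper: the directory is deleted as soon as the
  // body returns, before any afterEach-style hook gets a chance to run.
  def withTempDirSketch(body: File => Unit): Unit = {
    val dir = Files.createTempDirectory("spark-test").toFile
    try body(dir)
    finally {
      dir.delete()
      log += "tempDir.delete"
    }
  }

  // Like testStream's own cleanup: the stand-in for StateStore.stop() runs in
  // a finally block, while the checkpoint directory still exists.
  def testStreamSketch(checkpointDir: File)(query: => Unit): Unit = {
    try query
    finally {
      log += s"stateStore.stop(dirExists=${checkpointDir.exists()})"
    }
  }

  def run(): List[String] = {
    log.clear()
    var checkpointDir: File = null
    withTempDirSketch { dir =>
      checkpointDir = dir
      testStreamSketch(dir) { log += "query.run" }
    }
    // An afterEach-style hook could only run here, after the directory is gone.
    log += s"afterEach(dirExists=${checkpointDir.exists()})"
    log.toList
  }
}
```

`OrderingDemo.run()` returns `List("query.run", "stateStore.stop(dirExists=true)", "tempDir.delete", "afterEach(dirExists=false)")`. Cleanup inside the streaming-test helper still sees the directory, while a hook that runs after the test would not.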