This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 8c26c014f11 [SPARK-44504][SS] Unload provider thereby forcing DB instance close and releasing resources on maintenance task error

8c26c014f11 is described below

commit 8c26c014f11b1b9d7d6c3b315fbb633c2bb2ca73
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Fri Jul 21 13:46:01 2023 +0900

    [SPARK-44504][SS] Unload provider thereby forcing DB instance close and releasing resources on maintenance task error

    ### What changes were proposed in this pull request?

    Unload the provider on a maintenance task error, thereby forcing the underlying DB instance to close and releasing its resources.

    ### Why are the changes needed?

    If we don't close it, the DB instance and its resources (memory, file descriptors, etc.) are left open forever, and the references to these objects are lost once loadedProviders is cleared.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Existing unit tests

    ```
    ), ForkJoinPool.commonPool-worker-5 (daemon=true),
    ForkJoinPool.commonPool-worker-17 (daemon=true),
    shuffle-boss-6-1 (daemon=true),
    ForkJoinPool.commonPool-worker-3 (daemon=true),
    ForkJoinPool.commonPool-worker-31 (daemon=true),
    ForkJoinPool.commonPool-worker-23 (daemon=true),
    state-store-maintenance-task (daemon=true),
    ForkJoinPool.commonPool-worker-9 (daemon=true) =====
    [info] Run completed in 2 minutes, 49 seconds.
    [info] Total number of tests run: 32
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 32, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    ```

    Closes #42098 from anishshri-db/task/SPARK-44504.
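To illustrate the pattern this patch applies, here is a minimal, self-contained Scala sketch. The names (`FakeProvider`, `ProviderRegistry`, `stopOnMaintenanceError`) are hypothetical stand-ins, not Spark's actual classes: it only shows why clearing a provider map without unloading orphans open resources, and how unloading each key first fixes that.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a state store provider holding native resources
// (in Spark, e.g. a RocksDBStateStoreProvider wrapping a RocksDB instance).
final class FakeProvider {
  var closed = false
  def close(): Unit = { closed = true } // stands in for DB teardown
}

object ProviderRegistry {
  val loadedProviders = mutable.HashMap.empty[String, FakeProvider]

  // Remove the provider from the map and close it, analogous to StateStore.unload.
  def unload(key: String): Unit =
    loadedProviders.remove(key).foreach(_.close())

  // The patched error path: unload every provider before clearing the map, so
  // no open DB instance is left without a reference. Snapshotting the key set
  // with .toSeq avoids mutating the map while iterating over it.
  def stopOnMaintenanceError(): Unit = loadedProviders.synchronized {
    loadedProviders.keySet.toSeq.foreach(unload)
    loadedProviders.clear()
  }
}
```

Clearing the map alone would make every still-open provider unreachable while its native resources stay allocated; unloading first guarantees each one is closed exactly once.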
Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../org/apache/spark/sql/execution/streaming/state/RocksDB.scala    | 4 ++++
 .../org/apache/spark/sql/execution/streaming/state/StateStore.scala | 3 +++
 2 files changed, 7 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 7961c5e716b..d4366fe732b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -463,6 +463,8 @@ class RocksDB(
   /** Release all resources */
   def close(): Unit = {
     try {
+      // Acquire DB instance lock and release at the end to allow for synchronized access
+      acquire()
       closeDB()
 
       readOptions.close()
@@ -477,6 +479,8 @@ class RocksDB(
     } catch {
       case e: Exception =>
         logWarning("Error closing RocksDB", e)
+    } finally {
+      release()
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 359cff81aea..cabad54be64 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -616,6 +616,9 @@ object StateStore extends Logging {
       onError = { loadedProviders.synchronized {
         logInfo("Stopping maintenance task since an error was encountered.")
         stopMaintenanceTask()
+        // SPARK-44504 - Unload explicitly to force closing underlying DB instance
+        // and releasing allocated resources, especially for RocksDBStateStoreProvider.
+        loadedProviders.keySet.foreach { key => unload(key) }
         loadedProviders.clear()
       } }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org