This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6d8b842784ae [SPARK-49593][SS] Throw RocksDB exception to the caller
on DB close if an error is seen
6d8b842784ae is described below
commit 6d8b842784aeecbee78bd452f7213fc2adbbba00
Author: Anish Shrigondekar <[email protected]>
AuthorDate: Wed Sep 11 15:48:24 2024 +0900
[SPARK-49593][SS] Throw RocksDB exception to the caller on DB close if an
error is seen
### What changes were proposed in this pull request?
Throw RocksDB exception to the caller on DB close
API here:
https://javadoc.io/doc/org.rocksdb/rocksdbjni/6.8.1/org/rocksdb/RocksDB.html#closeE()
### Why are the changes needed?
Without this, errors on close are being silently ignored and we attempt to
open the DB again in the context of the same task. Trying to ensure that we
exit in this scenario and try as part of a separate attempt.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing unit tests
```
[info] Total number of tests run: 205
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 205, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #48068 from anishshri-db/task/SPARK-49593.
Authored-by: Anish Shrigondekar <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../spark/sql/execution/streaming/state/RocksDB.scala | 14 +++++++++-----
1 file changed, 9 insertions(+), 5 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index bbb28ac646ec..81e80629092a 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -263,7 +263,7 @@ class RocksDB(
logInfo(log"Loading ${MDC(LogKeys.VERSION_NUM, version)}")
try {
if (loadedVersion != version) {
- closeDB()
+ closeDB(ignoreException = false)
// deep copy is needed to avoid race condition
// between maintenance and task threads
fileManager.copyFileMapping()
@@ -945,13 +945,17 @@ class RocksDB(
logInfo(log"Opened DB with conf ${MDC(LogKeys.CONFIG, conf)}")
}
- private def closeDB(): Unit = {
+ private def closeDB(ignoreException: Boolean = true): Unit = {
if (db != null) {
-
// Cancel and wait until all background work finishes
db.cancelAllBackgroundWork(true)
- // Close the DB instance
- db.close()
+ if (ignoreException) {
+ // Close the DB instance
+ db.close()
+ } else {
+ // Close the DB instance and throw the exception if any
+ db.closeE()
+ }
db = null
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]