This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 202400ae40dc [SPARK-50273][SS] Improve logging for RocksDB lock
acquire/release cases
202400ae40dc is described below
commit 202400ae40dc69cb8f992e4b81cf66c4f93dfda2
Author: Anish Shrigondekar <[email protected]>
AuthorDate: Sat Nov 9 10:58:16 2024 +0900
[SPARK-50273][SS] Improve logging for RocksDB lock acquire/release cases
### What changes were proposed in this pull request?
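Improve logging for RocksDB lock acquire/release cases. Thread info in these
log messages is now labeled explicitly: `ownerThread=` for the thread that
currently holds the lock and `releaseThread=` for the thread requesting the
release. The separating spaces between message fragments are also moved from
the start of each fragment to the end of the previous one.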
### Why are the changes needed?
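Improve log formatting for lock acquire/release scenarios. Labeling each
thread info with a consistent key (`ownerThread=`, `releaseThread=`) makes the
acquire, release, and ignored-release messages easier to read and to search.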
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing unit tests
Sample logs when task context is not available:
```
sql/core/target/unit-tests.log:3728150:15:17:27.982
pool-1-thread-1-ScalaTest-running-RocksDBSuite INFO RocksDB [Thread-17]:
RocksDB instance was acquired by ownerThread=[ThreadId: Some(17)] for
opType=close_store
sql/core/target/unit-tests.log:3728172:15:17:27.985
pool-1-thread-1-ScalaTest-running-RocksDBSuite INFO RocksDB [Thread-17]:
RocksDB instance was released by releaseThread=[ThreadId: Some(17)] with
ownerThread=[ThreadId: Some(17)] for opType=close_store
```
Sample logs when task context is available:
```
sql/core/target/unit-tests.log:3771705:15:23:03.517 Executor task launch
worker for task 0.0 in stage 45.0 (TID 136) INFO RocksDB
StateStoreId(opId=0,partId=0,name=default): RocksDB instance was acquired by
ownerThread=[ThreadId: Some(104), task: partition 0.0 in stage 45.0, TID 136]
for opType=load_store
```
```
sql/core/target/unit-tests.log:3771832:15:23:03.781 Executor task launch
worker for task 0.0 in stage 45.0 (TID 136) INFO RocksDB
StateStoreId(opId=0,partId=0,name=default): RocksDB instance was released by
releaseThread=[ThreadId: Some(104), task: partition 0.0 in stage 45.0, TID 136]
with ownerThread=[ThreadId: Some(104), task: partition 0.0 in stage 45.0, TID
136] for opType=load_store
```
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #48806 from anishshri-db/task/SPARK-50273.
Authored-by: Anish Shrigondekar <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../sql/execution/streaming/state/RocksDB.scala | 23 +++++++++++-----------
1 file changed, 12 insertions(+), 11 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 167aefc88b7d..f8e9885cef14 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -959,7 +959,8 @@ class RocksDB(
Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit] {
_ => this.release(StoreTaskCompletionListener,
Some(newAcquiredThreadInfo))
})
- logInfo(log"RocksDB instance was acquired by ${MDC(LogKeys.THREAD,
acquiredThreadInfo)} " +
+ logInfo(log"RocksDB instance was acquired by " +
+ log"ownerThread=${MDC(LogKeys.THREAD, acquiredThreadInfo)} " +
log"for opType=${MDC(LogKeys.OP_TYPE, opType.toString)}")
}
}
@@ -978,28 +979,28 @@ class RocksDB(
if (acquiredThreadInfo != null) {
val release = releaseForThreadOpt match {
case Some(releaseForThread) if releaseForThread.threadRef.get.isEmpty
=>
- logInfo(log"Thread reference is empty when attempting to release
for" +
- log" opType=${MDC(LogKeys.OP_TYPE, opType.toString)}, ignoring
release." +
- log" Lock is held by ${MDC(LogKeys.THREAD, acquiredThreadInfo)}")
+ logInfo(log"Thread reference is empty when attempting to release for
" +
+ log"opType=${MDC(LogKeys.OP_TYPE, opType.toString)}, ignoring
release. " +
+ log"Lock is held by ownerThread=${MDC(LogKeys.THREAD,
acquiredThreadInfo)}")
false
// NOTE: we compare the entire acquiredThreadInfo object to ensure
that we are
// releasing not only for the right thread but the right task as well.
This is
// inconsistent with the logic for acquire which uses only the thread
ID, consider
// updating this in future.
case Some(releaseForThread) if acquiredThreadInfo != releaseForThread
=>
- logInfo(log"Thread info for release" +
- log" ${MDC(LogKeys.THREAD, releaseForThreadOpt.get)}" +
- log" does not match the acquired thread when attempting to" +
- log" release for opType=${MDC(LogKeys.OP_TYPE, opType.toString)},
ignoring release." +
- log" Lock is held by ${MDC(LogKeys.THREAD, acquiredThreadInfo)}")
+ logInfo(log"Thread info for " +
+ log"releaseThread=${MDC(LogKeys.THREAD, releaseForThreadOpt.get)}
" +
+ log"does not match the acquired thread when attempting to " +
+ log"release for opType=${MDC(LogKeys.OP_TYPE, opType.toString)},
ignoring release. " +
+ log"Lock is held by ownerThread=${MDC(LogKeys.THREAD,
acquiredThreadInfo)}")
false
case _ => true
}
if (release) {
logInfo(log"RocksDB instance was released by " +
- log"${MDC(LogKeys.THREAD, AcquiredThreadInfo())}. " +
- log"acquiredThreadInfo: ${MDC(LogKeys.THREAD, acquiredThreadInfo)} "
+
+ log"releaseThread=${MDC(LogKeys.THREAD, AcquiredThreadInfo())} " +
+ log"with ownerThread=${MDC(LogKeys.THREAD, acquiredThreadInfo)} " +
log"for opType=${MDC(LogKeys.OP_TYPE, opType.toString)}")
acquiredThreadInfo = null
acquireLock.notifyAll()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]