This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 202400ae40dc [SPARK-50273][SS] Improve logging for RocksDB lock acquire/release cases
202400ae40dc is described below

commit 202400ae40dc69cb8f992e4b81cf66c4f93dfda2
Author: Anish Shrigondekar <[email protected]>
AuthorDate: Sat Nov 9 10:58:16 2024 +0900

    [SPARK-50273][SS] Improve logging for RocksDB lock acquire/release cases
    
    ### What changes were proposed in this pull request?
    Improve logging for RocksDB lock acquire/release cases
    
    ### Why are the changes needed?
    Improve log formatting for lock acquire/release scenarios
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing unit tests
    
    Sample logs when task context is not available:
    ```
    sql/core/target/unit-tests.log:3728150:15:17:27.982 pool-1-thread-1-ScalaTest-running-RocksDBSuite INFO RocksDB [Thread-17]: RocksDB instance was acquired by ownerThread=[ThreadId: Some(17)] for opType=close_store
    sql/core/target/unit-tests.log:3728172:15:17:27.985 pool-1-thread-1-ScalaTest-running-RocksDBSuite INFO RocksDB [Thread-17]: RocksDB instance was released by releaseThread=[ThreadId: Some(17)] with ownerThread=[ThreadId: Some(17)] for opType=close_store
    ```
    
    Sample logs when task context is available:
    ```
    sql/core/target/unit-tests.log:3771705:15:23:03.517 Executor task launch worker for task 0.0 in stage 45.0 (TID 136) INFO RocksDB StateStoreId(opId=0,partId=0,name=default): RocksDB instance was acquired by ownerThread=[ThreadId: Some(104), task: partition 0.0 in stage 45.0, TID 136] for opType=load_store
    ```
    ```
    sql/core/target/unit-tests.log:3771832:15:23:03.781 Executor task launch worker for task 0.0 in stage 45.0 (TID 136) INFO RocksDB StateStoreId(opId=0,partId=0,name=default): RocksDB instance was released by releaseThread=[ThreadId: Some(104), task: partition 0.0 in stage 45.0, TID 136] with ownerThread=[ThreadId: Some(104), task: partition 0.0 in stage 45.0, TID 136] for opType=load_store
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #48806 from anishshri-db/task/SPARK-50273.
    
    Authored-by: Anish Shrigondekar <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../sql/execution/streaming/state/RocksDB.scala    | 23 +++++++++++-----------
 1 file changed, 12 insertions(+), 11 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 167aefc88b7d..f8e9885cef14 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -959,7 +959,8 @@ class RocksDB(
       Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit] {
         _ => this.release(StoreTaskCompletionListener, Some(newAcquiredThreadInfo))
       })
-      logInfo(log"RocksDB instance was acquired by ${MDC(LogKeys.THREAD, acquiredThreadInfo)} " +
+      logInfo(log"RocksDB instance was acquired by " +
+        log"ownerThread=${MDC(LogKeys.THREAD, acquiredThreadInfo)} " +
         log"for opType=${MDC(LogKeys.OP_TYPE, opType.toString)}")
     }
   }
@@ -978,28 +979,28 @@ class RocksDB(
     if (acquiredThreadInfo != null) {
       val release = releaseForThreadOpt match {
         case Some(releaseForThread) if releaseForThread.threadRef.get.isEmpty =>
-          logInfo(log"Thread reference is empty when attempting to release for" +
-            log" opType=${MDC(LogKeys.OP_TYPE, opType.toString)}, ignoring release." +
-            log" Lock is held by ${MDC(LogKeys.THREAD, acquiredThreadInfo)}")
+          logInfo(log"Thread reference is empty when attempting to release for " +
+            log"opType=${MDC(LogKeys.OP_TYPE, opType.toString)}, ignoring release. " +
+            log"Lock is held by ownerThread=${MDC(LogKeys.THREAD, acquiredThreadInfo)}")
           false
         // NOTE: we compare the entire acquiredThreadInfo object to ensure that we are
         // releasing not only for the right thread but the right task as well. This is
         // inconsistent with the logic for acquire which uses only the thread ID, consider
         // updating this in future.
         case Some(releaseForThread) if acquiredThreadInfo != releaseForThread =>
-          logInfo(log"Thread info for release" +
-            log" ${MDC(LogKeys.THREAD, releaseForThreadOpt.get)}" +
-            log" does not match the acquired thread when attempting to" +
-            log" release for opType=${MDC(LogKeys.OP_TYPE, opType.toString)}, ignoring release." +
-            log" Lock is held by ${MDC(LogKeys.THREAD, acquiredThreadInfo)}")
+          logInfo(log"Thread info for " +
+            log"releaseThread=${MDC(LogKeys.THREAD, releaseForThreadOpt.get)} " +
+            log"does not match the acquired thread when attempting to " +
+            log"release for opType=${MDC(LogKeys.OP_TYPE, opType.toString)}, ignoring release. " +
+            log"Lock is held by ownerThread=${MDC(LogKeys.THREAD, acquiredThreadInfo)}")
           false
         case _ => true
       }
 
       if (release) {
         logInfo(log"RocksDB instance was released by " +
-          log"${MDC(LogKeys.THREAD, AcquiredThreadInfo())}. " +
-          log"acquiredThreadInfo: ${MDC(LogKeys.THREAD, acquiredThreadInfo)} " +
+          log"releaseThread=${MDC(LogKeys.THREAD, AcquiredThreadInfo())} " +
+          log"with ownerThread=${MDC(LogKeys.THREAD, acquiredThreadInfo)} " +
           log"for opType=${MDC(LogKeys.OP_TYPE, opType.toString)}")
         acquiredThreadInfo = null
         acquireLock.notifyAll()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to