This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ee6ed43aa046 [SPARK-46733][CORE] Simplify the BlockManager by the exit 
operation only depend on interrupt thread
ee6ed43aa046 is described below

commit ee6ed43aa04682fa0b4c59a6b28c211b7e4fb184
Author: beliefer <[email protected]>
AuthorDate: Wed Jan 24 01:21:08 2024 -0600

    [SPARK-46733][CORE] Simplify the BlockManager by the exit operation only 
depend on interrupt thread
    
    ### What changes were proposed in this pull request?
    This PR proposes to simplify the `BlockManager`.
    
    ### Why are the changes needed?
    Currently, closing or destroying `BlockManager` depends on both interrupting 
the thread and the volatile variable `stopped`.
    In fact, we can replace `stopped` with a local variable on the stack and let 
the close operation of `BlockManager` depend only on interrupting the thread.
    For further optimization, this PR uses `running` instead of `stopped`.
    
    ### Does this PR introduce _any_ user-facing change?
    'No'.
    
    ### How was this patch tested?
    GA tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    'No'.
    
    Closes #44732 from beliefer/simplify-ContextCleaner.
    
    Authored-by: beliefer <[email protected]>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index e996444d7846..42bbd025177b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -2209,7 +2209,6 @@ private[spark] object BlockManager {
       new ConcurrentHashMap)
 
     private val POLL_TIMEOUT = 1000
-    @volatile private var stopped = false
 
     private val cleaningThread = new Thread() { override def run(): Unit = { 
keepCleaning() } }
     cleaningThread.setDaemon(true)
@@ -2235,13 +2234,13 @@ private[spark] object BlockManager {
     }
 
     def stop(): Unit = {
-      stopped = true
       cleaningThread.interrupt()
       cleaningThread.join()
     }
 
     private def keepCleaning(): Unit = {
-      while (!stopped) {
+      var running = true
+      while (running) {
         try {
           Option(referenceQueue.remove(POLL_TIMEOUT))
             .map(_.asInstanceOf[ReferenceWithCleanup])
@@ -2251,7 +2250,7 @@ private[spark] object BlockManager {
             }
         } catch {
           case _: InterruptedException =>
-            // no-op
+            running = false
           case NonFatal(e) =>
             logError("Error in cleaning thread", e)
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to