This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 17d0cfc  [SPARK-26917][SQL] Cache lock recache by condition
17d0cfc is described below

commit 17d0cfcaa4a43fd55b81065d907538a9c1bf569b
Author: Dave DeCaprio <da...@alum.mit.edu>
AuthorDate: Thu Feb 21 09:04:50 2019 -0600

    [SPARK-26917][SQL] Cache lock recache by condition
    
    ## What changes were proposed in this pull request?
    
    Related to SPARK-26617 and SPARK-26548. We found another place where we
    were still seeing lock contention and traced it to the recacheByCondition
    function. This PR changes that function so that the writeLock is not held
    while the condition is being evaluated.
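    
    For context, here is a minimal, self-contained sketch of the locking
    pattern this change adopts: evaluate the (potentially slow) condition under
    the shared read lock, then take the exclusive write lock only for each
    brief removal. The readLock/writeLock helpers mirror CacheManager's private
    helpers of the same names; CachedData and the backing buffer are simplified
    stand-ins, not the real internals.
    
    ```scala
    import java.util.concurrent.locks.ReentrantReadWriteLock
    
    import scala.collection.mutable
    
    // Simplified stand-in for the real CachedData in CacheManager.
    case class CachedData(name: String)
    
    object LockPatternSketch {
      private val cacheLock = new ReentrantReadWriteLock
      private val cachedData = mutable.Buffer(CachedData("a"), CachedData("b"))
    
      // These helpers mirror CacheManager's private readLock/writeLock.
      private def readLock[A](f: => A): A = {
        val lock = cacheLock.readLock()
        lock.lock()
        try f finally lock.unlock()
      }
    
      private def writeLock[A](f: => A): A = {
        val lock = cacheLock.writeLock()
        lock.lock()
        try f finally lock.unlock()
      }
    
      def recacheByCondition(condition: CachedData => Boolean): Unit = {
        // Evaluate the condition under the shared read lock, so other
        // readers are not blocked while it runs.
        val needToRecache = readLock {
          cachedData.filter(condition).toList
        }
        // Hold the exclusive write lock only for each brief removal.
        needToRecache.foreach { cd =>
          writeLock {
            cachedData -= cd
          }
          // ...rebuild and re-register the cache entry outside the lock...
        }
      }
    }
    ```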
    
    cloud-fan & gatorsmile This is a further tweak to the other cache PRs we 
have done (which have helped us tremendously).
    
    ## How was this patch tested?
    
    This has been tested on a live system where the blocking was causing major
    issues, and it is working well.
    CacheManager has no explicit unit test, but it is exercised in many places
    internally as part of the SharedState.
    
    Closes #23833 from DaveDeCaprio/cache-lock-recacheByCondition.
    
    Lead-authored-by: Dave DeCaprio <da...@alum.mit.edu>
    Co-authored-by: David DeCaprio <da...@alum.mit.edu>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 .../org/apache/spark/sql/execution/CacheManager.scala    | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 398d7b4..c6ee735 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -145,17 +145,19 @@ class CacheManager extends Logging {
         _.sameResult(plan)
       }
     val plansToUncache = mutable.Buffer[CachedData]()
-    writeLock {
+    readLock {
       val it = cachedData.iterator()
       while (it.hasNext) {
         val cd = it.next()
         if (shouldRemove(cd.plan)) {
           plansToUncache += cd
-          it.remove()
         }
       }
     }
     plansToUncache.foreach { cd =>
+      writeLock {
+        cachedData.remove(cd)
+      }
       cd.cachedRepresentation.cacheBuilder.clearCache(blocking)
     }
     // Re-compile dependent cached queries after removing the cached query.
@@ -193,19 +195,21 @@ class CacheManager extends Logging {
       spark: SparkSession,
       condition: CachedData => Boolean): Unit = {
     val needToRecache = scala.collection.mutable.ArrayBuffer.empty[CachedData]
-    writeLock {
+    readLock {
       val it = cachedData.iterator()
       while (it.hasNext) {
         val cd = it.next()
         if (condition(cd)) {
           needToRecache += cd
-          // Remove the cache entry before we create a new one, so that we can have a different
-          // physical plan.
-          it.remove()
         }
       }
     }
     needToRecache.map { cd =>
+      writeLock {
+        // Remove the cache entry before we create a new one, so that we can have a different
+        // physical plan.
+        cachedData.remove(cd)
+      }
       cd.cachedRepresentation.cacheBuilder.clearCache()
       val plan = spark.sessionState.executePlan(cd.plan).executedPlan
       val newCache = InMemoryRelation(

