Repository: hive
Updated Branches:
  refs/heads/master 76b696c26 -> a1034102d


HIVE-19127: Concurrency fixes in QueryResultsCache (Jason Dere, reviewed by Deepak Jaiswal)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a1034102
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a1034102
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a1034102

Branch: refs/heads/master
Commit: a1034102d3580922f6c8f9d186272280d6917802
Parents: 76b696c
Author: Jason Dere <jd...@hortonworks.com>
Authored: Mon Apr 9 16:48:23 2018 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Mon Apr 9 16:48:23 2018 -0700

----------------------------------------------------------------------
 .../ql/cache/results/QueryResultsCache.java     | 112 +++++++++++--------
 1 file changed, 68 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/a1034102/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
index ac5ae57..b1a3646 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
@@ -502,32 +502,39 @@ public final class QueryResultsCache {
         return false;
       }
 
-      if (requiresMove) {
-        // Move the query results to the query cache directory.
-        cachedResultsPath = moveResultsToCacheDirectory(queryResultsPath);
-        dataDirMoved = true;
-      }
-      LOG.info("Moved query results from {} to {} (size {}) for query '{}'",
-          queryResultsPath, cachedResultsPath, resultSize, queryText);
-
-      // Create a new FetchWork to reference the new cache location.
-      FetchWork fetchWorkForCache =
-          new FetchWork(cachedResultsPath, fetchWork.getTblDesc(), fetchWork.getLimit());
-      fetchWorkForCache.setCachedResult(true);
-      cacheEntry.fetchWork = fetchWorkForCache;
-      cacheEntry.cachedResultsPath = cachedResultsPath;
-      cacheEntry.size = resultSize;
-      this.cacheSize += resultSize;
-      cacheEntry.createTime = System.currentTimeMillis();
-
-      cacheEntry.setStatus(CacheEntryStatus.VALID);
-      // Mark this entry as being in use. Caller will need to release later.
-      cacheEntry.addReader();
-
-      scheduleEntryInvalidation(cacheEntry);
-
-      // Notify any queries waiting on this cacheEntry to become valid.
+      // Synchronize on the cache entry so that no one else can invalidate this entry
+      // while we are in the process of setting it to valid.
       synchronized (cacheEntry) {
+        if (cacheEntry.getStatus() == CacheEntryStatus.INVALID) {
+          // Entry either expired, or was invalidated due to table updates
+          return false;
+        }
+
+        if (requiresMove) {
+          // Move the query results to the query cache directory.
+          cachedResultsPath = moveResultsToCacheDirectory(queryResultsPath);
+          dataDirMoved = true;
+        }
+        LOG.info("Moved query results from {} to {} (size {}) for query '{}'",
+            queryResultsPath, cachedResultsPath, resultSize, queryText);
+
+        // Create a new FetchWork to reference the new cache location.
+        FetchWork fetchWorkForCache =
+            new FetchWork(cachedResultsPath, fetchWork.getTblDesc(), fetchWork.getLimit());
+        fetchWorkForCache.setCachedResult(true);
+        cacheEntry.fetchWork = fetchWorkForCache;
+        cacheEntry.cachedResultsPath = cachedResultsPath;
+        cacheEntry.size = resultSize;
+        this.cacheSize += resultSize;
+        cacheEntry.createTime = System.currentTimeMillis();
+
+        cacheEntry.setStatus(CacheEntryStatus.VALID);
+        // Mark this entry as being in use. Caller will need to release later.
+        cacheEntry.addReader();
+
+        scheduleEntryInvalidation(cacheEntry);
+
+        // Notify any queries waiting on this cacheEntry to become valid.
         cacheEntry.notifyAll();
       }
 
@@ -564,7 +571,11 @@ public final class QueryResultsCache {
     try {
       writeLock.lock();
       LOG.info("Clearing the results cache");
-      for (CacheEntry entry : lru.keySet().toArray(EMPTY_CACHEENTRY_ARRAY)) {
+      CacheEntry[] allEntries = null;
+      synchronized (lru) {
+        allEntries = lru.keySet().toArray(EMPTY_CACHEENTRY_ARRAY);
+      }
+      for (CacheEntry entry : allEntries) {
         try {
           removeEntry(entry);
         } catch (Exception err) {
@@ -611,10 +622,15 @@ public final class QueryResultsCache {
 
   public void removeEntry(CacheEntry entry) {
     entry.invalidate();
-    removeFromLookup(entry);
-    lru.remove(entry);
-    // Should the cache size be updated here, or after the result data has actually been deleted?
-    cacheSize -= entry.size;
+    rwLock.writeLock().lock();
+    try {
+      removeFromLookup(entry);
+      lru.remove(entry);
+      // Should the cache size be updated here, or after the result data has actually been deleted?
+      cacheSize -= entry.size;
+    } finally {
+      rwLock.writeLock().unlock();
+    }
   }
 
   private void removeFromLookup(CacheEntry entry) {
@@ -674,6 +690,20 @@ public final class QueryResultsCache {
     return true;
   }
 
+  private CacheEntry findEntryToRemove() {
+    // Entries should be in LRU order in the keyset iterator.
+    Set<CacheEntry> entries = lru.keySet();
+    synchronized (lru) {
+      for (CacheEntry removalCandidate : entries) {
+        if (removalCandidate.getStatus() != CacheEntryStatus.VALID) {
+          continue;
+        }
+        return removalCandidate;
+      }
+    }
+    return null;
+  }
+
   private boolean clearSpaceForCacheEntry(CacheEntry entry, long size) {
     if (hasSpaceForCacheEntry(entry, size)) {
       return true;
@@ -682,20 +712,14 @@ public final class QueryResultsCache {
     LOG.info("Clearing space for cache entry for query: [{}] with size {}",
         entry.getQueryText(), size);
 
-    // Entries should be in LRU order in the keyset iterator.
-    CacheEntry[] entries = lru.keySet().toArray(EMPTY_CACHEENTRY_ARRAY);
-    for (CacheEntry removalCandidate : entries) {
-      if (removalCandidate.getStatus() != CacheEntryStatus.VALID) {
-        // Only entries marked as valid should have results that can be removed.
-        continue;
-      }
-      // Only delete the entry if it has no readers.
-      if (!(removalCandidate.numReaders() > 0)) {
-        LOG.info("Removing entry: {}", removalCandidate);
-        removeEntry(removalCandidate);
-        if (hasSpaceForCacheEntry(entry, size)) {
-          return true;
-        }
+    CacheEntry removalCandidate;
+    while ((removalCandidate = findEntryToRemove()) != null) {
+      LOG.info("Removing entry: {}", removalCandidate);
+      removeEntry(removalCandidate);
+      // TODO: Should we wait for the entry to actually be deleted from HDFS? Would have to
+      // poll the reader count, waiting for it to reach 0, at which point cleanup should occur.
+      if (hasSpaceForCacheEntry(entry, size)) {
+        return true;
       }
     }
 

Reply via email to