This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new b21b250  HBASE-26281 DBB got from BucketCache would be freed unexpectedly before RPC completed (#3680)
b21b250 is described below

commit b21b2505702c26271a7bc6ea9126b6dd957d607b
Author: chenglei <[email protected]>
AuthorDate: Fri Sep 17 22:15:23 2021 +0800

    HBASE-26281 DBB got from BucketCache would be freed unexpectedly before RPC completed (#3680)
    
    Signed-off-by: Duo Zhang <[email protected]>
---
 .../hadoop/hbase/io/hfile/bucket/BucketCache.java  | 372 +++++++++-------
 .../hadoop/hbase/io/hfile/bucket/BucketEntry.java  |  26 +-
 .../TestAvoidCellReferencesIntoShippedBlocks.java  |  25 +-
 .../hbase/io/hfile/bucket/TestBucketCache.java     |  30 +-
 .../io/hfile/bucket/TestBucketCacheRefCnt.java     | 491 ++++++++++++++++++++-
 .../io/hfile/bucket/TestBucketWriterThread.java    |   7 +-
 .../io/hfile/bucket/TestByteBufferIOEngine.java    |   3 +-
 .../hadoop/hbase/io/hfile/bucket/TestRAMCache.java |   2 +-
 8 files changed, 758 insertions(+), 198 deletions(-)
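
The race fixed here: a direct ByteBuffer (DBB) handed out by BucketCache#getBlock could have
its bucket slot freed and overwritten while an RPC was still reading it. The fix decouples
eviction from deallocation: eviction only removes the entry from the maps and drops the
cache's own reference, while the slot itself is reclaimed by a recycler that fires when the
last reference (cache or RPC) is released. A minimal, self-contained sketch of that pattern,
with illustrative names rather than the actual HBase classes:

    import java.util.concurrent.atomic.AtomicInteger;

    // Sketch of the refcount-plus-recycler lifetime pattern behind this fix.
    final class RefCountedEntry {
      private final AtomicInteger refCnt = new AtomicInteger(1); // the cache holds one reference
      private final Runnable recycler; // frees the bucket slot; runs exactly once

      RefCountedEntry(Runnable recycler) {
        this.recycler = recycler;
      }

      // An RPC takes a reference before reading the backing buffer.
      void retain() {
        refCnt.incrementAndGet();
      }

      // Dropping a reference frees the slot only at zero, so an entry evicted
      // while an RPC still reads it survives until that RPC releases it.
      void release() {
        if (refCnt.decrementAndGet() == 0) {
          recycler.run();
        }
      }
    }

In the patch below, this split shows up as BucketCache#blockEvicted (mark and account only)
versus the new BucketCache#freeBucketEntry (the actual bucketAllocator.freeBlock), with a
recycler now created per BucketEntry.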

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index c24494f..45f46a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -49,6 +49,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -419,7 +420,7 @@ public class BucketCache implements BlockCache, HeapSize {
       boolean wait) {
     if (cacheEnabled) {
       if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) {
-        if (BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, cachedItem)) {
+        if (shouldReplaceExistingCacheBlock(cacheKey, cachedItem)) {
           BucketEntry bucketEntry = backingMap.get(cacheKey);
           if (bucketEntry != null && bucketEntry.isRpcRef()) {
             // avoid replace when there are RPC refs for the bucket entry in bucket cache
@@ -433,7 +434,11 @@ public class BucketCache implements BlockCache, HeapSize {
     }
   }
 
-  private void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
+  protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) {
+    return BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, newBlock);
+  }
+
+  protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
       boolean inMemory, boolean wait) {
     if (!cacheEnabled) {
       return;
@@ -441,8 +446,7 @@ public class BucketCache implements BlockCache, HeapSize {
     LOG.trace("Caching key={}, item={}", cacheKey, cachedItem);
     // Stuff the entry into the RAM cache so it can get drained to the persistent store
     RAMQueueEntry re =
-        new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory,
-              createRecycler(cacheKey));
+        new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
     /**
      * Don't use ramCache.put(cacheKey, re) here. because there may be a existing entry with same
      * key in ramCache, the heap size of bucket cache need to update if replacing entry from
@@ -540,13 +544,25 @@ public class BucketCache implements BlockCache, HeapSize {
     return null;
   }
 
+  /**
+   * This method is invoked after the bucketEntry is removed from {@link BucketCache#backingMap}
+   */
   void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber) {
-    bucketAllocator.freeBlock(bucketEntry.offset());
-    realCacheSize.add(-1 * bucketEntry.getLength());
+    bucketEntry.markAsEvicted();
     blocksByHFile.remove(cacheKey);
     if (decrementBlockNumber) {
       this.blockNumber.decrement();
     }
+    cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
+  }
+
+  /**
+   * Actually free the {@link BucketEntry}, which may only be invoked once the
+   * {@link BucketEntry#refCnt} becomes 0.
+   */
+  void freeBucketEntry(BucketEntry bucketEntry) {
+    bucketAllocator.freeBlock(bucketEntry.offset());
+    realCacheSize.add(-1 * bucketEntry.getLength());
   }
 
   /**
@@ -554,10 +570,10 @@ public class BucketCache implements BlockCache, HeapSize {
    * 1. Close an HFile, and clear all cached blocks. <br>
   * 2. Call {@link Admin#clearBlockCache(TableName)} to clear all blocks for a given table.<br>
   * <p>
-   * Firstly, we'll try to remove the block from RAMCache. If it doesn't exist in RAMCache, then try
-   * to evict from backingMap. Here we only need to free the reference from bucket cache by calling
-   * {@link BucketEntry#markedAsEvicted}. If there're still some RPC referring this block, block can
-   * only be de-allocated when all of them release the block.
+   * Firstly, we'll try to remove the block from RAMCache, and then try to evict from backingMap.
+   * Here we evict the block from backingMap immediately, but only free the reference from bucket
+   * cache by calling {@link BucketEntry#markedAsEvicted}. If there're still some RPC referring this
+   * block, block can only be de-allocated when all of them release the block.
   * <p>
   * NOTICE: we need to grab the write offset lock firstly before releasing the reference from
   * bucket cache. if we don't, we may read an {@link BucketEntry} with refCnt = 0 when
@@ -567,43 +583,92 @@ public class BucketCache implements BlockCache, HeapSize {
    */
   @Override
   public boolean evictBlock(BlockCacheKey cacheKey) {
+    return doEvictBlock(cacheKey, null);
+  }
+
+  /**
+   * Evict the {@link BlockCacheKey} and {@link BucketEntry} from {@link BucketCache#backingMap} and
+   * {@link BucketCache#ramCache}. <br/>
+   * NOTE: When evicting from {@link BucketCache#backingMap}, only the matched {@link BlockCacheKey}
+   * and {@link BucketEntry} could be removed.
+   * @param cacheKey {@link BlockCacheKey} to evict.
+   * @param bucketEntry {@link BucketEntry} matching the {@link BlockCacheKey} to evict.
+   * @return true if the block has been evicted successfully.
+   */
+  private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry) {
     if (!cacheEnabled) {
       return false;
     }
-    boolean existed = removeFromRamCache(cacheKey);
-    BucketEntry be = backingMap.get(cacheKey);
-    if (be == null) {
-      if (existed) {
+    boolean existedInRamCache = removeFromRamCache(cacheKey);
+    if (bucketEntry == null) {
+      bucketEntry = backingMap.get(cacheKey);
+    }
+    final BucketEntry bucketEntryToUse = bucketEntry;
+
+    if (bucketEntryToUse == null) {
+      if (existedInRamCache) {
         cacheStats.evicted(0, cacheKey.isPrimary());
       }
-      return existed;
+      return existedInRamCache;
     } else {
-      return be.withWriteLock(offsetLock, be::markAsEvicted);
+      return bucketEntryToUse.withWriteLock(offsetLock, () -> {
+        if (backingMap.remove(cacheKey, bucketEntryToUse)) {
+          blockEvicted(cacheKey, bucketEntryToUse, !existedInRamCache);
+          return true;
+        }
+        return false;
+      });
     }
   }
 
-  private Recycler createRecycler(BlockCacheKey cacheKey) {
+  /**
+   * <pre>
+   * Create the {@link Recycler} for {@link BucketEntry#refCnt}, which would be used as
+   * {@link RefCnt#recycler} of {@link HFileBlock#buf} returned from {@link BucketCache#getBlock}.
+   * NOTE: for {@link BucketCache#getBlock}, the {@link RefCnt#recycler} of {@link HFileBlock#buf}
+   * from {@link BucketCache#backingMap} and {@link BucketCache#ramCache} are different:
+   * 1. For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#backingMap},
+   *    it is the return value of current {@link BucketCache#createRecycler} method.
+   *
+   * 2. For {@link RefCnt#recycler} of {@link HFileBlock#buf} from {@link BucketCache#ramCache},
+   *    it is {@link ByteBuffAllocator#putbackBuffer}.
+   * </pre>
+   */
+  protected Recycler createRecycler(final BucketEntry bucketEntry) {
     return () -> {
-      if (!cacheEnabled) {
-        return;
-      }
-      boolean existed = removeFromRamCache(cacheKey);
-      BucketEntry be = backingMap.get(cacheKey);
-      if (be == null && existed) {
-        cacheStats.evicted(0, cacheKey.isPrimary());
-      } else if (be != null) {
-        be.withWriteLock(offsetLock, () -> {
-          if (backingMap.remove(cacheKey, be)) {
-            blockEvicted(cacheKey, be, !existed);
-            cacheStats.evicted(be.getCachedTime(), cacheKey.isPrimary());
-          }
-          return null;
-        });
-      }
+      freeBucketEntry(bucketEntry);
+      return;
     };
   }
 
-  private boolean removeFromRamCache(BlockCacheKey cacheKey) {
+  /**
+   * NOTE: This method is only used by tests.
+   */
+  public boolean evictBlockIfNoRpcReferenced(BlockCacheKey blockCacheKey) {
+    BucketEntry bucketEntry = backingMap.get(blockCacheKey);
+    if (bucketEntry == null) {
+      return false;
+    }
+    return evictBucketEntryIfNoRpcReferenced(blockCacheKey, bucketEntry);
+  }
+
+  /**
+   * Evict {@link BlockCacheKey} and its corresponding {@link BucketEntry} only if
+   * {@link BucketEntry#isRpcRef} is false. <br/>
+   * NOTE: When evicting from {@link BucketCache#backingMap}, only the matched {@link BlockCacheKey}
+   * and {@link BucketEntry} could be removed.
+   * @param blockCacheKey {@link BlockCacheKey} to evict.
+   * @param bucketEntry {@link BucketEntry} matching the {@link BlockCacheKey} to evict.
+   * @return true if the block has been evicted successfully.
+   */
+  boolean evictBucketEntryIfNoRpcReferenced(BlockCacheKey blockCacheKey, BucketEntry bucketEntry) {
+    if (!bucketEntry.isRpcRef()) {
+      return doEvictBlock(blockCacheKey, bucketEntry);
+    }
+    return false;
+  }
+
+  protected boolean removeFromRamCache(BlockCacheKey cacheKey) {
     return ramCache.remove(cacheKey, re -> {
       if (re != null) {
         this.blockNumber.decrement();
@@ -707,7 +772,7 @@ public class BucketCache implements BlockCache, HeapSize {
           bucketAllocator.getLeastFilledBuckets(inUseBuckets, completelyFreeBucketsNeeded);
       for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) {
         if (candidateBuckets.contains(bucketAllocator.getBucketIndex(entry.getValue().offset()))) {
-          entry.getValue().withWriteLock(offsetLock, entry.getValue()::markStaleAsEvicted);
+          evictBucketEntryIfNoRpcReferenced(entry.getKey(), entry.getValue());
         }
       }
     }
@@ -896,134 +961,134 @@ public class BucketCache implements BlockCache, HeapSize {
       }
       LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled);
     }
+  }
 
-    /**
-     * Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing
-     * cache with a new block for the same cache key. there's a corner case: one thread cache a
-     * block in ramCache, copy to io-engine and add a bucket entry to backingMap. Caching another
-     * new block with the same cache key do the same thing for the same cache key, so if not evict
-     * the previous bucket entry, then memory leak happen because the previous bucketEntry is gone
-     * but the bucketAllocator do not free its memory.
-     * @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey
-     *      cacheKey, Cacheable newBlock)
-     * @param key Block cache key
-     * @param bucketEntry Bucket entry to put into backingMap.
-     */
-    private void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
-      BucketEntry previousEntry = backingMap.put(key, bucketEntry);
-      if (previousEntry != null && previousEntry != bucketEntry) {
-        previousEntry.withWriteLock(offsetLock, () -> {
-          blockEvicted(key, previousEntry, false);
-          return null;
-        });
-      }
+  /**
+   * Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing
+   * cache with a new block for the same cache key. there's a corner case: one thread cache a block
+   * in ramCache, copy to io-engine and add a bucket entry to backingMap. Caching another new block
+   * with the same cache key do the same thing for the same cache key, so if not evict the previous
+   * bucket entry, then memory leak happen because the previous bucketEntry is gone but the
+   * bucketAllocator do not free its memory.
+   * @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey
+   *      cacheKey, Cacheable newBlock)
+   * @param key Block cache key
+   * @param bucketEntry Bucket entry to put into backingMap.
+   */
+  protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
+    BucketEntry previousEntry = backingMap.put(key, bucketEntry);
+    if (previousEntry != null && previousEntry != bucketEntry) {
+      previousEntry.withWriteLock(offsetLock, () -> {
+        blockEvicted(key, previousEntry, false);
+        return null;
+      });
     }
+  }
 
-    /**
-     * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap.
-     * Process all that are passed in even if failure being sure to remove from ramCache else we'll
-     * never undo the references and we'll OOME.
-     * @param entries Presumes list passed in here will be processed by this invocation only. No
-     *   interference expected.
-     * @throws InterruptedException
-     */
-    void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException {
-      if (entries.isEmpty()) {
-        return;
-      }
-      // This method is a little hard to follow. We run through the passed in entries and for each
-      // successful add, we add a non-null BucketEntry to the below bucketEntries.  Later we must
-      // do cleanup making sure we've cleared ramCache of all entries regardless of whether we
-      // successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by
-      // filling ramCache.  We do the clean up by again running through the passed in entries
-      // doing extra work when we find a non-null bucketEntries corresponding entry.
-      final int size = entries.size();
-      BucketEntry[] bucketEntries = new BucketEntry[size];
-      // Index updated inside loop if success or if we can't succeed. We retry if cache is full
-      // when we go to add an entry by going around the loop again without upping the index.
-      int index = 0;
-      while (cacheEnabled && index < size) {
-        RAMQueueEntry re = null;
-        try {
-          re = entries.get(index);
-          if (re == null) {
-            LOG.warn("Couldn't get entry or changed on us; who else is messing 
with it?");
-            index++;
-            continue;
-          }
-          BucketEntry bucketEntry = re.writeToCache(ioEngine, bucketAllocator, 
realCacheSize);
-          // Successfully added. Up index and add bucketEntry. Clear io 
exceptions.
-          bucketEntries[index] = bucketEntry;
-          if (ioErrorStartTime > 0) {
-            ioErrorStartTime = -1;
-          }
-          index++;
-        } catch (BucketAllocatorException fle) {
-          LOG.warn("Failed allocation for " + (re == null ? "" : re.getKey()) + "; " + fle);
-          // Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below.
-          bucketEntries[index] = null;
+  /**
+   * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap. Process all that
+   * are passed in even if failure being sure to remove from ramCache else we'll never undo the
+   * references and we'll OOME.
+   * @param entries Presumes list passed in here will be processed by this invocation only. No
+   *          interference expected.
+   */
+  void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException {
+    if (entries.isEmpty()) {
+      return;
+    }
+    // This method is a little hard to follow. We run through the passed in entries and for each
+    // successful add, we add a non-null BucketEntry to the below bucketEntries. Later we must
+    // do cleanup making sure we've cleared ramCache of all entries regardless of whether we
+    // successfully added the item to the bucketcache; if we don't do the cleanup, we'll OOME by
+    // filling ramCache. We do the clean up by again running through the passed in entries
+    // doing extra work when we find a non-null bucketEntries corresponding entry.
+    final int size = entries.size();
+    BucketEntry[] bucketEntries = new BucketEntry[size];
+    // Index updated inside loop if success or if we can't succeed. We retry if cache is full
+    // when we go to add an entry by going around the loop again without upping the index.
+    int index = 0;
+    while (cacheEnabled && index < size) {
+      RAMQueueEntry re = null;
+      try {
+        re = entries.get(index);
+        if (re == null) {
+          LOG.warn("Couldn't get entry or changed on us; who else is messing 
with it?");
           index++;
-        } catch (CacheFullException cfe) {
-          // Cache full when we tried to add. Try freeing space and then 
retrying (don't up index)
-          if (!freeInProgress) {
-            freeSpace("Full!");
-          } else {
-            Thread.sleep(50);
-          }
-        } catch (IOException ioex) {
-          // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem.
-          LOG.error("Failed writing to bucket cache", ioex);
-          checkIOErrorIsTolerated();
+          continue;
+        }
+        BucketEntry bucketEntry = re.writeToCache(ioEngine, bucketAllocator, realCacheSize,
+          (entry) -> createRecycler(entry));
+        // Successfully added. Up index and add bucketEntry. Clear io exceptions.
+        bucketEntries[index] = bucketEntry;
+        if (ioErrorStartTime > 0) {
+          ioErrorStartTime = -1;
+        }
+        index++;
+      } catch (BucketAllocatorException fle) {
+        LOG.warn("Failed allocation for " + (re == null ? "" : re.getKey()) + 
"; " + fle);
+        // Presume can't add. Too big? Move index on. Entry will be cleared 
from ramCache below.
+        bucketEntries[index] = null;
+        index++;
+      } catch (CacheFullException cfe) {
+        // Cache full when we tried to add. Try freeing space and then retrying (don't up index)
+        if (!freeInProgress) {
+          freeSpace("Full!");
+        } else {
+          Thread.sleep(50);
         }
-      }
-
-      // Make sure data pages are written on media before we update maps.
-      try {
-        ioEngine.sync();
       } catch (IOException ioex) {
-        LOG.error("Failed syncing IO engine", ioex);
+        // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem.
+        LOG.error("Failed writing to bucket cache", ioex);
         checkIOErrorIsTolerated();
-        // Since we failed sync, free the blocks in bucket allocator
-        for (int i = 0; i < entries.size(); ++i) {
-          if (bucketEntries[i] != null) {
-            bucketAllocator.freeBlock(bucketEntries[i].offset());
-            bucketEntries[i] = null;
-          }
-        }
       }
+    }
 
-      // Now add to backingMap if successfully added to bucket cache.  Remove from ramCache if
-      // success or error.
-      for (int i = 0; i < size; ++i) {
-        BlockCacheKey key = entries.get(i).getKey();
-        // Only add if non-null entry.
+    // Make sure data pages are written on media before we update maps.
+    try {
+      ioEngine.sync();
+    } catch (IOException ioex) {
+      LOG.error("Failed syncing IO engine", ioex);
+      checkIOErrorIsTolerated();
+      // Since we failed sync, free the blocks in bucket allocator
+      for (int i = 0; i < entries.size(); ++i) {
         if (bucketEntries[i] != null) {
-          putIntoBackingMap(key, bucketEntries[i]);
+          bucketAllocator.freeBlock(bucketEntries[i].offset());
+          bucketEntries[i] = null;
         }
-        // Always remove from ramCache even if we failed adding it to the block cache above.
-        boolean existed = ramCache.remove(key, re -> {
-          if (re != null) {
-            heapSize.add(-1 * re.getData().heapSize());
+      }
+    }
+
+    // Now add to backingMap if successfully added to bucket cache. Remove from ramCache if
+    // success or error.
+    for (int i = 0; i < size; ++i) {
+      BlockCacheKey key = entries.get(i).getKey();
+      // Only add if non-null entry.
+      if (bucketEntries[i] != null) {
+        putIntoBackingMap(key, bucketEntries[i]);
+      }
+      // Always remove from ramCache even if we failed adding it to the block cache above.
+      boolean existed = ramCache.remove(key, re -> {
+        if (re != null) {
+          heapSize.add(-1 * re.getData().heapSize());
+        }
+      });
+      if (!existed && bucketEntries[i] != null) {
+        // Block should have already been evicted. Remove it and free space.
+        final BucketEntry bucketEntry = bucketEntries[i];
+        bucketEntry.withWriteLock(offsetLock, () -> {
+          if (backingMap.remove(key, bucketEntry)) {
+            blockEvicted(key, bucketEntry, false);
           }
+          return null;
         });
-        if (!existed && bucketEntries[i] != null) {
-          // Block should have already been evicted. Remove it and free space.
-          final BucketEntry bucketEntry = bucketEntries[i];
-          bucketEntry.withWriteLock(offsetLock, () -> {
-            if (backingMap.remove(key, bucketEntry)) {
-              blockEvicted(key, bucketEntry, false);
-            }
-            return null;
-          });
-        }
       }
+    }
 
-      long used = bucketAllocator.getUsedSize();
-      if (used > acceptableSize()) {
-        freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
-      }
-      return;
+    long used = bucketAllocator.getUsedSize();
+    if (used > acceptableSize()) {
+      freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
     }
+    return;
   }
 
   /**
@@ -1313,8 +1378,9 @@ public class BucketCache implements BlockCache, HeapSize {
       // TODO avoid a cycling siutation. We find no block which is not in use and so no way to free
       // What to do then? Caching attempt fail? Need some changes in cacheBlock API?
       while ((entry = queue.pollLast()) != null) {
+        BlockCacheKey blockCacheKey = entry.getKey();
         BucketEntry be = entry.getValue();
-        if (be.withWriteLock(offsetLock, be::markStaleAsEvicted)) {
+        if (evictBucketEntryIfNoRpcReferenced(blockCacheKey, be)) {
           freedBytes += be.getLength();
         }
         if (freedBytes >= toFree) {
@@ -1341,15 +1407,12 @@ public class BucketCache implements BlockCache, HeapSize {
     private final Cacheable data;
     private long accessCounter;
     private boolean inMemory;
-    private final Recycler recycler;
 
-    RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory,
-        Recycler recycler) {
+    RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory) {
       this.key = bck;
       this.data = data;
       this.accessCounter = accessCounter;
       this.inMemory = inMemory;
-      this.recycler = recycler;
     }
 
     public Cacheable getData() {
@@ -1372,7 +1435,8 @@ public class BucketCache implements BlockCache, HeapSize {
     }
 
     public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc,
-        final LongAdder realCacheSize) throws IOException {
+        final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler)
+        throws IOException {
       int len = data.getSerializedLength();
       // This cacheable thing can't be serialized
       if (len == 0) {
@@ -1382,7 +1446,7 @@ public class BucketCache implements BlockCache, HeapSize {
       boolean succ = false;
       BucketEntry bucketEntry = null;
       try {
-        bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory, RefCnt.create(recycler),
+        bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory, createRecycler,
             getByteBuffAllocator());
         bucketEntry.setDeserializerReference(data.getDeserializer());
         if (data instanceof HFileBlock) {
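
Note the shape of the new writeToCache signature above: it now takes a
Function<BucketEntry, Recycler> instead of a pre-built Recycler. The old recycler was keyed
by BlockCacheKey and looked the entry up at recycle time, so after a replacement it could
act on whatever entry the key happened to map to; the new one is bound to the exact
BucketEntry at construction. A hedged sketch of why the factory shape is needed
(illustrative types, not the HBase ones):

    import java.util.function.Function;

    // The recycler must capture the exact entry it frees, but the entry needs
    // its recycler at construction time, hence the Function factory.
    final class Slot {
      final long offset;
      final Runnable recycler; // frees this slot, not "whatever the key maps to now"

      Slot(long offset, Function<Slot, Runnable> recyclerFactory) {
        this.offset = offset;
        this.recycler = recyclerFactory.apply(this); // bind the recycler to this instance
      }
    }

    class SlotDemo {
      public static void main(String[] args) {
        Slot s = new Slot(42L, slot -> () -> System.out.println("free slot @" + slot.offset));
        s.recycler.run(); // frees the captured slot even if the key now maps elsewhere
      }
    }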
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java
index 2dd7775..ca79f69 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java
@@ -25,8 +25,10 @@ import java.nio.ByteBuffer;
 import java.util.Comparator;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
 
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
 import org.apache.hadoop.hbase.io.hfile.BlockPriority;
 import org.apache.hadoop.hbase.io.hfile.Cacheable;
 import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
@@ -88,16 +90,21 @@ class BucketEntry implements HBaseReferenceCounted {
   private final long cachedTime = System.nanoTime();
 
   BucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
-    this(offset, length, accessCounter, inMemory, RefCnt.create(), ByteBuffAllocator.HEAP);
+    this(offset, length, accessCounter, inMemory, null, ByteBuffAllocator.HEAP);
   }
 
-  BucketEntry(long offset, int length, long accessCounter, boolean inMemory, RefCnt refCnt,
+  BucketEntry(long offset, int length, long accessCounter, boolean inMemory,
+      Function<BucketEntry, Recycler> createRecycler,
       ByteBuffAllocator allocator) {
     setOffset(offset);
     this.length = length;
     this.accessCounter = accessCounter;
     this.priority = inMemory ? BlockPriority.MEMORY : BlockPriority.MULTI;
-    this.refCnt = refCnt;
+    if (createRecycler == null) {
+      this.refCnt = RefCnt.create();
+    } else {
+      this.refCnt = RefCnt.create(createRecycler.apply(this));
+    }
     this.markedAsEvicted = new AtomicBoolean(false);
     this.allocator = allocator;
   }
@@ -166,19 +173,6 @@ class BucketEntry implements HBaseReferenceCounted {
   }
 
   /**
-   * Mark as evicted only when NO RPC references. Mainly used for eviction when cache size exceed
-   * the max acceptable size.
-   * @return true if we deallocate this entry successfully.
-   */
-  boolean markStaleAsEvicted() {
-    if (!markedAsEvicted.get() && this.refCnt() == 1) {
-      // The only reference was coming from backingMap, now release the stale entry.
-      return this.markAsEvicted();
-    }
-    return false;
-  }
-
-  /**
   * Check whether have some RPC patch referring this block. There're two case: <br>
   * 1. If current refCnt is greater than 1, there must be at least one referring RPC path; <br>
   * 2. If current refCnt is equal to 1 and the markedAtEvicted is true, the it means backingMap has
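
With markStaleAsEvicted removed above, stale entries are now evicted through
BucketCache#evictBucketEntryIfNoRpcReferenced, which removes only the matched key/entry pair
from backingMap under the offset write lock before releasing the cache's reference. A
condensed sketch of that guarded check-then-remove (simplified: the real code splits the
RPC-reference check and the locked removal across two methods):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    import java.util.function.Predicate;

    // Evict only the exact entry we examined, under the same lock readers use,
    // so a concurrent replacement under the same key is never torn down.
    final class GuardedEviction<K, V> {
      private final ConcurrentHashMap<K, V> backing = new ConcurrentHashMap<>();
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

      boolean evictIfUnreferenced(K key, V expected, Predicate<V> hasRpcRef) {
        if (hasRpcRef.test(expected)) {
          return false; // still referenced by an RPC: leave it alone
        }
        lock.writeLock().lock();
        try {
          // Two-argument remove: a replacement entry under the same key survives.
          return backing.remove(key, expected);
        } finally {
          lock.writeLock().unlock();
        }
      }
    }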
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java
index a478fcb..a5756f9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.client;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -43,6 +44,8 @@ import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.CachedBlock;
+import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HStore;
@@ -372,7 +375,11 @@ public class TestAvoidCellReferencesIntoShippedBlocks {
               CachedBlock next = iterator.next();
               BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
               cacheList.add(cacheKey);
-              cache.evictBlock(cacheKey);
+              /**
+               * There is only one Block referenced by RPC; here we evict the blocks that have
+               * no RPC reference.
+               */
+              evictBlock(cache, cacheKey);
             }
             try {
               Thread.sleep(1);
@@ -437,4 +444,20 @@ public class TestAvoidCellReferencesIntoShippedBlocks {
       table.close();
     }
   }
+
+  /**
+   * For {@link BucketCache}, we only evict a Block if it has no RPC reference.
+   */
+  private void evictBlock(BlockCache blockCache, BlockCacheKey blockCacheKey) {
+    assertTrue(blockCache instanceof CombinedBlockCache);
+    BlockCache[] blockCaches = blockCache.getBlockCaches();
+    for (BlockCache currentBlockCache : blockCaches) {
+      if (currentBlockCache instanceof BucketCache) {
+        ((BucketCache) currentBlockCache).evictBlockIfNoRpcReferenced(blockCacheKey);
+      } else {
+        currentBlockCache.evictBlock(blockCacheKey);
+      }
+    }
+
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
index 5f2d136..1b10fbd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -249,8 +249,26 @@ public class TestBucketCache {
     assertEquals(1, cache.getBlockCount());
     lock.writeLock().unlock();
     evictThread.join();
-    assertEquals(0, cache.getBlockCount());
-    assertEquals(cache.getCurrentSize(), 0L);
+    /**
+     * <pre>
+     * The asserts here before HBASE-21957 are:
+     * assertEquals(1L, cache.getBlockCount());
+     * assertTrue(cache.getCurrentSize() > 0L);
+     * assertTrue("We should have a block!", cache.iterator().hasNext());
+     *
+     * The asserts here after HBASE-21957 are:
+     * assertEquals(0, cache.getBlockCount());
+     * assertEquals(cache.getCurrentSize(), 0L);
+     *
+     * I think the asserts before HBASE-21957 are more reasonable, because
+     * {@link BucketCache#evictBlock} should only evict the {@link BucketEntry}
+     * it had seen, and a Block newly added after the {@link BucketEntry}
+     * it had seen should not be evicted.
+     * </pre>
+     */
+    assertEquals(1L, cache.getBlockCount());
+    assertTrue(cache.getCurrentSize() > 0L);
+    assertTrue("We should have a block!", cache.iterator().hasNext());
   }
 
   @Test
@@ -503,8 +521,8 @@ public class TestBucketCache {
         HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP);
     HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, 
ByteBuff.wrap(buf),
         HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP);
-    RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, ByteBuffAllocator.NONE);
-    RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, ByteBuffAllocator.NONE);
+    RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false);
+    RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false);
 
     assertFalse(cache.containsKey(key1));
     assertNull(cache.putIfAbsent(key1, re1));
@@ -551,11 +569,11 @@ public class TestBucketCache {
     BucketAllocator allocator = new BucketAllocator(availableSpace, null);
 
     BlockCacheKey key = new BlockCacheKey("dummy", 1L);
-    RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true, ByteBuffAllocator.NONE);
+    RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true);
 
     Assert.assertEquals(0, allocator.getUsedSize());
     try {
-      re.writeToCache(ioEngine, allocator, null);
+      re.writeToCache(ioEngine, allocator, null, null);
       Assert.fail();
     } catch (Exception e) {
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java
index eead815..fd083fd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java
@@ -25,11 +25,17 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
 import org.apache.hadoop.hbase.io.hfile.BlockType;
 import org.apache.hadoop.hbase.io.hfile.Cacheable;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock;
@@ -37,6 +43,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.WriterThread;
 import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.RefCnt;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.ClassRule;
@@ -65,6 +72,18 @@ public class TestBucketCacheRefCnt {
         queueSize, PERSISTENCE_PATH);
   }
 
+  private static MyBucketCache createMyBucketCache(int writerSize, int queueSize)
+      throws IOException {
+    return new MyBucketCache(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize,
+        queueSize, PERSISTENCE_PATH);
+  }
+
+  private static MyBucketCache2 createMyBucketCache2(int writerSize, int queueSize)
+      throws IOException {
+    return new MyBucketCache2(IO_ENGINE, CAPACITY_SIZE, BLOCK_SIZE, BLOCK_SIZE_ARRAY, writerSize,
+        queueSize, PERSISTENCE_PATH);
+  }
+
   private static HFileBlock createBlock(int offset, int size) {
     return createBlock(offset, size, ByteBuffAllocator.HEAP);
   }
@@ -133,8 +152,10 @@ public class TestBucketCacheRefCnt {
     }
   }
 
-  private void waitUntilFlushedToCache(BlockCacheKey key) throws InterruptedException {
-    while (!cache.backingMap.containsKey(key) || cache.ramCache.containsKey(key)) {
+  private static void waitUntilFlushedToCache(BucketCache bucketCache, BlockCacheKey blockCacheKey)
+      throws InterruptedException {
+    while (!bucketCache.backingMap.containsKey(blockCacheKey)
+        || bucketCache.ramCache.containsKey(blockCacheKey)) {
       Thread.sleep(100);
     }
     Thread.sleep(1000);
@@ -148,7 +169,7 @@ public class TestBucketCacheRefCnt {
       HFileBlock blk = createBlock(200, 1020, alloc);
       BlockCacheKey key = createKey("testHFile-00", 200);
       cache.cacheBlock(key, blk);
-      waitUntilFlushedToCache(key);
+      waitUntilFlushedToCache(cache, key);
       assertEquals(1, blk.refCnt());
 
       Cacheable block = cache.getBlock(key, false, false, false);
@@ -180,17 +201,18 @@ public class TestBucketCacheRefCnt {
       assertFalse(block.release());
       assertEquals(1, block.refCnt());
 
-      newBlock = cache.getBlock(key, false, false, false);
-      assertEquals(2, block.refCnt());
-      assertEquals(2, newBlock.refCnt());
+      /**
+       * The key was evicted from {@link BucketCache#backingMap} and {@link BucketCache#ramCache},
+       * so {@link BucketCache#getBlock} returns null.
+       */
+      Cacheable newestBlock = cache.getBlock(key, false, false, false);
+      assertNull(newestBlock);
+      assertEquals(1, block.refCnt());
       assertTrue(((HFileBlock) newBlock).getByteBuffAllocator() == alloc);
 
       // Release the block
-      assertFalse(block.release());
-      assertEquals(1, block.refCnt());
-
-      // Release the newBlock;
-      assertTrue(newBlock.release());
+      assertTrue(block.release());
+      assertEquals(0, block.refCnt());
       assertEquals(0, newBlock.refCnt());
     } finally {
       cache.shutdown();
@@ -247,7 +269,7 @@ public class TestBucketCacheRefCnt {
       HFileBlock blk = createBlock(200, 1020);
       BlockCacheKey key = createKey("testMarkStaleAsEvicted", 200);
       cache.cacheBlock(key, blk);
-      waitUntilFlushedToCache(key);
+      waitUntilFlushedToCache(cache, key);
       assertEquals(1, blk.refCnt());
       assertNotNull(cache.backingMap.get(key));
       assertEquals(1, cache.backingMap.get(key).refCnt());
@@ -260,7 +282,7 @@ public class TestBucketCacheRefCnt {
       assertEquals(2, be1.refCnt());
 
       // We've some RPC reference, so it won't have any effect.
-      assertFalse(be1.markStaleAsEvicted());
+      assertFalse(cache.evictBucketEntryIfNoRpcReferenced(key, be1));
       assertEquals(2, block1.refCnt());
       assertEquals(2, cache.backingMap.get(key).refCnt());
 
@@ -270,7 +292,7 @@ public class TestBucketCacheRefCnt {
       assertEquals(1, cache.backingMap.get(key).refCnt());
 
       // Mark the stale as evicted again, it'll do the de-allocation.
-      assertTrue(be1.markStaleAsEvicted());
+      assertTrue(cache.evictBucketEntryIfNoRpcReferenced(key, be1));
       assertEquals(0, block1.refCnt());
       assertNull(cache.backingMap.get(key));
       assertEquals(0, cache.size());
@@ -278,4 +300,445 @@ public class TestBucketCacheRefCnt {
       cache.shutdown();
     }
   }
+
+  /**
+   * <pre>
+   * This test is for HBASE-26281; it exercises a Block-replacing thread and a Block-getting
+   * thread executing concurrently. The thread sequence is:
+   * 1. Block1 was cached successfully, so the {@link RefCnt} of Block1 is 1.
+   * 2. Thread1 caches the same {@link BlockCacheKey} with Block2; it satisfies
+   *    {@link BlockCacheUtil#shouldReplaceExistingCacheBlock}, so Block2 would
+   *    replace Block1, but Thread1 stops before {@link BucketCache#cacheBlockWithWaitInternal}.
+   * 3. Thread2 invokes {@link BucketCache#getBlock} with the same {@link BlockCacheKey},
+   *    which returns Block1, so the {@link RefCnt} of Block1 is 2.
+   * 4. Thread1 continues caching Block2. In {@link BucketCache.WriterThread#putIntoBackingMap},
+   *    the old Block1 is freed directly even though its {@link RefCnt} is 2, but Block1 is still
+   *    used by Thread2, and the content of Block1 would be overwritten after it is freed, which
+   *    may cause a serious error.
+   * </pre>
+   * @throws Exception
+   */
+  @Test
+  public void testReplacingBlockAndGettingBlockConcurrently() throws Exception {
+    ByteBuffAllocator byteBuffAllocator =
+        ByteBuffAllocator.create(HBaseConfiguration.create(), true);
+    final MyBucketCache myBucketCache = createMyBucketCache(1, 1000);
+    try {
+      HFileBlock hfileBlock = createBlock(200, 1020, byteBuffAllocator);
+      final BlockCacheKey blockCacheKey = createKey("testTwoThreadConcurrent", 200);
+      myBucketCache.cacheBlock(blockCacheKey, hfileBlock);
+      waitUntilFlushedToCache(myBucketCache, blockCacheKey);
+      assertEquals(1, hfileBlock.refCnt());
+
+      assertTrue(!myBucketCache.ramCache.containsKey(blockCacheKey));
+      final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
+      Thread cacheBlockThread = new Thread(() -> {
+        try {
+          HFileBlock newHFileBlock = createBlock(200, 1020, byteBuffAllocator);
+          myBucketCache.cacheBlock(blockCacheKey, newHFileBlock);
+          waitUntilFlushedToCache(myBucketCache, blockCacheKey);
+
+        } catch (Throwable exception) {
+          exceptionRef.set(exception);
+        }
+      });
+      cacheBlockThread.setName(MyBucketCache.CACHE_BLOCK_THREAD_NAME);
+      cacheBlockThread.start();
+
+      String oldThreadName = Thread.currentThread().getName();
+      HFileBlock gotHFileBlock = null;
+      try {
+
+        Thread.currentThread().setName(MyBucketCache.GET_BLOCK_THREAD_NAME);
+
+        gotHFileBlock = (HFileBlock) (myBucketCache.getBlock(blockCacheKey, false, false, false));
+        assertTrue(gotHFileBlock.equals(hfileBlock));
+        assertTrue(gotHFileBlock.getByteBuffAllocator() == byteBuffAllocator);
+        assertEquals(2, gotHFileBlock.refCnt());
+        /**
+         * Release the second cyclicBarrier.await in
+         * {@link MyBucketCache#cacheBlockWithWaitInternal}
+         */
+        myBucketCache.cyclicBarrier.await();
+
+      } finally {
+        Thread.currentThread().setName(oldThreadName);
+      }
+
+      cacheBlockThread.join();
+      assertTrue(exceptionRef.get() == null);
+      assertEquals(1, gotHFileBlock.refCnt());
+      assertTrue(gotHFileBlock.equals(hfileBlock));
+      assertTrue(myBucketCache.overwiteByteBuff == null);
+      assertTrue(myBucketCache.freeBucketEntryCounter.get() == 0);
+
+      gotHFileBlock.release();
+      assertEquals(0, gotHFileBlock.refCnt());
+      assertTrue(myBucketCache.overwiteByteBuff != null);
+      assertTrue(myBucketCache.freeBucketEntryCounter.get() == 1);
+      assertTrue(myBucketCache.replaceCounter.get() == 1);
+      assertTrue(myBucketCache.blockEvictCounter.get() == 1);
+    } finally {
+      myBucketCache.shutdown();
+    }
+
+  }
+
+  /**
+   * <pre>
+   * This test is also for HBASE-26281; it exercises three threads, evicting a Block, caching
+   * a Block and getting a Block, executing concurrently.
+   * 1. Thread1 caches Block1, stopping after {@link BucketCache.WriterThread#putIntoBackingMap};
+   *    the {@link RefCnt} of Block1 is 1.
+   * 2. Thread2 invokes {@link BucketCache#evictBlock} with the same {@link BlockCacheKey},
+   *    but stops after {@link BucketCache#removeFromRamCache}.
+   * 3. Thread3 invokes {@link BucketCache#getBlock} with the same {@link BlockCacheKey},
+   *    which returns Block1, so the {@link RefCnt} of Block1 is 2.
+   * 4. Thread1 continues caching Block1, but finds that {@link BucketCache.RAMCache#remove}
+   *    returns false, so it invokes {@link BucketCache#blockEvicted} to free Block1
+   *    directly even though its {@link RefCnt} is 2 and Block1 is still used by Thread3.
+   * </pre>
+   */
+  @Test
+  public void testEvictingBlockCachingBlockGettingBlockConcurrently() throws Exception {
+    ByteBuffAllocator byteBuffAllocator =
+        ByteBuffAllocator.create(HBaseConfiguration.create(), true);
+    final MyBucketCache2 myBucketCache2 = createMyBucketCache2(1, 1000);
+    try {
+      final HFileBlock hfileBlock = createBlock(200, 1020, byteBuffAllocator);
+      final BlockCacheKey blockCacheKey = createKey("testThreeThreadConcurrent", 200);
+      final AtomicReference<Throwable> cacheBlockThreadExceptionRef =
+          new AtomicReference<Throwable>();
+      Thread cacheBlockThread = new Thread(() -> {
+        try {
+          myBucketCache2.cacheBlock(blockCacheKey, hfileBlock);
+          /**
+           * Wait for Caching Block completed.
+           */
+          myBucketCache2.writeThreadDoneCyclicBarrier.await();
+        } catch (Throwable exception) {
+          cacheBlockThreadExceptionRef.set(exception);
+        }
+      });
+      cacheBlockThread.setName(MyBucketCache2.CACHE_BLOCK_THREAD_NAME);
+      cacheBlockThread.start();
+
+      final AtomicReference<Throwable> evictBlockThreadExceptionRef =
+          new AtomicReference<Throwable>();
+      Thread evictBlockThread = new Thread(() -> {
+        try {
+          myBucketCache2.evictBlock(blockCacheKey);
+        } catch (Throwable exception) {
+          evictBlockThreadExceptionRef.set(exception);
+        }
+      });
+      evictBlockThread.setName(MyBucketCache2.EVICT_BLOCK_THREAD_NAME);
+      evictBlockThread.start();
+
+      String oldThreadName = Thread.currentThread().getName();
+      HFileBlock gotHFileBlock = null;
+      try {
+        Thread.currentThread().setName(MyBucketCache2.GET_BLOCK_THREAD_NAME);
+        gotHFileBlock = (HFileBlock) (myBucketCache2.getBlock(blockCacheKey, false, false, false));
+        assertTrue(gotHFileBlock.equals(hfileBlock));
+        assertTrue(gotHFileBlock.getByteBuffAllocator() == byteBuffAllocator);
+        assertEquals(2, gotHFileBlock.refCnt());
+        try {
+          /**
+           * Release the second cyclicBarrier.await in {@link MyBucketCache2#putIntoBackingMap}
+           * for {@link BucketCache.WriterThread}; getBlock has completed, so the
+           * {@link BucketCache.WriterThread} could continue.
+           */
+          myBucketCache2.putCyclicBarrier.await();
+        } catch (Throwable e) {
+          throw new RuntimeException(e);
+        }
+
+      } finally {
+        Thread.currentThread().setName(oldThreadName);
+      }
+
+      cacheBlockThread.join();
+      evictBlockThread.join();
+      assertTrue(cacheBlockThreadExceptionRef.get() == null);
+      assertTrue(evictBlockThreadExceptionRef.get() == null);
+
+      assertTrue(gotHFileBlock.equals(hfileBlock));
+      assertEquals(1, gotHFileBlock.refCnt());
+      assertTrue(myBucketCache2.overwiteByteBuff == null);
+      assertTrue(myBucketCache2.freeBucketEntryCounter.get() == 0);
+
+      gotHFileBlock.release();
+      assertEquals(0, gotHFileBlock.refCnt());
+      assertTrue(myBucketCache2.overwiteByteBuff != null);
+      assertTrue(myBucketCache2.freeBucketEntryCounter.get() == 1);
+      assertTrue(myBucketCache2.blockEvictCounter.get() == 1);
+    } finally {
+      myBucketCache2.shutdown();
+    }
+
+  }
+
+  static class MyBucketCache extends BucketCache {
+    private static final String GET_BLOCK_THREAD_NAME = "_getBlockThread";
+    private static final String CACHE_BLOCK_THREAD_NAME = "_cacheBlockThread";
+
+    private final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
+    private final AtomicInteger replaceCounter = new AtomicInteger(0);
+    private final AtomicInteger blockEvictCounter = new AtomicInteger(0);
+    private final AtomicInteger freeBucketEntryCounter = new AtomicInteger(0);
+    private ByteBuff overwiteByteBuff = null;
+
+    public MyBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
+        int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
+      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
+          persistencePath);
+    }
+
+    /**
+     * Simulate that the Block could be replaced.
+     */
+    @Override
+    protected boolean shouldReplaceExistingCacheBlock(BlockCacheKey cacheKey, Cacheable newBlock) {
+      replaceCounter.incrementAndGet();
+      return true;
+    }
+
+    @Override
+    public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
+        boolean updateCacheMetrics) {
+      if (Thread.currentThread().getName().equals(GET_BLOCK_THREAD_NAME)) {
+        /**
+         * Wait the first cyclicBarrier.await() in {@link MyBucketCache#cacheBlockWithWaitInternal},
+         * so the {@link BucketCache#getBlock} is executed after the {@link BucketEntry#isRpcRef}
+         * checking.
+         */
+        try {
+          cyclicBarrier.await();
+        } catch (Throwable e) {
+          throw new RuntimeException(e);
+        }
+      }
+      Cacheable result = super.getBlock(key, caching, repeat, updateCacheMetrics);
+      return result;
+    }
+
+    @Override
+    protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
+        boolean inMemory, boolean wait) {
+      if (Thread.currentThread().getName().equals(CACHE_BLOCK_THREAD_NAME)) {
+        /**
+         * Wait the cyclicBarrier.await() in {@link MyBucketCache#getBlock}
+         */
+        try {
+          cyclicBarrier.await();
+        } catch (Throwable e) {
+          throw new RuntimeException(e);
+        }
+      }
+      if (Thread.currentThread().getName().equals(CACHE_BLOCK_THREAD_NAME)) {
+        /**
+         * Wait the cyclicBarrier.await() in
+         * {@link TestBucketCacheRefCnt#testReplacingBlockAndGettingBlockConcurrently} for
+         * {@link MyBucketCache#getBlock} and Assert completed.
+         */
+        try {
+          cyclicBarrier.await();
+        } catch (Throwable e) {
+          throw new RuntimeException(e);
+        }
+      }
+      super.cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
+    }
+
+    @Override
+    void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry,
+        boolean decrementBlockNumber) {
+      blockEvictCounter.incrementAndGet();
+      super.blockEvicted(cacheKey, bucketEntry, decrementBlockNumber);
+    }
+
+    /**
+     * Overwrite the {@link BucketEntry} content with 0xff to simulate it being overwritten after
+     * the {@link BucketEntry} is freed.
+     */
+    @Override
+    void freeBucketEntry(BucketEntry bucketEntry) {
+      freeBucketEntryCounter.incrementAndGet();
+      super.freeBucketEntry(bucketEntry);
+      this.overwiteByteBuff = getOverwriteByteBuff(bucketEntry);
+      try {
+        this.ioEngine.write(this.overwiteByteBuff, bucketEntry.offset());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  static class MyBucketCache2 extends BucketCache {
+    private static final String GET_BLOCK_THREAD_NAME = "_getBlockThread";
+    private static final String CACHE_BLOCK_THREAD_NAME = "_cacheBlockThread";
+    private static final String EVICT_BLOCK_THREAD_NAME = "_evictBlockThread";
+
+    private final CyclicBarrier getCyclicBarrier = new CyclicBarrier(2);
+    private final CyclicBarrier evictCyclicBarrier = new CyclicBarrier(2);
+    private final CyclicBarrier putCyclicBarrier = new CyclicBarrier(2);
+    /**
+     * This is used for {@link BucketCache.WriterThread}, {@link #CACHE_BLOCK_THREAD_NAME} and
+     * {@link #EVICT_BLOCK_THREAD_NAME}, waiting for the caching of the block to complete.
+     */
+    private final CyclicBarrier writeThreadDoneCyclicBarrier = new CyclicBarrier(3);
+    private final AtomicInteger blockEvictCounter = new AtomicInteger(0);
+    private final AtomicInteger removeRamCounter = new AtomicInteger(0);
+    private final AtomicInteger freeBucketEntryCounter = new AtomicInteger(0);
+    private ByteBuff overwiteByteBuff = null;
+
+    public MyBucketCache2(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
+        int writerThreadNum, int writerQLen, String persistencePath) throws IOException {
+      super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
+          persistencePath);
+    }
+
+    @Override
+    protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
+      super.putIntoBackingMap(key, bucketEntry);
+      /**
+       * The {@link BucketCache.WriterThread} waits for evictCyclicBarrier.await before
+       * {@link MyBucketCache2#removeFromRamCache} for {@link #EVICT_BLOCK_THREAD_NAME}
+       */
+      try {
+        evictCyclicBarrier.await();
+      } catch (Throwable e) {
+        throw new RuntimeException(e);
+      }
+
+      /**
+       * Wait the cyclicBarrier.await() in
+       * {@link TestBucketCacheRefCnt#testEvictingBlockCachingBlockGettingBlockConcurrently} for
+       * {@link MyBucketCache#getBlock} and Assert completed.
+       */
+      try {
+        putCyclicBarrier.await();
+      } catch (Throwable e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    void doDrain(List<RAMQueueEntry> entries) throws InterruptedException {
+      super.doDrain(entries);
+      if (entries.size() > 0) {
+        /**
+         * Caching Block completed, release {@link #GET_BLOCK_THREAD_NAME} and
+         * {@link #EVICT_BLOCK_THREAD_NAME}.
+         */
+        try {
+          writeThreadDoneCyclicBarrier.await();
+        } catch (Throwable e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+    }
+
+    @Override
+    public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
+        boolean updateCacheMetrics) {
+      if (Thread.currentThread().getName().equals(GET_BLOCK_THREAD_NAME)) {
+        /**
+         * Wait for second getCyclicBarrier.await in {@link MyBucketCache2#removeFromRamCache} after
+         * {@link BucketCache#removeFromRamCache}.
+         */
+        try {
+          getCyclicBarrier.await();
+        } catch (Throwable e) {
+          throw new RuntimeException(e);
+        }
+      }
+      Cacheable result = super.getBlock(key, caching, repeat, updateCacheMetrics);
+      return result;
+    }
+
+    @Override
+    protected boolean removeFromRamCache(BlockCacheKey cacheKey) {
+      boolean firstTime = false;
+      if (Thread.currentThread().getName().equals(EVICT_BLOCK_THREAD_NAME)) {
+        int count = this.removeRamCounter.incrementAndGet();
+        firstTime = (count == 1);
+        if (firstTime) {
+          /**
+           * The {@link #EVICT_BLOCK_THREAD_NAME} waits for evictCyclicBarrier.await after
+           * {@link BucketCache#putIntoBackingMap}.
+           */
+          try {
+            evictCyclicBarrier.await();
+          } catch (Throwable e) {
+            throw new RuntimeException(e);
+          }
+        }
+      }
+      boolean result = super.removeFromRamCache(cacheKey);
+      if (Thread.currentThread().getName().equals(EVICT_BLOCK_THREAD_NAME)) {
+        if (firstTime) {
+          /**
+           * Wait for getCyclicBarrier.await before {@link BucketCache#getBlock}.
+           */
+          try {
+            getCyclicBarrier.await();
+          } catch (Throwable e) {
+            throw new RuntimeException(e);
+          }
+          /**
+           * Wait for Caching Block completed, after Caching Block completed, evictBlock could
+           * continue.
+           */
+          try {
+            writeThreadDoneCyclicBarrier.await();
+          } catch (Throwable e) {
+            throw new RuntimeException(e);
+          }
+        }
+      }
+
+      return result;
+    }
+
+    @Override
+    void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry,
+        boolean decrementBlockNumber) {
+      /**
+       * This is only invoked by {@link BucketCache.WriterThread}. {@link MyBucketCache2} creates
+       * only one {@link BucketCache.WriterThread}.
+       */
+      assertTrue(Thread.currentThread() == this.writerThreads[0]);
+
+      blockEvictCounter.incrementAndGet();
+      super.blockEvicted(cacheKey, bucketEntry, decrementBlockNumber);
+    }
+
+    /**
+     * Overwrite the {@link BucketEntry} content with 0xff to simulate it being overwritten after
+     * the {@link BucketEntry} is freed.
+     */
+    @Override
+    void freeBucketEntry(BucketEntry bucketEntry) {
+      freeBucketEntryCounter.incrementAndGet();
+      super.freeBucketEntry(bucketEntry);
+      this.overwiteByteBuff = getOverwriteByteBuff(bucketEntry);
+      try {
+        this.ioEngine.write(this.overwiteByteBuff, bucketEntry.offset());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  private static ByteBuff getOverwriteByteBuff(BucketEntry bucketEntry) {
+    int byteSize = bucketEntry.getLength();
+    byte[] data = new byte[byteSize];
+    Arrays.fill(data, (byte) 0xff);
+    return ByteBuff.wrap(ByteBuffer.wrap(data));
+  }
 }
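
The MyBucketCache and MyBucketCache2 doubles above reproduce the races deterministically by
parking specific, name-tagged threads on shared CyclicBarriers at the exact points where the
interleaving must flip. A minimal, standalone sketch of the technique (nothing HBase-specific
assumed):

    import java.util.concurrent.CyclicBarrier;

    // Two threads forced through a fixed interleaving with one barrier, the way
    // MyBucketCache sequences its cache-block and get-block threads.
    class InterleavingDemo {
      static final CyclicBarrier barrier = new CyclicBarrier(2);

      public static void main(String[] args) throws Exception {
        Thread writer = new Thread(() -> {
          try {
            System.out.println("writer: about to replace the entry");
            barrier.await(); // 1: let the reader do its lookup first
            barrier.await(); // 2: wait until the reader's asserts are done
            System.out.println("writer: replacement proceeds");
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
        });
        writer.start();

        barrier.await(); // 1: writer is now parked before the replacement
        System.out.println("reader: getBlock still sees the old entry");
        barrier.await(); // 2: release the writer
        writer.join();
      }
    }

Pinning the interleaving this way turns a once-in-a-million race into a test that fails
reliably whenever the locking regresses.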
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
index b631cf9..0ba7dea 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
@@ -73,7 +73,6 @@ public class TestBucketWriterThread {
   /**
   * Set up variables and get BucketCache and WriterThread into state where tests can manually
    * control the running of WriterThread and BucketCache is empty.
-   * @throws Exception
    */
   @Before
   public void setUp() throws Exception {
@@ -141,7 +140,7 @@ public class TestBucketWriterThread {
     RAMQueueEntry rqe = q.remove();
     RAMQueueEntry spiedRqe = Mockito.spy(rqe);
     Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe).
-      writeToCache(Mockito.any(), Mockito.any(), Mockito.any());
+        writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
     this.q.add(spiedRqe);
     doDrainOfOneEntry(bc, wt, q);
     // Cache disabled when ioes w/o ever healing.
@@ -163,7 +162,7 @@ public class TestBucketWriterThread {
     BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class);
     Mockito.doThrow(cfe).
       doReturn(mockedBucketEntry).
-      when(spiedRqe).writeToCache(Mockito.any(), Mockito.any(), Mockito.any());
+        when(spiedRqe).writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
     this.q.add(spiedRqe);
     doDrainOfOneEntry(bc, wt, q);
   }
@@ -172,7 +171,7 @@ public class TestBucketWriterThread {
       final BlockingQueue<RAMQueueEntry> q)
   throws InterruptedException {
     List<RAMQueueEntry> rqes = BucketCache.getRAMQueueEntries(q, new ArrayList<>(1));
-    wt.doDrain(rqes);
+    bc.doDrain(rqes);
     assertTrue(q.isEmpty());
     assertTrue(bc.ramCache.isEmpty());
     assertEquals(0, bc.heapSize());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
index d1b8f9a..97a5283 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.hbase.io.hfile.Cacheable;
 import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
 import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
 import org.apache.hadoop.hbase.nio.ByteBuff;
-import org.apache.hadoop.hbase.nio.RefCnt;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.Assert;
@@ -50,7 +49,7 @@ public class TestByteBufferIOEngine {
     private long off;
 
     MockBucketEntry(long offset, int length, ByteBuffAllocator allocator) {
-      super(offset & 0xFF00, length, 0, false, RefCnt.create(), allocator);
+      super(offset & 0xFF00, length, 0, false, null, allocator);
       this.off = offset;
     }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java
index eb9dca9..4b0801f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java
@@ -91,7 +91,7 @@ public class TestRAMCache {
     MockHFileBlock blk = new MockHFileBlock(BlockType.DATA, size, size, -1,
         ByteBuffer.wrap(byteArr, 0, size), HFileBlock.FILL_HEADER, -1, 52, -1,
         new HFileContextBuilder().build(), ByteBuffAllocator.HEAP);
-    RAMQueueEntry re = new RAMQueueEntry(key, blk, 1, false, ByteBuffAllocator.NONE);
+    RAMQueueEntry re = new RAMQueueEntry(key, blk, 1, false);
 
     Assert.assertNull(cache.putIfAbsent(key, re));
     Assert.assertEquals(cache.putIfAbsent(key, re), re);
