This is an automated email from the ASF dual-hosted git repository.

rajeshbabu pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new a3e0182cef3 HBASE-27365 Minimise block addition failures due to no 
space in bucket cache writers queue by introducing wait time
a3e0182cef3 is described below

commit a3e0182cef3ddadb2c5ac8c461f9f38cc8adec4f
Author: Rajeshbabu Chintaguntla <[email protected]>
AuthorDate: Wed Oct 5 03:32:49 2022 +0530

    HBASE-27365 Minimise block addition failures due to no space in bucket 
cache writers queue by introducing wait time
---
 .../apache/hadoop/hbase/io/hfile/BlockCache.java   | 13 ++++
 .../hadoop/hbase/io/hfile/CombinedBlockCache.java  |  8 ++-
 .../hadoop/hbase/io/hfile/HFileReaderImpl.java     |  6 +-
 .../hadoop/hbase/io/hfile/HFileWriterImpl.java     |  2 +-
 .../hadoop/hbase/io/hfile/bucket/BucketCache.java  | 29 ++++++---
 .../hbase/io/hfile/bucket/TestBucketCache.java     | 69 +++++++++++++++++++---
 .../io/hfile/bucket/TestBucketCacheRefCnt.java     |  2 -
 hbase-server/src/test/resources/hbase-site.xml     |  5 ++
 8 files changed, 111 insertions(+), 23 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
index 52eaa30317a..1caa1b76f5f 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
@@ -34,6 +34,19 @@ public interface BlockCache extends Iterable<CachedBlock> {
    */
   void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory);
 
+  /**
+   * Add block to cache. By default {@code waitWhenCache} is ignored and the call is delegated.
+   * @param cacheKey      The block's cache key.
+   * @param buf           The block contents wrapped in a ByteBuffer.
+   * @param inMemory      Whether block should be treated as in-memory
+   * @param waitWhenCache Whether to wait, if needed, for the cache to accept the block; mainly
+   *                      relevant when BucketCache is configured.
+   */
+  default void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean 
inMemory,
+    boolean waitWhenCache) {
+    cacheBlock(cacheKey, buf, inMemory);
+  }
+
   /**
    * Add block to cache (defaults to not in-memory).
    * @param cacheKey The block's cache key.
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
index 358ee2057a2..9e02cff873e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
@@ -53,11 +53,17 @@ public class CombinedBlockCache implements 
ResizableBlockCache, HeapSize {
 
   @Override
   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean 
inMemory) {
+    cacheBlock(cacheKey, buf, inMemory, false);
+  }
+
+  @Override
+  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean 
inMemory,
+    boolean waitWhenCache) {
     boolean metaBlock = buf.getBlockType().getCategory() != 
BlockType.BlockCategory.DATA;
     if (metaBlock) {
       l1Cache.cacheBlock(cacheKey, buf, inMemory);
     } else {
-      l2Cache.cacheBlock(cacheKey, buf, inMemory);
+      l2Cache.cacheBlock(cacheKey, buf, inMemory, waitWhenCache);
     }
   }
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index 2cf1c3df677..5c2e42e6b8a 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -1328,7 +1328,7 @@ public abstract class HFileReaderImpl implements 
HFile.Reader, Configurable {
           // Cache the block if necessary
           cacheConf.getBlockCache().ifPresent(cache -> {
             if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
-              cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory());
+              cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory(), 
cacheOnly);
             }
           });
 
@@ -1341,8 +1341,9 @@ public abstract class HFileReaderImpl implements 
HFile.Reader, Configurable {
         // Cache the block if necessary
         cacheConf.getBlockCache().ifPresent(cache -> {
           if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
+            // Using the wait on cache during compaction and prefetching.
             cache.cacheBlock(cacheKey, cacheCompressed ? hfileBlock : unpacked,
-              cacheConf.isInMemory());
+              cacheConf.isInMemory(), cacheOnly);
           }
         });
         if (unpacked != hfileBlock) {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
index af8851d0d91..bdedc3ceb90 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
@@ -541,7 +541,7 @@ public class HFileWriterImpl implements HFile.Writer {
       HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
       try {
         cache.cacheBlock(new BlockCacheKey(name, offset, true, 
cacheFormatBlock.getBlockType()),
-          cacheFormatBlock);
+          cacheFormatBlock, cacheConf.isInMemory(), true);
       } finally {
         // refCnt will auto increase when block add to Cache, see 
RAMCache#putIfAbsent
         cacheFormatBlock.release();
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 73f2bc71c31..8d097141785 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -167,13 +167,6 @@ public class BucketCache implements BlockCache, HeapSize {
 
   private static final int DEFAULT_CACHE_WAIT_TIME = 50;
 
-  /**
-   * Used in tests. If this flag is false and the cache speed is very fast, 
bucket cache will skip
-   * some blocks when caching. If the flag is true, we will wait until blocks 
are flushed to
-   * IOEngine.
-   */
-  boolean wait_when_cache = false;
-
   private final BucketCacheStats cacheStats = new BucketCacheStats();
 
   private final String persistencePath;
@@ -239,6 +232,10 @@ public class BucketCache implements BlockCache, HeapSize {
     "hbase.bucketcache.persistent.file.integrity.check.algorithm";
   private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";
 
+  private static final String QUEUE_ADDITION_WAIT_TIME =
+    "hbase.bucketcache.queue.addition.waittime";
+  private static final long DEFAULT_QUEUE_ADDITION_WAIT_TIME = 0;
+  private long queueAdditionWaitTime;
   /**
    * Use {@link java.security.MessageDigest} class's encryption algorithms to 
check persistent file
    * integrity, default algorithm is MD5
@@ -273,6 +270,8 @@ public class BucketCache implements BlockCache, HeapSize {
     this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, 
DEFAULT_SINGLE_FACTOR);
     this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, 
DEFAULT_MULTI_FACTOR);
     this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, 
DEFAULT_MEMORY_FACTOR);
+    this.queueAdditionWaitTime =
+      conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME);
 
     sanityCheckConfigs();
 
@@ -415,7 +414,19 @@ public class BucketCache implements BlockCache, HeapSize {
    */
   @Override
   public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean 
inMemory) {
-    cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache);
+    cacheBlockWithWait(cacheKey, cachedItem, inMemory, false);
+  }
+
+  /**
+   * Cache the block with the specified name and buffer.
+   * @param cacheKey   block's cache key
+   * @param cachedItem block buffer
+   * @param inMemory   if block is in-memory
+   */
+  @Override
+  public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean 
inMemory,
+    boolean waitWhenCache) {
+    cacheBlockWithWait(cacheKey, cachedItem, inMemory, waitWhenCache && 
queueAdditionWaitTime > 0);
   }
 
   /**
@@ -471,7 +482,7 @@ public class BucketCache implements BlockCache, HeapSize {
     boolean successfulAddition = false;
     if (wait) {
       try {
-        successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, 
TimeUnit.MILLISECONDS);
+        successfulAddition = bq.offer(re, queueAdditionWaitTime, 
TimeUnit.MILLISECONDS);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
index 6f7d4e22e2b..4a53a0212ae 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -120,7 +121,6 @@ public class TestBucketCache {
       int writerThreads, int writerQLen, String persistencePath) throws 
IOException {
       super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, 
writerQLen,
         persistencePath);
-      super.wait_when_cache = true;
     }
 
     @Override
@@ -242,8 +242,8 @@ public class TestBucketCache {
   // BucketCache.cacheBlock is async, it first adds block to ramCache and 
writeQueue, then writer
   // threads will flush it to the bucket and put reference entry in backingMap.
   private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, 
BlockCacheKey cacheKey,
-    Cacheable block) throws InterruptedException {
-    cache.cacheBlock(cacheKey, block);
+    Cacheable block, boolean waitWhenCache) throws InterruptedException {
+    cache.cacheBlock(cacheKey, block, false, waitWhenCache);
     waitUntilFlushedToBucket(cache, cacheKey);
   }
 
@@ -251,7 +251,7 @@ public class TestBucketCache {
   public void testMemoryLeak() throws Exception {
     final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L);
     cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
-      new CacheTestUtils.ByteArrayCacheable(new byte[10]));
+      new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
     long lockId = cache.backingMap.get(cacheKey).offset();
     ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId);
     lock.writeLock().lock();
@@ -266,7 +266,7 @@ public class TestBucketCache {
     cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true, 
true);
     assertEquals(0, cache.getBlockCount());
     cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
-      new CacheTestUtils.ByteArrayCacheable(new byte[10]));
+      new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
     assertEquals(1, cache.getBlockCount());
     lock.writeLock().unlock();
     evictThread.join();
@@ -312,7 +312,8 @@ public class TestBucketCache {
       bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
     }
     for (HFileBlockPair block : blocks) {
-      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), 
block.getBlock());
+      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), 
block.getBlock(),
+        false);
     }
     usedSize = bucketCache.getAllocator().getUsedSize();
     assertNotEquals(0, usedSize);
@@ -691,7 +692,7 @@ public class TestBucketCache {
 
       for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
         cacheAndWaitUntilFlushedToBucket(bucketCache, 
hfileBlockPair.getBlockName(),
-          hfileBlockPair.getBlock());
+          hfileBlockPair.getBlock(), false);
       }
       usedByteSize = bucketCache.getAllocator().getUsedSize();
       assertNotEquals(0, usedByteSize);
@@ -716,4 +717,58 @@ public class TestBucketCache {
     }
   }
 
+  @Test
+  public void testBlockAdditionWaitWhenCache() throws Exception {
+    try {
+      final Path dataTestDir = createAndGetTestDir();
+
+      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
+      String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";
+
+      BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, 
constructedBlockSize,
+        constructedBlockSizes, 1, 1, persistencePath);
+      long usedByteSize = bucketCache.getAllocator().getUsedSize();
+      assertEquals(0, usedByteSize);
+
+      HFileBlockPair[] hfileBlockPairs =
+        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 10);
+      // Add blocks
+      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
+        bucketCache.cacheBlock(hfileBlockPair.getBlockName(), 
hfileBlockPair.getBlock(), false,
+          true);
+      }
+
+      // Max wait for 10 seconds, consumed in 100 ms steps by the polling loop below.
+      long timeout = 10000;
+      // Wait for blocks size to match the number of blocks.
+      while (bucketCache.backingMap.size() != 10) {
+        if (timeout <= 0) break;
+        Threads.sleep(100);
+        timeout -= 100;
+      }
+      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
+        
assertTrue(bucketCache.backingMap.containsKey(hfileBlockPair.getBlockName()));
+      }
+      usedByteSize = bucketCache.getAllocator().getUsedSize();
+      assertNotEquals(0, usedByteSize);
+      // persist cache to file
+      bucketCache.shutdown();
+      assertTrue(new File(persistencePath).exists());
+
+      // restore cache from file
+      bucketCache = new BucketCache(ioEngineName, capacitySize, 
constructedBlockSize,
+        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
+      assertFalse(new File(persistencePath).exists());
+      assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());
+
+      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
+        BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName();
+        bucketCache.evictBlock(blockCacheKey);
+      }
+      assertEquals(0, bucketCache.getAllocator().getUsedSize());
+      assertEquals(0, bucketCache.backingMap.size());
+    } finally {
+      HBASE_TESTING_UTILITY.cleanupTestDir();
+    }
+  }
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java
index 44a398bda44..a3f291b7949 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java
@@ -110,8 +110,6 @@ public class TestBucketCacheRefCnt {
   // Flakey TestBucketCacheRefCnt.testBlockInRAMCache:121 expected:<3> but 
was:<2>
   public void testBlockInRAMCache() throws IOException {
     cache = create(1, 1000);
-    // Set this to true;
-    cache.wait_when_cache = true;
     disableWriter();
     final String prefix = "testBlockInRamCache";
     try {
diff --git a/hbase-server/src/test/resources/hbase-site.xml 
b/hbase-server/src/test/resources/hbase-site.xml
index 0b6f1d59a0e..23a84335d94 100644
--- a/hbase-server/src/test/resources/hbase-site.xml
+++ b/hbase-server/src/test/resources/hbase-site.xml
@@ -277,4 +277,9 @@
     <value>3</value>
     <description>Default is unbounded</description>
   </property>
+  <property>
+    <name>hbase.bucketcache.queue.addition.waittime</name>
+    <value>1000</value>
+    <description>Default is 0</description>
+  </property>
 </configuration>

Reply via email to