This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1 by this push:
     new 0a0a4743638 HBASE-26681 Introduce a little RAMBuffer for bucketcache 
to reduce gc and improve throughput (#4043)
0a0a4743638 is described below

commit 0a0a474363893e4ce2e53923d1a3dbcdf3ffd331
Author: Yutong Xiao <[email protected]>
AuthorDate: Tue Aug 2 16:28:10 2022 +0800

    HBASE-26681 Introduce a little RAMBuffer for bucketcache to reduce gc and 
improve throughput (#4043)
    
    Signed-off-by: Duo Zhang <[email protected]>
---
 .../apache/hadoop/hbase/io/hfile/CacheConfig.java  |  14 ++-
 .../hbase/io/hfile/bucket/BufferedBucketCache.java | 131 +++++++++++++++++++++
 2 files changed, 142 insertions(+), 3 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
index 6efa9dbd2e9..61ffe32e493 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
+import org.apache.hadoop.hbase.io.hfile.bucket.BufferedBucketCache;
 import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
@@ -151,6 +152,11 @@ public class CacheConfig {
   private static final boolean DROP_BEHIND_CACHE_COMPACTION_DEFAULT = true;
   public static final long DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD = 
Long.MAX_VALUE;
 
+  /**
+   * Parameter to turn on bucketcache ramBuffer.
+   */
+  static final String RAM_BUFFER_ENABLE = "hbase.bucketcache.rambuffer.enable";
+  static final boolean RAM_BUFFER_ENABLE_DEFAULT = false;
 
   /**
    * @deprecated use {@link CacheConfig#BLOCKCACHE_BLOCKSIZE_KEY} instead.
@@ -738,9 +744,11 @@ public class CacheConfig {
         "hbase.bucketcache.ioengine.errors.tolerated.duration",
         BucketCache.DEFAULT_ERROR_TOLERATION_DURATION);
       // Bucket cache logs its stats on creation internal to the constructor.
-      bucketCache = new BucketCache(bucketCacheIOEngineName,
-        bucketCacheSize, blockSize, bucketSizes, writerThreads, 
writerQueueLen, persistentPath,
-        ioErrorsTolerationDuration, c);
+      bucketCache = c.getBoolean(RAM_BUFFER_ENABLE, RAM_BUFFER_ENABLE_DEFAULT) 
?
+        new BufferedBucketCache(bucketCacheIOEngineName, bucketCacheSize, 
blockSize, bucketSizes,
+          writerThreads, writerQueueLen, persistentPath, 
ioErrorsTolerationDuration, c) :
+        new BucketCache(bucketCacheIOEngineName, bucketCacheSize, blockSize, 
bucketSizes,
+          writerThreads, writerQueueLen, persistentPath, 
ioErrorsTolerationDuration, c);
     } catch (IOException ioex) {
       LOG.error("Can't instantiate bucket cache", ioex); throw new 
RuntimeException(ioex);
     }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BufferedBucketCache.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BufferedBucketCache.java
new file mode 100644
index 00000000000..bcc98503811
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BufferedBucketCache.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.Cacheable;
+
+/**
+ * A {@link BucketCache} with RAMBuffer to reduce GC pressure.
+ * <p>
+ * {@link #getBlock(BlockCacheKey, boolean, boolean, boolean)} first looks the key up in an
+ * in-memory {@link Cache} (the "RAM buffer"). On a miss it delegates to the parent bucket cache
+ * and, if the buffer currently holds fewer than {@code maxBufferSize * ramBufferThreshold}
+ * entries, remembers the returned block in the buffer. Buffered entries expire a configurable
+ * number of seconds after their last access.
+ * <p>
+ * A background task samples the buffer's eviction rate once a minute and shrinks or grows
+ * {@code ramBufferThreshold} accordingly.
+ */
+@InterfaceAudience.Private
+public class BufferedBucketCache extends BucketCache {
+  private static final Log LOG = LogFactory.getLog(BufferedBucketCache.class);
+
+  /** Fraction of (capacity / blockSize) used as the maximum number of buffered blocks. */
+  static final String RAM_BUFFER_SIZE_RATIO = "hbase.bucketcache.rambuffer.ratio";
+  static final double RAM_BUFFER_SIZE_RATIO_DEFAULT = 0.1;
+  /** Expire-after-access timeout of buffered blocks, in seconds. */
+  static final String RAM_BUFFER_TIMEOUT = "hbase.bucketcache.rambuffer.timeout"; // in seconds.
+  static final int RAM_BUFFER_TIMEOUT_DEFAULT = 60;
+
+  /** Lower and upper bound that {@link #ramBufferThreshold} is clamped to. */
+  private static final float MIN_RAM_BUFFER_THRESHOLD = 0.01f;
+  private static final float MAX_RAM_BUFFER_THRESHOLD = 1.0f;
+
+  /** Delay before the first adjustment and period between adjustments, in seconds. */
+  private static final long ADJUST_PERIOD_SECONDS = 60;
+  /** Length of the window over which the adjuster measures evictions, in seconds. */
+  private static final long SAMPLE_WINDOW_SECONDS = 10;
+  /** Evictions per second above which the threshold is lowered. */
+  private static final long SHRINK_EVICTIONS_PER_SECOND = 100;
+  /** Evictions per second below which the threshold is raised. */
+  private static final long GROW_EVICTIONS_PER_SECOND = 10;
+  private static final double SHRINK_FACTOR = 0.9;
+  private static final double GROW_FACTOR = 1.1;
+
+  private final Cache<BlockCacheKey, Cacheable> ramBuffer;
+  /** Maximum number of entries (blocks) the RAM buffer may hold. */
+  private final long maxBufferSize;
+
+  /** Number of entries the RAM buffer dropped on its own (expiry or size pressure). */
+  private final AtomicLong ramBufferEvictCount = new AtomicLong(0);
+
+  /**
+   * Fraction of {@link #maxBufferSize} up to which new blocks are admitted. Starts at the lower
+   * clamp bound: left at the field default of 0 the admission check in getBlock could never
+   * pass, so the buffer would stay empty until the first adjustment tick.
+   */
+  private volatile float ramBufferThreshold = MIN_RAM_BUFFER_THRESHOLD;
+
+  private transient final ScheduledExecutorService scheduleThreadPool =
+    Executors.newScheduledThreadPool(1,
+      new ThreadFactoryBuilder().setNameFormat("RAMBufferAdjustExecutor").setDaemon(true)
+        .build());
+
+  /**
+   * Creates the bucket cache and its RAM buffer, and schedules the periodic threshold adjuster.
+   * All arguments are passed unchanged to the {@link BucketCache} constructor; {@code capacity},
+   * {@code blockSize} and {@code conf} are additionally used to size and configure the buffer.
+   * @throws IOException if the parent {@link BucketCache} constructor fails
+   */
+  public BufferedBucketCache(String ioEngineName, long capacity, int blockSize,
+      int[] bucketSizes, int writerThreadNum, int writerQLen, String persistencePath,
+      int ioErrorsTolerationDuration, Configuration conf) throws IOException {
+    super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
+      persistencePath, ioErrorsTolerationDuration, conf);
+
+    double sizeRatio = conf.getDouble(RAM_BUFFER_SIZE_RATIO, RAM_BUFFER_SIZE_RATIO_DEFAULT);
+    maxBufferSize = (long) ((capacity / (double) blockSize) * sizeRatio);
+    int timeout = conf.getInt(RAM_BUFFER_TIMEOUT, RAM_BUFFER_TIMEOUT_DEFAULT);
+    ramBuffer = CacheBuilder.newBuilder().
+      expireAfterAccess(timeout, TimeUnit.SECONDS).
+      maximumSize(maxBufferSize).
+      removalListener(new RemovalListener<BlockCacheKey, Cacheable>() {
+        @Override
+        public void onRemoval(RemovalNotification<BlockCacheKey, Cacheable> notification) {
+          // Count only automatic removals. Replacements caused by two readers racing to
+          // put the same key, and explicit invalidation, say nothing about buffer pressure.
+          if (notification.wasEvicted()) {
+            ramBufferEvictCount.incrementAndGet();
+          }
+        }
+      }).build();
+
+    // Adjust the cache threshold every minute.
+    scheduleThreadPool.scheduleAtFixedRate(new RAMBufferAdjustThread(this),
+      ADJUST_PERIOD_SECONDS, ADJUST_PERIOD_SECONDS, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Returns the block for {@code key}, consulting the RAM buffer before the bucket cache.
+   * A buffer hit is recorded in the cache stats when {@code updateCacheMetrics} is set; a
+   * buffer miss is delegated to {@link BucketCache}, which receives all arguments unchanged.
+   * @return the cached block, or {@code null} if neither the buffer nor the bucket cache has it
+   */
+  @Override
+  public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
+    boolean updateCacheMetrics) {
+    Cacheable block = ramBuffer.getIfPresent(key);
+    if (block != null) {
+      if (updateCacheMetrics) {
+        this.getStats().hit(caching, key.isPrimary(), key.getBlockType());
+      }
+      return block;
+    }
+    block = super.getBlock(key, caching, repeat, updateCacheMetrics);
+    // Admit the block only while the buffer is below its current, adaptive fill limit.
+    if (block != null && ramBuffer.size() < maxBufferSize * ramBufferThreshold) {
+      ramBuffer.put(key, block);
+    }
+    return block;
+  }
+
+  /**
+   * Stops the threshold adjuster, shuts down the underlying bucket cache and drops every
+   * buffered block, so that no scheduler thread or buffered block outlives the cache.
+   */
+  @Override
+  public void shutdown() {
+    scheduleThreadPool.shutdownNow();
+    super.shutdown();
+    ramBuffer.invalidateAll();
+  }
+
+  /** Stores {@code newThreshold} clamped to [MIN_RAM_BUFFER_THRESHOLD, MAX_RAM_BUFFER_THRESHOLD]. */
+  private void updateRAMBufferThreshold(final float newThreshold) {
+    this.ramBufferThreshold =
+      Math.max(Math.min(newThreshold, MAX_RAM_BUFFER_THRESHOLD), MIN_RAM_BUFFER_THRESHOLD);
+  }
+
+  /**
+   * Periodic task that tunes {@link BufferedBucketCache#ramBufferThreshold}. Each run measures
+   * the evictions during a {@value BufferedBucketCache#SAMPLE_WINDOW_SECONDS} second window,
+   * then lowers the threshold by 10% when the rate is high and raises it by 10% when it is low.
+   * It is handed to a scheduled executor and executed through {@link #run()}.
+   */
+  static class RAMBufferAdjustThread extends Thread {
+    private final BufferedBucketCache bucketCache;
+
+    RAMBufferAdjustThread(BufferedBucketCache bucketCache) {
+      this.bucketCache = bucketCache;
+    }
+
+    @Override
+    public void run() {
+      long evictCountBefore = bucketCache.ramBufferEvictCount.get();
+      try {
+        Thread.sleep(TimeUnit.SECONDS.toMillis(SAMPLE_WINDOW_SECONDS));
+      } catch (InterruptedException e) {
+        LOG.info("Interrupted while sampling RAMBuffer evictions, skipping adjustment", e);
+        // Restore the flag so the executor's worker thread can observe the interruption.
+        Thread.currentThread().interrupt();
+        return;
+      }
+      long evictionsPerSecond =
+        (bucketCache.ramBufferEvictCount.get() - evictCountBefore) / SAMPLE_WINDOW_SECONDS;
+      float threshold = bucketCache.ramBufferThreshold;
+      if (evictionsPerSecond > SHRINK_EVICTIONS_PER_SECOND) {
+        bucketCache.updateRAMBufferThreshold((float) (threshold * SHRINK_FACTOR));
+      } else if (evictionsPerSecond < GROW_EVICTIONS_PER_SECOND) {
+        bucketCache.updateRAMBufferThreshold((float) (threshold * GROW_FACTOR));
+      }
+    }
+  }
+}

Reply via email to