This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-1 by this push:
new 0a0a4743638 HBASE-26681 Introduce a little RAMBuffer for bucketcache
to reduce gc and improve throughput (#4043)
0a0a4743638 is described below
commit 0a0a474363893e4ce2e53923d1a3dbcdf3ffd331
Author: Yutong Xiao <[email protected]>
AuthorDate: Tue Aug 2 16:28:10 2022 +0800
HBASE-26681 Introduce a little RAMBuffer for bucketcache to reduce gc and
improve throughput (#4043)
Signed-off-by: Duo Zhang <[email protected]>
---
.../apache/hadoop/hbase/io/hfile/CacheConfig.java | 14 ++-
.../hbase/io/hfile/bucket/BufferedBucketCache.java | 131 +++++++++++++++++++++
2 files changed, 142 insertions(+), 3 deletions(-)
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
index 6efa9dbd2e9..61ffe32e493 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
+import org.apache.hadoop.hbase.io.hfile.bucket.BufferedBucketCache;
import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
@@ -151,6 +152,11 @@ public class CacheConfig {
private static final boolean DROP_BEHIND_CACHE_COMPACTION_DEFAULT = true;
public static final long DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD =
Long.MAX_VALUE;
+ /**
+ * Configuration key that enables the BucketCache RAM buffer ({@link BufferedBucketCache}); off by default.
+ */
+ static final String RAM_BUFFER_ENABLE = "hbase.bucketcache.rambuffer.enable";
+ static final boolean RAM_BUFFER_ENABLE_DEFAULT = false;
/**
* @deprecated use {@link CacheConfig#BLOCKCACHE_BLOCKSIZE_KEY} instead.
@@ -738,9 +744,11 @@ public class CacheConfig {
"hbase.bucketcache.ioengine.errors.tolerated.duration",
BucketCache.DEFAULT_ERROR_TOLERATION_DURATION);
// Bucket cache logs its stats on creation internal to the constructor.
- bucketCache = new BucketCache(bucketCacheIOEngineName,
- bucketCacheSize, blockSize, bucketSizes, writerThreads,
writerQueueLen, persistentPath,
- ioErrorsTolerationDuration, c);
+ bucketCache = c.getBoolean(RAM_BUFFER_ENABLE, RAM_BUFFER_ENABLE_DEFAULT)
?
+ new BufferedBucketCache(bucketCacheIOEngineName, bucketCacheSize,
blockSize, bucketSizes,
+ writerThreads, writerQueueLen, persistentPath,
ioErrorsTolerationDuration, c) :
+ new BucketCache(bucketCacheIOEngineName, bucketCacheSize, blockSize,
bucketSizes,
+ writerThreads, writerQueueLen, persistentPath,
ioErrorsTolerationDuration, c);
} catch (IOException ioex) {
LOG.error("Can't instantiate bucket cache", ioex); throw new
RuntimeException(ioex);
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BufferedBucketCache.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BufferedBucketCache.java
new file mode 100644
index 00000000000..bcc98503811
--- /dev/null
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BufferedBucketCache.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.Cacheable;
+
+/**
+ * A {@link BucketCache} with RAMBuffer to reduce GC pressure.
+ * <p>
+ * Blocks served by the parent {@link BucketCache} are additionally kept, by reference, in a small
+ * Guava {@link Cache} (the "RAM buffer"). {@link #getBlock} consults that buffer first and only
+ * falls through to the parent on a miss. How full the buffer may get before new blocks stop being
+ * admitted is governed by {@code ramBufferThreshold}, which a background task re-tunes once a
+ * minute from the observed removal rate of the buffer.
+ */
+@InterfaceAudience.Private
+public class BufferedBucketCache extends BucketCache {
+  private static final Log LOG = LogFactory.getLog(BufferedBucketCache.class);
+
+  /** Config key: RAM buffer capacity as a fraction of {@code capacity / blockSize}. */
+  static final String RAM_BUFFER_SIZE_RATIO = "hbase.bucketcache.rambuffer.ratio";
+  static final double RAM_BUFFER_SIZE_RATIO_DEFAULT = 0.1;
+  /** Config key: expire-after-access timeout of RAM buffer entries. */
+  static final String RAM_BUFFER_TIMEOUT = "hbase.bucketcache.rambuffer.timeout"; // in seconds.
+  static final int RAM_BUFFER_TIMEOUT_DEFAULT = 60;
+
+  /** Blocks recently returned by the parent cache, keyed by their block cache key. */
+  private final Cache<BlockCacheKey, Cacheable> ramBuffer;
+  /** Maximum number of entries {@link #ramBuffer} may hold. */
+  private final long maxBufferSize;
+
+  /** Running count of removal notifications received from {@link #ramBuffer}. */
+  private final AtomicLong ramBufferEvictCount = new AtomicLong(0);
+
+  /**
+   * Fraction of {@link #maxBufferSize} up to which new blocks are admitted into the buffer. It is
+   * left at the field default of 0, so nothing is admitted until an adjustment run lifts it to
+   * the 0.01 floor applied by {@link #updateRAMBufferThreshold(float)}.
+   * NOTE(review): confirm that this slow warm-up is intended.
+   */
+  private volatile float ramBufferThreshold;
+
+  /** Single daemon thread that periodically runs {@link RAMBufferAdjustThread}. */
+  private transient final ScheduledExecutorService scheduleThreadPool =
+    Executors.newScheduledThreadPool(1,
+      new ThreadFactoryBuilder().setNameFormat("RAMBufferAdjustExecutor").setDaemon(true).build());
+
+  /**
+   * Creates the bucket cache and its RAM buffer, and schedules the periodic threshold adjustment.
+   * All arguments are passed unchanged to the {@link BucketCache} constructor; {@code capacity},
+   * {@code blockSize} and {@code conf} are additionally used to size and configure the buffer.
+   * @throws IOException if the {@link BucketCache} constructor throws it
+   */
+  public BufferedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
+    int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
+    Configuration conf) throws IOException {
+    super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
+      persistencePath, ioErrorsTolerationDuration, conf);
+
+    // The buffer is bounded by entry count: a configured share of (capacity / blockSize).
+    maxBufferSize = (long) ((capacity / (double) blockSize) * conf.getDouble(RAM_BUFFER_SIZE_RATIO,
+      RAM_BUFFER_SIZE_RATIO_DEFAULT));
+    int timeout = conf.getInt(RAM_BUFFER_TIMEOUT, RAM_BUFFER_TIMEOUT_DEFAULT);
+    ramBuffer = CacheBuilder.newBuilder().
+      expireAfterAccess(timeout, TimeUnit.SECONDS).
+      maximumSize(maxBufferSize).
+      removalListener(new RemovalListener<BlockCacheKey, Cacheable>() {
+        @Override
+        public void onRemoval(RemovalNotification<BlockCacheKey, Cacheable> removalNotification) {
+          // Every removal notification is counted, whatever its cause.
+          ramBufferEvictCount.incrementAndGet();
+        }
+      }).build();
+
+    // Adjust the cache threshold every minute.
+    scheduleThreadPool.scheduleAtFixedRate(
+      new RAMBufferAdjustThread(this), 60, 60, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Returns the block for {@code key}, preferring the RAM buffer over the parent cache.
+   * <p>
+   * On a buffer hit the block is returned directly and, if {@code updateCacheMetrics} is set, a
+   * hit is recorded in the cache stats. On a buffer miss the call is forwarded with the same
+   * arguments to {@link BucketCache#getBlock}, and a non-null result is admitted into the buffer
+   * while the buffer holds fewer than {@code maxBufferSize * ramBufferThreshold} entries.
+   * @return the cached block, or {@code null} if neither the buffer nor the parent cache has it
+   */
+  @Override
+  public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
+    boolean updateCacheMetrics) {
+    Cacheable block = ramBuffer.getIfPresent(key);
+    if (block != null) {
+      if (updateCacheMetrics) {
+        this.getStats().hit(caching, key.isPrimary(), key.getBlockType());
+      }
+      return block;
+    }
+    block = super.getBlock(key, caching, repeat, updateCacheMetrics);
+    if (block != null && ramBuffer.size() < maxBufferSize * ramBufferThreshold) {
+      ramBuffer.put(key, block);
+    }
+    return block;
+  }
+
+  /**
+   * Shuts down the parent cache and releases what this subclass owns: the adjustment scheduler is
+   * stopped (interrupting a run that is currently sampling) and the RAM buffer is emptied so it
+   * no longer pins blocks or serves hits once the cache has been shut down.
+   */
+  @Override
+  public void shutdown() {
+    scheduleThreadPool.shutdownNow();
+    try {
+      super.shutdown();
+    } finally {
+      ramBuffer.invalidateAll();
+    }
+  }
+
+  /** Stores {@code newThreshold} clamped into the range [0.01, 1.0]. */
+  private void updateRAMBufferThreshold(final float newThreshold) {
+    this.ramBufferThreshold = Math.max(Math.min(newThreshold, 1.0f), 0.01f);
+  }
+
+  /**
+   * Periodic task that re-tunes the admission threshold of a {@link BufferedBucketCache}.
+   * Although declared as a {@link Thread}, it is never started as one here; the constructor of
+   * the enclosing class hands it to a scheduled executor, which only calls {@link #run()}.
+   */
+  static class RAMBufferAdjustThread extends Thread {
+    private final BufferedBucketCache bucketCache;
+
+    RAMBufferAdjustThread(BufferedBucketCache bucketCache) {
+      this.bucketCache = bucketCache;
+    }
+
+    /**
+     * Samples the removal counter over a 10 second window and derives removals per second.
+     * More than 100 per second shrinks the threshold by 10%, fewer than 10 per second grows it
+     * by 10%, anything in between leaves it unchanged.
+     */
+    @Override
+    public void run() {
+      long currentEvictCount = bucketCache.ramBufferEvictCount.get();
+      try {
+        Thread.sleep(10000);
+      } catch (InterruptedException e) {
+        LOG.info(e);
+        // Restore the interrupt status so the executor running this task can observe it.
+        Thread.currentThread().interrupt();
+        return;
+      }
+      long delta = (bucketCache.ramBufferEvictCount.get() - currentEvictCount) / 10;
+      if (delta > 100) {
+        bucketCache.updateRAMBufferThreshold((float) (bucketCache.ramBufferThreshold * 0.9));
+      } else if (delta < 10) {
+        bucketCache.updateRAMBufferThreshold((float) (bucketCache.ramBufferThreshold * 1.1));
+      }
+    }
+  }
+}