wchevreuil commented on code in PR #5080:
URL: https://github.com/apache/hbase/pull/5080#discussion_r1126210312


##########
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java:
##########
@@ -469,6 +472,9 @@ public void cacheBlockWithWait(BlockCacheKey cacheKey, 
Cacheable cachedItem, boo
       } else {
         cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
       }
+      if (ioEngine.isPersistent()){
+        setCacheDirty(true);
+      }

Review Comment:
   At this point we are not guaranteed to have added the block, as it's just 
sent to the writer threads. We should be doing this 
[here](https://github.com/Kota-SH/hbase/blob/hbase-27686/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java#L1036)
 instead.



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCachePersister.java:
##########
@@ -0,0 +1,31 @@
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+
[email protected]
+public class BucketCachePersister extends Thread {
+  private final BucketCache cache;
+  private final long intervalMillis;
+  private static final Logger LOG = 
LoggerFactory.getLogger(BucketCachePersister.class);
+
+  public BucketCachePersister(BucketCache cache, long intervalMillis) {
+    this.cache = cache;
+    this.intervalMillis = intervalMillis;
+  }
+
+  public void run() {
+    while(true) {
+      try {
+        Thread.sleep(intervalMillis);
+        if(cache.isDirty()){
+          cache.persistToFile();

Review Comment:
   Should set cacheDirty to false.



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java:
##########
@@ -314,6 +315,8 @@ public BucketCache(String ioEngineName, long capacity, int 
blockSize, int[] buck
     this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity);
 
     if (ioEngine.isPersistent() && persistencePath != null) {
+      BucketCachePersister cachePersister = new BucketCachePersister(this, 
1000);

Review Comment:
   Make this interval configurable. Maybe worth decreasing this default.



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java:
##########
@@ -178,6 +178,7 @@ public class BucketCache implements BlockCache, HeapSize {
   private final BucketCacheStats cacheStats = new BucketCacheStats();
 
   private final String persistencePath;
+  boolean isCacheDirty;

Review Comment:
   Use AtomicBoolean?



##########
hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java:
##########
@@ -0,0 +1,115 @@
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
+import org.apache.hadoop.hbase.StartTestingClusterOption;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.TestPrefetchRSClose;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.File;
+import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@Category({ IOTests.class, LargeTests.class })
+public class TestBucketCachePersister {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestBucketCachePersister.class);
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestBucketCachePersister.class);
+
+  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+
+  private Configuration conf;
+  Path testDir;
+  MiniZooKeeperCluster zkCluster;
+  SingleProcessHBaseCluster cluster;
+  StartTestingClusterOption option =
+    StartTestingClusterOption.builder().numRegionServers(2).build();
+
+  @Before
+  public void setup() throws Exception {
+    conf = TEST_UTIL.getConfiguration();
+    testDir = TEST_UTIL.getDataTestDir();
+    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
+
+    conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
+    conf.set(BUCKET_CACHE_IOENGINE_KEY, "file:" + testDir + "/bucket.cache");
+    conf.setInt("hbase.bucketcache.size", 400);
+    conf.set("hbase.bucketcache.persistent.path", testDir + 
"/bucket.persistence");
+    conf.set(CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY, testDir + 
"/prefetch.persistence");
+    zkCluster = TEST_UTIL.startMiniZKCluster();
+    cluster = TEST_UTIL.startMiniHBaseCluster(option);
+    assertEquals(2, cluster.getRegionServerThreads().size());
+    cluster.setConf(conf);
+  }
+
+  @Test
+  public void testPersistenceRSCrash() throws Exception {
+    // Write to table and flush
+    TableName tableName = TableName.valueOf("table1");
+    byte[] row0 = Bytes.toBytes("row1");
+    byte[] row1 = Bytes.toBytes("row2");
+    byte[] family = Bytes.toBytes("family");
+    byte[] qf1 = Bytes.toBytes("qf1");
+    byte[] qf2 = Bytes.toBytes("qf2");
+    byte[] value1 = Bytes.toBytes("value1");
+    byte[] value2 = Bytes.toBytes("value2");
+
+    TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build();
+    Table table = TEST_UTIL.createTable(td, null);
+    try {
+      // put data
+      Put put0 = new Put(row0);
+      put0.addColumn(family, qf1, 1, value1);
+      table.put(put0);
+      Put put1 = new Put(row1);
+      put1.addColumn(family, qf2, 1, value2);
+      table.put(put1);
+      TEST_UTIL.flush(tableName);
+      Get get = new Get(row0);
+      get.addColumn(family, qf1);
+      cluster.getRegions(tableName).get(0).get(get);
+    } finally {
+      Thread.sleep(1000);
+    }
+    // Kill the RS
+    cluster.killRegionServer(cluster.getRegionServer(0).getServerName());

Review Comment:
   I had noticed that even when using this killRegionServer, we still go 
through the normal shutdown path for block cache, so we can't really rely on 
this. But since the main logic we want to test is that updates happen while the 
RS is running, we could check for the file existence without killing the RS. 
Also, can we validate the contents of the persisted file? At this point, I 
think we should certify that all the cache entries are there. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to