Repository: hbase
Updated Branches:
  refs/heads/branch-1.4 831fe57d0 -> 302edf447


HBASE-21504 If FIFOCompactionPolicy is enabled, a compaction may write an "empty"
HFile whose maxTimestamp is Long.MAX_VALUE. This kind of HFile will never be archived.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/302edf44
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/302edf44
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/302edf44

Branch: refs/heads/branch-1.4
Commit: 302edf447a49fb5c0fe347b5d43cf6874a3e5c5a
Parents: 831fe57
Author: huzheng <open...@gmail.com>
Authored: Thu Nov 29 18:00:18 2018 +0800
Committer: huzheng <open...@gmail.com>
Committed: Fri Nov 30 10:11:56 2018 +0800

----------------------------------------------------------------------
 .../compactions/FIFOCompactionPolicy.java       | 44 +++++++++------
 .../compactions/TestFIFOCompactionPolicy.java   | 57 ++++++++++++++++++++
 2 files changed, 86 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/302edf44/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java
index d339898..52e1b72 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java
@@ -97,17 +97,29 @@ public class FIFOCompactionPolicy extends 
ExploringCompactionPolicy {
     return hasExpiredStores(storeFiles);
   }
 
-  private  boolean hasExpiredStores(Collection<StoreFile> files) {
+  /**
+   * The FIFOCompactionPolicy only chooses TTL-expired HFiles as compaction candidates. So if
+   * all HFiles are TTL expired, the compaction will generate a new empty HFile whose max
+   * timestamp will be Long.MAX_VALUE. If not handled separately, that HFile would never be
+   * archived because its TTL would never expire. So we check the empty store file separately.
+   * (See HBASE-21504)
+   */
+  private boolean isEmptyStoreFile(StoreFile sf) {
+    return sf.getReader().getEntries() == 0;
+  }
+
+  private boolean hasExpiredStores(Collection<StoreFile> files) {
     long currentTime = EnvironmentEdgeManager.currentTime();
-    for(StoreFile sf: files){
+    for (StoreFile sf : files) {
+      if (isEmptyStoreFile(sf)) {
+        return true;
+      }
       // Check MIN_VERSIONS is in HStore removeUnneededFiles
       Long maxTs = sf.getReader().getMaxTimestamp();
       long maxTtl = storeConfigInfo.getStoreFileTtl();
-      if(maxTs == null 
-          || maxTtl == Long.MAX_VALUE
-          || (currentTime - maxTtl < maxTs)){
-        continue; 
-      } else{
+      if (maxTs == null || maxTtl == Long.MAX_VALUE || (currentTime - maxTtl < 
maxTs)) {
+        continue;
+      } else {
         return true;
       }
     }
@@ -115,18 +127,20 @@ public class FIFOCompactionPolicy extends 
ExploringCompactionPolicy {
   }
 
   private  Collection<StoreFile> getExpiredStores(Collection<StoreFile> files,
-    Collection<StoreFile> filesCompacting) {
+      Collection<StoreFile> filesCompacting) {
     long currentTime = EnvironmentEdgeManager.currentTime();
-    Collection<StoreFile> expiredStores = new ArrayList<StoreFile>();    
-    for(StoreFile sf: files){
+    Collection<StoreFile> expiredStores = new ArrayList<StoreFile>();
+    for (StoreFile sf : files) {
+      if (isEmptyStoreFile(sf)) {
+        expiredStores.add(sf);
+        continue;
+      }
       // Check MIN_VERSIONS is in HStore removeUnneededFiles
       Long maxTs = sf.getReader().getMaxTimestamp();
       long maxTtl = storeConfigInfo.getStoreFileTtl();
-      if(maxTs == null 
-          || maxTtl == Long.MAX_VALUE
-          || (currentTime - maxTtl < maxTs)){
-        continue; 
-      } else if(filesCompacting == null || filesCompacting.contains(sf) == 
false){
+      if (maxTs == null || maxTtl == Long.MAX_VALUE || (currentTime - maxTtl < 
maxTs)) {
+        continue;
+      } else if (filesCompacting == null || filesCompacting.contains(sf) == 
false) {
         expiredStores.add(sf);
       }
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/302edf44/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestFIFOCompactionPolicy.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestFIFOCompactionPolicy.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestFIFOCompactionPolicy.java
index d92ef32..41832e9 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestFIFOCompactionPolicy.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestFIFOCompactionPolicy.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
@@ -48,6 +49,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.TimeOffsetEnvironmentEdge;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -189,4 +191,59 @@ public class TestFIFOCompactionPolicy {
     desc.addFamily(colDesc);
     TEST_UTIL.getHBaseAdmin().createTable(desc);
   }
+
+  /**
+   * Unit test for HBASE-21504
+   */
+  @Test
+  public void testFIFOCompactionPolicyExpiredEmptyHFiles() throws Exception {
+    TableName tableName = 
TableName.valueOf("testFIFOCompactionPolicyExpiredEmptyHFiles");
+    HColumnDescriptor colDesc = new HColumnDescriptor(family).setTimeToLive(1);
+    HTableDescriptor desc = new HTableDescriptor(tableName)
+        .setValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
+          FIFOCompactionPolicy.class.getName())
+        .setValue(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
+          DisabledRegionSplitPolicy.class.getName())
+        .addFamily(colDesc);
+    Table table = TEST_UTIL.createTable(desc, null);
+    long ts = System.currentTimeMillis() - 10 * 1000;
+    Put put =
+        new Put(Bytes.toBytes("row1")).addColumn(family, qualifier, ts, 
Bytes.toBytes("value0"));
+    table.put(put);
+    TEST_UTIL.getHBaseAdmin().flush(tableName); // HFile-0
+    put = new Put(Bytes.toBytes("row2")).addColumn(family, qualifier, ts, 
Bytes.toBytes("value1"));
+    table.put(put);
+    TEST_UTIL.getHBaseAdmin().flush(tableName); // HFile-1
+    Store store = getStoreWithName(tableName);
+    Assert.assertNotNull(store);
+    Assert.assertEquals(2, store.getStorefilesCount());
+    TEST_UTIL.getHBaseAdmin().majorCompact(tableName);
+    for (int i = 0; i < 100; i++) {
+      if (store.getStorefilesCount() > 1) {
+        Thread.sleep(100);
+      } else {
+        break;
+      }
+    }
+    Assert.assertEquals(1, store.getStorefilesCount());
+    StoreFile sf = store.getStorefiles().iterator().next();
+    Assert.assertNotNull(sf);
+    Assert.assertEquals(0, sf.getReader().getEntries());
+    put = new Put(Bytes.toBytes("row3")).addColumn(family, qualifier, ts, 
Bytes.toBytes("value1"));
+    table.put(put);
+    TEST_UTIL.getHBaseAdmin().flush(tableName); // HFile-2
+    Assert.assertEquals(2, store.getStorefilesCount());
+    TEST_UTIL.getHBaseAdmin().majorCompact(tableName);
+    for (int i = 0; i < 100; i++) {
+      if (store.getStorefilesCount() > 1) {
+        Thread.sleep(100);
+      } else {
+        break;
+      }
+    }
+    Assert.assertEquals(1, store.getStorefilesCount());
+    sf = store.getStorefiles().iterator().next();
+    Assert.assertNotNull(sf);
+    Assert.assertEquals(0, sf.getReader().getEntries());
+  }
 }

Reply via email to