Repository: hbase
Updated Branches:
  refs/heads/master 78da0e366 -> b336da925


HBASE-20727 Persist FlushedSequenceId to speed up WAL split after cluster 
restart


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b336da92
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b336da92
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b336da92

Branch: refs/heads/master
Commit: b336da925ac5c5ee3565112de4b808fe2eed08a2
Parents: 78da0e3
Author: Allan Yang <allan...@apache.org>
Authored: Tue Jun 19 09:45:47 2018 +0800
Committer: Allan Yang <allan...@163.com>
Committed: Tue Jun 19 09:45:47 2018 +0800

----------------------------------------------------------------------
 .../src/main/protobuf/HBase.proto               |  15 ++
 .../org/apache/hadoop/hbase/master/HMaster.java |   8 +
 .../hadoop/hbase/master/ServerManager.java      | 213 ++++++++++++++++++-
 .../apache/hadoop/hbase/master/TestMaster.java  |  33 +++
 4 files changed, 268 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/b336da92/hbase-protocol-shaded/src/main/protobuf/HBase.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/HBase.proto 
b/hbase-protocol-shaded/src/main/protobuf/HBase.proto
index 29067f1..0af2ffd 100644
--- a/hbase-protocol-shaded/src/main/protobuf/HBase.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/HBase.proto
@@ -252,4 +252,19 @@ message CacheEvictionStats {
   optional int64 bytes_evicted = 2;
   optional int64 max_cache_size = 3;
   repeated RegionExceptionMessage exception = 4;
+}
+
+message FlushedStoreSequenceId {
+  required bytes family = 1;
+  required uint64 seqId = 2;
+}
+
+message FlushedRegionSequenceId {
+  required bytes regionEncodedName = 1;
+  required uint64 seqId = 2;
+  repeated FlushedStoreSequenceId stores = 3;
+}
+
+message FlushedSequenceId {
+  repeated FlushedRegionSequenceId regionSequenceId = 1;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/b336da92/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 883bb4f..38aac50 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -862,6 +862,13 @@ public class HMaster extends HRegionServer implements 
MasterServices {
 
     status.setStatus("Initializing ZK system trackers");
     initializeZKBasedSystemTrackers();
+    status.setStatus("Loading last flushed sequence id of regions");
+    try {
+      this.serverManager.loadLastFlushedSequenceIds();
+    } catch (IOException e) {
+      LOG.debug("Failed to load last flushed sequence id of regions"
+          + " from file system", e);
+    }
     // Set ourselves as active Master now our claim has succeeded up in zk.
     this.activeMaster = true;
 
@@ -946,6 +953,7 @@ public class HMaster extends HRegionServer implements 
MasterServices {
     getChoreService().scheduleChore(normalizerChore);
     this.catalogJanitorChore = new CatalogJanitor(this);
     getChoreService().scheduleChore(catalogJanitorChore);
+    this.serverManager.startChore();
 
     status.setStatus("Starting cluster schema service");
     initClusterSchemaService();

http://git-wip-us.apache.org/repos/asf/hbase/blob/b336da92/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index c746502..cfbd52f 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -38,10 +38,15 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Predicate;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ClockOutOfSyncException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ScheduledChore;
 import org.apache.hadoop.hbase.ServerMetrics;
 import org.apache.hadoop.hbase.ServerMetricsBuilder;
 import org.apache.hadoop.hbase.ServerName;
@@ -51,9 +56,11 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -62,12 +69,16 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
 import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedRegionSequenceId;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedSequenceId;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.FlushedStoreSequenceId;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
 
 /**
@@ -106,6 +117,22 @@ public class ServerManager {
   public static final String WAIT_ON_REGIONSERVERS_INTERVAL =
       "hbase.master.wait.on.regionservers.interval";
 
+  /**
+   * See HBASE-20727.
+   * If set to true, flushedSequenceIdByRegion and 
storeFlushedSequenceIdsByRegion
+   * will be persisted to HDFS and loaded when the master restarts, to speed up log 
split.
+   */
+  public static final String PERSIST_FLUSHEDSEQUENCEID =
+      "hbase.master.persist.flushedsequenceid.enabled";
+
+  public static final boolean PERSIST_FLUSHEDSEQUENCEID_DEFAULT = true;
+
+  public static final String FLUSHEDSEQUENCEID_FLUSHER_INTERVAL =
+      "hbase.master.flushedsequenceid.flusher.interval";
+
+  public static final int FLUSHEDSEQUENCEID_FLUSHER_INTERVAL_DEFAULT =
+      3 * 60 * 60 * 1000; // 3 hours
+
   private static final Logger LOG = 
LoggerFactory.getLogger(ServerManager.class);
 
   // Set if we are to shutdown the cluster.
@@ -117,6 +144,13 @@ public class ServerManager {
   private final ConcurrentNavigableMap<byte[], Long> flushedSequenceIdByRegion 
=
     new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
 
+  private boolean persistFlushedSequenceId = true;
+  private volatile boolean isFlushSeqIdPersistInProgress = false;
+  /** File on hdfs to store last flushed sequence id of regions */
+  private static final String LAST_FLUSHED_SEQ_ID_FILE = ".lastflushedseqids";
+  private  FlushedSequenceIdFlusher flushedSeqIdFlusher;
+
+
   /**
    * The last flushed sequence id for a store in a region.
    */
@@ -194,6 +228,8 @@ public class ServerManager {
     warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
     this.connection = master.getClusterConnection();
     this.rpcControllerFactory = this.connection == null? null: 
connection.getRpcControllerFactory();
+    persistFlushedSequenceId = c.getBoolean(PERSIST_FLUSHEDSEQUENCEID,
+        PERSIST_FLUSHEDSEQUENCEID_DEFAULT);
   }
 
   /**
@@ -424,6 +460,11 @@ public class ServerManager {
     this.rsAdmins.remove(serverName);
   }
 
+  @VisibleForTesting
+  public ConcurrentNavigableMap<byte[], Long> getFlushedSequenceIdByRegion() {
+    return flushedSequenceIdByRegion;
+  }
+
   public RegionStoreSequenceIds getLastFlushedSequenceId(byte[] 
encodedRegionName) {
     RegionStoreSequenceIds.Builder builder = 
RegionStoreSequenceIds.newBuilder();
     Long seqId = flushedSequenceIdByRegion.get(encodedRegionName);
@@ -601,6 +642,10 @@ public class ServerManager {
         listener.serverRemoved(serverName);
       }
     }
+    // trigger a persist of flushedSeqId
+    if (flushedSeqIdFlusher != null) {
+      flushedSeqIdFlusher.triggerNow();
+    }
     return true;
   }
 
@@ -968,10 +1013,36 @@ public class ServerManager {
   }
 
   /**
+   * start chore in ServerManager
+   */
+  public void startChore() {
+    Configuration c = master.getConfiguration();
+    if (persistFlushedSequenceId) {
+      // When we reach here, RegionStates should have been loaded; first, remove the 
deleted regions
+      removeDeletedRegionFromLoadedFlushedSequenceIds();
+      int flushPeriod = c.getInt(FLUSHEDSEQUENCEID_FLUSHER_INTERVAL,
+          FLUSHEDSEQUENCEID_FLUSHER_INTERVAL_DEFAULT);
+      flushedSeqIdFlusher = new FlushedSequenceIdFlusher(
+          "FlushedSequenceIdFlusher", flushPeriod);
+      master.getChoreService().scheduleChore(flushedSeqIdFlusher);
+    }
+  }
+
+  /**
    * Stop the ServerManager.
    */
   public void stop() {
-    // Nothing to do.
+    if (flushedSeqIdFlusher != null) {
+      flushedSeqIdFlusher.cancel();
+    }
+    if (persistFlushedSequenceId) {
+      try {
+        persistRegionLastFlushedSequenceIds();
+      } catch (IOException e) {
+        LOG.warn("Failed to persist last flushed sequence id of regions"
+            + " to file system", e);
+      }
+    }
   }
 
   /**
@@ -1065,4 +1136,144 @@ public class ServerManager {
     ServerMetrics serverMetrics = onlineServers.get(serverName);
     return serverMetrics != null ? serverMetrics.getInfoServerPort() : 0;
   }
+
+  /**
+   * Persist last flushed sequence id of each region to HDFS
+   * @throws IOException if persisting to HDFS fails
+   */
+  private void persistRegionLastFlushedSequenceIds() throws IOException {
+    if (isFlushSeqIdPersistInProgress) {
+      return;
+    }
+    isFlushSeqIdPersistInProgress = true;
+    try {
+      Configuration conf = master.getConfiguration();
+      Path rootDir = FSUtils.getRootDir(conf);
+      Path lastFlushedSeqIdPath = new Path(rootDir, LAST_FLUSHED_SEQ_ID_FILE);
+      FileSystem fs = FileSystem.get(conf);
+      if (fs.exists(lastFlushedSeqIdPath)) {
+        LOG.info("Rewriting .lastflushedseqids file at: "
+            + lastFlushedSeqIdPath);
+        if (!fs.delete(lastFlushedSeqIdPath, false)) {
+          throw new IOException("Unable to remove existing "
+              + lastFlushedSeqIdPath);
+        }
+      } else {
+        LOG.info("Writing .lastflushedseqids file at: " + 
lastFlushedSeqIdPath);
+      }
+      FSDataOutputStream out = fs.create(lastFlushedSeqIdPath);
+      FlushedSequenceId.Builder flushedSequenceIdBuilder =
+          FlushedSequenceId.newBuilder();
+      try {
+        for (Entry<byte[], Long> entry : flushedSequenceIdByRegion.entrySet()) 
{
+          FlushedRegionSequenceId.Builder flushedRegionSequenceIdBuilder =
+              FlushedRegionSequenceId.newBuilder();
+          flushedRegionSequenceIdBuilder.setRegionEncodedName(
+              ByteString.copyFrom(entry.getKey()));
+          flushedRegionSequenceIdBuilder.setSeqId(entry.getValue());
+          ConcurrentNavigableMap<byte[], Long> storeSeqIds =
+              storeFlushedSequenceIdsByRegion.get(entry.getKey());
+          if (storeSeqIds != null) {
+            for (Entry<byte[], Long> store : storeSeqIds.entrySet()) {
+              FlushedStoreSequenceId.Builder flushedStoreSequenceIdBuilder =
+                  FlushedStoreSequenceId.newBuilder();
+              
flushedStoreSequenceIdBuilder.setFamily(ByteString.copyFrom(store.getKey()));
+              flushedStoreSequenceIdBuilder.setSeqId(store.getValue());
+              
flushedRegionSequenceIdBuilder.addStores(flushedStoreSequenceIdBuilder);
+            }
+          }
+          
flushedSequenceIdBuilder.addRegionSequenceId(flushedRegionSequenceIdBuilder);
+        }
+        flushedSequenceIdBuilder.build().writeDelimitedTo(out);
+      } finally {
+        if (out != null) {
+          out.close();
+        }
+      }
+    } finally {
+      isFlushSeqIdPersistInProgress = false;
+    }
+  }
+
+  /**
+   * Load last flushed sequence id of each region from HDFS, if persisted
+   */
+  public void loadLastFlushedSequenceIds() throws IOException {
+    if (!persistFlushedSequenceId) {
+      return;
+    }
+    Configuration conf = master.getConfiguration();
+    Path rootDir = FSUtils.getRootDir(conf);
+    Path lastFlushedSeqIdPath = new Path(rootDir, LAST_FLUSHED_SEQ_ID_FILE);
+    FileSystem fs = FileSystem.get(conf);
+    if (!fs.exists(lastFlushedSeqIdPath)) {
+      LOG.info("No .lastflushedseqids found at" + lastFlushedSeqIdPath
+          + " will record last flushed sequence id"
+          + " for regions by regionserver report all over again");
+      return;
+    } else {
+      LOG.info("begin to load .lastflushedseqids at " + lastFlushedSeqIdPath);
+    }
+    FSDataInputStream in = fs.open(lastFlushedSeqIdPath);
+    try {
+      FlushedSequenceId flushedSequenceId =
+          FlushedSequenceId.parseDelimitedFrom(in);
+      for (FlushedRegionSequenceId flushedRegionSequenceId : flushedSequenceId
+          .getRegionSequenceIdList()) {
+        byte[] encodedRegionName = flushedRegionSequenceId
+            .getRegionEncodedName().toByteArray();
+        flushedSequenceIdByRegion
+            .putIfAbsent(encodedRegionName, 
flushedRegionSequenceId.getSeqId());
+        if (flushedRegionSequenceId.getStoresList() != null
+            && flushedRegionSequenceId.getStoresList().size() != 0) {
+          ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
+              computeIfAbsent(storeFlushedSequenceIdsByRegion, 
encodedRegionName,
+                () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
+          for (FlushedStoreSequenceId flushedStoreSequenceId : 
flushedRegionSequenceId
+              .getStoresList()) {
+            storeFlushedSequenceId
+                .put(flushedStoreSequenceId.getFamily().toByteArray(),
+                    flushedStoreSequenceId.getSeqId());
+          }
+        }
+      }
+    } finally {
+      in.close();
+    }
+  }
+
+  /**
+   * Regions may have been removed between the latest persist of FlushedSequenceIds
+   * and the master abort. So after loading FlushedSequenceIds from file, and after
+   * meta is loaded, we need to remove the deleted regions according to 
RegionStates.
+   */
+  public void removeDeletedRegionFromLoadedFlushedSequenceIds() {
+    RegionStates regionStates = 
master.getAssignmentManager().getRegionStates();
+    Iterator<byte[]> it = flushedSequenceIdByRegion.keySet().iterator();
+    while(it.hasNext()) {
+      byte[] regionEncodedName = it.next();
+      if (regionStates.getRegionState(Bytes.toStringBinary(regionEncodedName)) 
== null) {
+        it.remove();
+        storeFlushedSequenceIdsByRegion.remove(regionEncodedName);
+      }
+    }
+  }
+
+
+  private class FlushedSequenceIdFlusher extends ScheduledChore {
+
+    public FlushedSequenceIdFlusher(String name, int p) {
+      super(name, master, p, 60 * 1000); //delay one minute before first 
execute
+    }
+
+    @Override
+    protected void chore() {
+      try {
+        persistRegionLastFlushedSequenceIds();
+      } catch (IOException e) {
+        LOG.debug("Failed to persist last flushed sequence id of regions"
+            + " to file system", e);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b336da92/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java
index 11df313..8abaf60 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java
@@ -24,6 +24,8 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -45,6 +47,7 @@ import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -192,5 +195,35 @@ public class TestMaster {
       TEST_UTIL.deleteTable(tableName);
     }
   }
+
+  @Test
+  public void testFlushedSequenceIdPersistLoad() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    int msgInterval = conf.getInt("hbase.regionserver.msginterval", 100);
+    // insert some data into META
+    TableName tableName = TableName.valueOf("testFlushSeqId");
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.addFamily(new HColumnDescriptor(Bytes.toBytes("cf")));
+    Table table = TEST_UTIL.createTable(desc, null);
+    // flush META region
+    TEST_UTIL.flush(TableName.META_TABLE_NAME);
+    // wait for regionserver report
+    Threads.sleep(msgInterval * 2);
+    // record flush seqid before cluster shutdown
+    Map<byte[], Long> regionMapBefore =
+        TEST_UTIL.getHBaseCluster().getMaster().getServerManager()
+            .getFlushedSequenceIdByRegion();
+    // restart hbase cluster which will cause flushed sequence id persist and 
reload
+    TEST_UTIL.getMiniHBaseCluster().shutdown();
+    TEST_UTIL.restartHBaseCluster(2);
+    TEST_UTIL.waitUntilNoRegionsInTransition();
+    // check equality after reloading flushed sequence id map
+    Map<byte[], Long> regionMapAfter =
+        TEST_UTIL.getHBaseCluster().getMaster().getServerManager()
+            .getFlushedSequenceIdByRegion();
+    assertTrue(regionMapBefore.equals(regionMapAfter));
+
+
+  }
 }
 

Reply via email to