Repository: hbase
Updated Branches:
  refs/heads/branch-1 69d3e332f -> 39e8e2fb5


HBASE-18005 read replica: handle the case that region server hosting both 
primary replica and meta region is down (huaxiang sun)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/39e8e2fb
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/39e8e2fb
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/39e8e2fb

Branch: refs/heads/branch-1
Commit: 39e8e2fb5898847ef41068cdf46a660e66389541
Parents: 69d3e33
Author: tedyu <yuzhih...@gmail.com>
Authored: Tue Jun 6 09:07:17 2017 -0700
Committer: tedyu <yuzhih...@gmail.com>
Committed: Tue Jun 6 09:07:17 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/client/ConnectionManager.java  |   3 +-
 .../apache/hadoop/hbase/client/MetaCache.java   |  34 ++++
 .../RpcRetryingCallerWithReadReplicas.java      |  82 ++++++---
 .../client/ScannerCallableWithReplicas.java     |  24 ++-
 .../hbase/client/TestReplicaWithCluster.java    | 167 +++++++++++++++++--
 5 files changed, 269 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/39e8e2fb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
index aa44070..5e670a3 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
@@ -1296,7 +1296,8 @@ class ConnectionManager {
         } else {
           // If we are not supposed to be using the cache, delete any existing 
cached location
           // so it won't interfere.
-          metaCache.clearCache(tableName, row);
+          // We are only supposed to clean the cache for the specific replicaId
+          metaCache.clearCache(tableName, row, replicaId);
         }
 
         // Query the meta region

http://git-wip-us.apache.org/repos/asf/hbase/blob/39e8e2fb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java
index 95b5950..2a172e7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java
@@ -319,6 +319,40 @@ public class MetaCache {
   }
 
   /**
+   * Delete a cached location with specific replicaId.
+   * @param tableName tableName
+   * @param row row key
+   * @param replicaId region replica id
+   */
+  public void clearCache(final TableName tableName, final byte [] row, int 
replicaId) {
+    ConcurrentMap<byte[], RegionLocations> tableLocations = 
getTableLocations(tableName);
+
+    RegionLocations regionLocations = getCachedLocation(tableName, row);
+    if (regionLocations != null) {
+      HRegionLocation toBeRemoved = 
regionLocations.getRegionLocation(replicaId);
+      if (toBeRemoved != null) {
+        RegionLocations updatedLocations = regionLocations.remove(replicaId);
+        byte[] startKey = 
regionLocations.getRegionLocation().getRegionInfo().getStartKey();
+        boolean removed;
+        if (updatedLocations.isEmpty()) {
+          removed = tableLocations.remove(startKey, regionLocations);
+        } else {
+          removed = tableLocations.replace(startKey, regionLocations, 
updatedLocations);
+        }
+
+        if (removed) {
+          if (metrics != null) {
+            metrics.incrMetaCacheNumClearRegion();
+          }
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Removed " + toBeRemoved + " from cache");
+          }
+        }
+      }
+    }
+  }
+
+  /**
    * Delete a cached location for a table, row and server
    */
   public void clearCache(final TableName tableName, final byte [] row, 
ServerName serverName) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/39e8e2fb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
index 8c5efde..e6954cc 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
@@ -195,10 +195,39 @@ public class RpcRetryingCallerWithReadReplicas {
       throws DoNotRetryIOException, InterruptedIOException, 
RetriesExhaustedException {
     boolean isTargetReplicaSpecified = (get.getReplicaId() >= 0);
 
-    RegionLocations rl = getRegionLocations(true, (isTargetReplicaSpecified ? 
get.getReplicaId()
-        : RegionReplicaUtil.DEFAULT_REPLICA_ID), cConnection, tableName, 
get.getRow());
-    ResultBoundedCompletionService<Result> cs =
-        new 
ResultBoundedCompletionService<Result>(this.rpcRetryingCallerFactory, pool, 
rl.size());
+    RegionLocations rl = null;
+    boolean skipPrimary = false;
+    try {
+      rl = getRegionLocations(true,
+        (isTargetReplicaSpecified ? get.getReplicaId() : 
RegionReplicaUtil.DEFAULT_REPLICA_ID),
+        cConnection, tableName, get.getRow());
+    } catch (RetriesExhaustedException | DoNotRetryIOException e) {
+      // When there is no specific replica id specified. It just needs to load 
all replicas.
+      if (isTargetReplicaSpecified) {
+        throw e;
+      } else {
+        // We cannot get the primary replica location, it is possible that the 
region
+        // server hosting meta is down, it needs to proceed to try cached 
replicas.
+        if (cConnection instanceof 
ConnectionManager.HConnectionImplementation) {
+          rl = 
((ConnectionManager.HConnectionImplementation)cConnection).getCachedLocation(
+              tableName, get.getRow());
+          if (rl == null) {
+            // No cached locations
+            throw e;
+          }
+
+          // Primary replica location is not known, skip primary replica
+          skipPrimary = true;
+        } else {
+          // For completeness
+          throw e;
+        }
+      }
+    }
+
+    final ResultBoundedCompletionService<Result> cs =
+        new ResultBoundedCompletionService<>(this.rpcRetryingCallerFactory, 
pool, rl.size());
+
     int startIndex = 0;
     int endIndex = rl.size();
 
@@ -206,25 +235,30 @@ public class RpcRetryingCallerWithReadReplicas {
       addCallsForReplica(cs, rl, get.getReplicaId(), get.getReplicaId());
       endIndex = 1;
     } else {
-      addCallsForReplica(cs, rl, 0, 0);
-      try {
-        // wait for the timeout to see whether the primary responds back
-        Future<Result> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); 
// Yes, microseconds
-        if (f != null) {
-          return f.get(); //great we got a response
-        }
-      } catch (ExecutionException e) {
-        // We ignore the ExecutionException and continue with the secondary 
replicas
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Primary replica returns " + e.getCause());
+      if (!skipPrimary) {
+        addCallsForReplica(cs, rl, 0, 0);
+        try {
+          // wait for the timeout to see whether the primary responds back
+          Future<Result> f = cs.poll(timeBeforeReplicas, 
TimeUnit.MICROSECONDS); // Yes, microseconds
+          if (f != null) {
+            return f.get(); //great we got a response
+          }
+        } catch (ExecutionException e) {
+          // We ignore the ExecutionException and continue with the secondary 
replicas
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Primary replica returns " + e.getCause());
+          }
+
+          // Skip the result from the primary as we know that there is 
something wrong
+          startIndex = 1;
+        } catch (CancellationException e) {
+          throw new InterruptedIOException();
+        } catch (InterruptedException e) {
+          throw new InterruptedIOException();
         }
-
-        // Skip the result from the primary as we know that there is something 
wrong
-        startIndex = 1;
-      } catch (CancellationException e) {
-        throw new InterruptedIOException();
-      } catch (InterruptedException e) {
-        throw new InterruptedIOException();
+      } else {
+        // Since primary replica is skipped, the endIndex needs to be adjusted 
accordingly
+        endIndex --;
       }
 
       // submit call for the all of the secondaries at once
@@ -324,10 +358,10 @@ public class RpcRetryingCallerWithReadReplicas {
     } catch (InterruptedIOException e) {
       throw e;
     } catch (IOException e) {
-      throw new RetriesExhaustedException("Can't get the location", e);
+      throw new RetriesExhaustedException("Can't get the location for replica 
" + replicaId, e);
     }
     if (rl == null) {
-      throw new RetriesExhaustedException("Can't get the locations");
+      throw new RetriesExhaustedException("Can't get the location for replica 
" + replicaId);
     }
 
     return rl;

http://git-wip-us.apache.org/repos/asf/hbase/blob/39e8e2fb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index da685ac..1beb0a9 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
@@ -142,10 +143,25 @@ class ScannerCallableWithReplicas implements 
RetryingCallable<Result[]> {
     //2. We should close the "losing" scanners (scanners other than the ones 
we hear back
     //   from first)
     //
-    RegionLocations rl = 
RpcRetryingCallerWithReadReplicas.getRegionLocations(true,
-        RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,
-        currentScannerCallable.getRow());
-
+    RegionLocations rl = null;
+    try {
+      rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true,
+          RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,
+          currentScannerCallable.getRow());
+    } catch (RetriesExhaustedException | DoNotRetryIOException e) {
+      // We cannot get the primary replica region location, it is possible 
that the region server
+      // hosting meta table is down, it needs to proceed to try cached 
replicas directly.
+      if (cConnection instanceof ConnectionManager.HConnectionImplementation) {
+        rl = ((ConnectionManager.HConnectionImplementation) cConnection)
+            .getCachedLocation(tableName, currentScannerCallable.getRow());
+        if (rl == null) {
+          throw e;
+        }
+      } else {
+        // For completeness
+        throw e;
+      }
+    }
     // allocate a boundedcompletion pool of some multiple of number of 
replicas.
    // We want to accommodate some RPCs for redundant replica scans (but are 
still in progress)
     ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs =

http://git-wip-us.apache.org/repos/asf/hbase/blob/39e8e2fb/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
index 45a93a8..d3507c2 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
@@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
@@ -157,13 +159,40 @@ public class TestReplicaWithCluster {
 
       return null;
     }
+
+    @Override
+    public void postGetClosestRowBefore(final 
ObserverContext<RegionCoprocessorEnvironment> c,
+        final byte [] row, final byte [] family, final Result result)
+        throws IOException {
+
+    }
   }
 
   /**
    * This copro is used to slow down the primary meta region scan a bit
    */
-  public static class RegionServerHostingPrimayMetaRegionSlowCopro extends 
BaseRegionObserver {
+  public static class RegionServerHostingPrimayMetaRegionSlowOrStopCopro 
extends BaseRegionObserver {
     static boolean slowDownPrimaryMetaScan = false;
+    static boolean throwException = false;
+
+    @Override
+    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
+        final Get get, final List<Cell> results) throws IOException {
+
+      int replicaId = 
e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
+
+      // Fail for the primary replica, but not for meta
+      if (throwException) {
+        if (!e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && 
(replicaId == 0)) {
+          LOG.info("Get, throw Region Server Stopped Exceptoin for region " + 
e.getEnvironment()
+              .getRegion().getRegionInfo());
+          throw new RegionServerStoppedException("Server " +
+              e.getEnvironment().getRegionServerServices().getServerName() + " 
not running");
+        }
+      } else {
+        LOG.info("Get, We're replica region " + replicaId);
+      }
+    }
 
     @Override
     public RegionScanner preScannerOpen(final 
ObserverContext<RegionCoprocessorEnvironment> e,
@@ -172,21 +201,34 @@ public class TestReplicaWithCluster {
       int replicaId = 
e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
 
       // Slow down with the primary meta region scan
-      if (slowDownPrimaryMetaScan && 
(e.getEnvironment().getRegion().getRegionInfo().isMetaRegion()
-          && (replicaId == 0))) {
-        LOG.info("Scan with primary meta region, slow down a bit");
-        try {
-          Thread.sleep(META_SCAN_TIMEOUT_IN_MILLISEC - 50);
-        } catch (InterruptedException ie) {
-          // Ingore
+      if (e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && 
(replicaId == 0)) {
+        if (slowDownPrimaryMetaScan) {
+          LOG.info("Scan with primary meta region, slow down a bit");
+          try {
+            Thread.sleep(META_SCAN_TIMEOUT_IN_MILLISEC - 50);
+          } catch (InterruptedException ie) {
+            // Ignore
+          }
         }
 
+        // Fail for the primary replica
+        if (throwException) {
+          LOG.info("Scan, throw Region Server Stopped Exceptoin for replica " 
+ e.getEnvironment()
+              .getRegion().getRegionInfo());
+
+          throw new RegionServerStoppedException("Server " +
+              e.getEnvironment().getRegionServerServices().getServerName() + " 
not running");
+        } else {
+          LOG.info("Scan, We're replica region " + replicaId);
+        }
+      } else {
+        LOG.info("Scan, We're replica region " + replicaId);
       }
+
       return null;
     }
   }
 
-
   @BeforeClass
   public static void beforeClass() throws Exception {
     // enable store file refreshing
@@ -213,7 +255,7 @@ public class TestReplicaWithCluster {
 
     // Set system coprocessor so it can be applied to meta regions
     HTU.getConfiguration().set("hbase.coprocessor.region.classes",
-        RegionServerHostingPrimayMetaRegionSlowCopro.class.getName());
+        RegionServerHostingPrimayMetaRegionSlowOrStopCopro.class.getName());
 
     
HTU.getConfiguration().setInt(HConstants.HBASE_CLIENT_MEAT_REPLICA_SCAN_TIMEOUT,
         META_SCAN_TIMEOUT_IN_MILLISEC * 1000);
@@ -632,14 +674,14 @@ public class TestReplicaWithCluster {
 
       HTU.createTable(hdt, new byte[][] { f }, null);
 
-      RegionServerHostingPrimayMetaRegionSlowCopro.slowDownPrimaryMetaScan = 
true;
+      
RegionServerHostingPrimayMetaRegionSlowOrStopCopro.slowDownPrimaryMetaScan = 
true;
 
       // Get user table location, always get it from the primary meta replica
       RegionLocations url = ((ClusterConnection) HTU.getConnection())
           .locateRegion(hdt.getTableName(), row, false, false);
 
     } finally {
-      RegionServerHostingPrimayMetaRegionSlowCopro.slowDownPrimaryMetaScan = 
false;
+      
RegionServerHostingPrimayMetaRegionSlowOrStopCopro.slowDownPrimaryMetaScan = 
false;
       ((ConnectionManager.HConnectionImplementation) 
HTU.getHBaseAdmin().getConnection()).
           setUseMetaReplicas(false);
       HTU.getHBaseAdmin().setBalancerRunning(true, true);
@@ -647,4 +689,105 @@ public class TestReplicaWithCluster {
       HTU.deleteTable(hdt.getTableName());
     }
   }
+
+
+  // This test is to simulate the case that the meta region and the primary 
user region
+  // are down, hbase client is able to access user replica regions and return 
stale data.
+  // Meta replica is enabled to show the case that the meta replica region 
could be out of sync
+  // with the primary meta region.
+  @Test
+  public void testReplicaGetWithPrimaryAndMetaDown() throws IOException, 
InterruptedException {
+    HTU.getHBaseAdmin().setBalancerRunning(false, true);
+
+    
((ConnectionManager.HConnectionImplementation)HTU.getHBaseAdmin().getConnection()).
+        setUseMetaReplicas(true);
+
+    // Create table then get the single region for our new table.
+    HTableDescriptor hdt = 
HTU.createTableDescriptor("testReplicaGetWithPrimaryAndMetaDown");
+    hdt.setRegionReplication(2);
+    try {
+
+      Table table = HTU.createTable(hdt, new byte[][] { f }, null);
+
+      // Get Meta location
+      RegionLocations mrl = ((ClusterConnection) HTU.getConnection())
+          .locateRegion(TableName.META_TABLE_NAME,
+              HConstants.EMPTY_START_ROW, false, false);
+
+      // Get user table location
+      RegionLocations url = ((ClusterConnection) HTU.getConnection())
+          .locateRegion(hdt.getTableName(), row, false, false);
+
+      // Make sure that user primary region is co-hosted with the meta region
+      if (!url.getDefaultRegionLocation().getServerName().equals(
+          mrl.getDefaultRegionLocation().getServerName())) {
+        HTU.moveRegionAndWait(url.getDefaultRegionLocation().getRegionInfo(),
+            mrl.getDefaultRegionLocation().getServerName());
+      }
+
+      // Make sure that the user replica region is not hosted by the same 
region server with
+      // primary
+      if 
(url.getRegionLocation(1).getServerName().equals(mrl.getDefaultRegionLocation()
+          .getServerName())) {
+        HTU.moveRegionAndWait(url.getRegionLocation(1).getRegionInfo(),
+            url.getDefaultRegionLocation().getServerName());
+      }
+
+      // Wait until the meta table is updated with new location info
+      while (true) {
+        mrl = ((ClusterConnection) HTU.getConnection())
+            .locateRegion(TableName.META_TABLE_NAME, 
HConstants.EMPTY_START_ROW, false, false);
+
+        // Get user table location
+        url = ((ClusterConnection) HTU.getConnection())
+            .locateRegion(hdt.getTableName(), row, false, true);
+
+        LOG.info("meta locations " + mrl);
+        LOG.info("table locations " + url);
+        ServerName a = url.getDefaultRegionLocation().getServerName();
+        ServerName b = mrl.getDefaultRegionLocation().getServerName();
+        if(a.equals(b)) {
+          break;
+        } else {
+          LOG.info("Waiting for new region info to be updated in meta table");
+          Thread.sleep(100);
+        }
+      }
+
+      Put p = new Put(row);
+      p.addColumn(f, row, row);
+      table.put(p);
+
+      // Flush so it can be picked by the replica refresher thread
+      HTU.flush(table.getName());
+
+      // Sleep for some time until data is picked up by replicas
+      try {
+        Thread.sleep(2 * REFRESH_PERIOD);
+      } catch (InterruptedException e1) {
+        LOG.error(e1);
+      }
+
+      // Simulating the RS down
+      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.throwException = true;
+
+      // The first Get is supposed to succeed
+      Get g = new Get(row);
+      g.setConsistency(Consistency.TIMELINE);
+      Result r = table.get(g);
+      Assert.assertTrue(r.isStale());
+
+      // The second Get will succeed as well
+      r = table.get(g);
+      Assert.assertTrue(r.isStale());
+
+    } finally {
+      
((ConnectionManager.HConnectionImplementation)HTU.getHBaseAdmin().getConnection()).
+          setUseMetaReplicas(false);
+      RegionServerHostingPrimayMetaRegionSlowOrStopCopro.throwException = 
false;
+      HTU.getHBaseAdmin().setBalancerRunning(true, true);
+      HTU.getHBaseAdmin().disableTable(hdt.getTableName());
+      HTU.deleteTable(hdt.getTableName());
+    }
+  }
 }

Reply via email to