Repository: hbase Updated Branches: refs/heads/branch-1 69d3e332f -> 39e8e2fb5
HBASE-18005 read replica: handle the case that region server hosting both primary replica and meta region is down (huaxiang sun) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/39e8e2fb Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/39e8e2fb Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/39e8e2fb Branch: refs/heads/branch-1 Commit: 39e8e2fb5898847ef41068cdf46a660e66389541 Parents: 69d3e33 Author: tedyu <yuzhih...@gmail.com> Authored: Tue Jun 6 09:07:17 2017 -0700 Committer: tedyu <yuzhih...@gmail.com> Committed: Tue Jun 6 09:07:17 2017 -0700 ---------------------------------------------------------------------- .../hadoop/hbase/client/ConnectionManager.java | 3 +- .../apache/hadoop/hbase/client/MetaCache.java | 34 ++++ .../RpcRetryingCallerWithReadReplicas.java | 82 ++++++--- .../client/ScannerCallableWithReplicas.java | 24 ++- .../hbase/client/TestReplicaWithCluster.java | 167 +++++++++++++++++-- 5 files changed, 269 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/39e8e2fb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index aa44070..5e670a3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -1296,7 +1296,8 @@ class ConnectionManager { } else { // If we are not supposed to be using the cache, delete any existing cached location // so it won't interfere. - metaCache.clearCache(tableName, row); + // We are only supposed to clean the cache for the specific replicaId + metaCache.clearCache(tableName, row, replicaId); } // Query the meta region http://git-wip-us.apache.org/repos/asf/hbase/blob/39e8e2fb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java index 95b5950..2a172e7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java @@ -319,6 +319,40 @@ public class MetaCache { } /** + * Delete a cached location with specific replicaId. + * @param tableName tableName + * @param row row key + * @param replicaId region replica id + */ + public void clearCache(final TableName tableName, final byte [] row, int replicaId) { + ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName); + + RegionLocations regionLocations = getCachedLocation(tableName, row); + if (regionLocations != null) { + HRegionLocation toBeRemoved = regionLocations.getRegionLocation(replicaId); + if (toBeRemoved != null) { + RegionLocations updatedLocations = regionLocations.remove(replicaId); + byte[] startKey = regionLocations.getRegionLocation().getRegionInfo().getStartKey(); + boolean removed; + if (updatedLocations.isEmpty()) { + removed = tableLocations.remove(startKey, regionLocations); + } else { + removed = tableLocations.replace(startKey, regionLocations, updatedLocations); + } + + if (removed) { + if (metrics != null) { + metrics.incrMetaCacheNumClearRegion(); + } + if (LOG.isTraceEnabled()) { + LOG.trace("Removed " + toBeRemoved + " from cache"); + } + } + } + } + } + + /** * Delete a cached location for a table, row and server */ public void clearCache(final TableName tableName, final byte [] row, ServerName serverName) { http://git-wip-us.apache.org/repos/asf/hbase/blob/39e8e2fb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java index 8c5efde..e6954cc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java @@ -195,10 +195,39 @@ public class RpcRetryingCallerWithReadReplicas { throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException { boolean isTargetReplicaSpecified = (get.getReplicaId() >= 0); - RegionLocations rl = getRegionLocations(true, (isTargetReplicaSpecified ? get.getReplicaId() - : RegionReplicaUtil.DEFAULT_REPLICA_ID), cConnection, tableName, get.getRow()); - ResultBoundedCompletionService<Result> cs = - new ResultBoundedCompletionService<Result>(this.rpcRetryingCallerFactory, pool, rl.size()); + RegionLocations rl = null; + boolean skipPrimary = false; + try { + rl = getRegionLocations(true, + (isTargetReplicaSpecified ? get.getReplicaId() : RegionReplicaUtil.DEFAULT_REPLICA_ID), + cConnection, tableName, get.getRow()); + } catch (RetriesExhaustedException | DoNotRetryIOException e) { + // When there is no specific replica id specified. It just needs to load all replicas. + if (isTargetReplicaSpecified) { + throw e; + } else { + // We cannot get the primary replica location, it is possible that the region + // server hosting meta is down, it needs to proceed to try cached replicas. + if (cConnection instanceof ConnectionManager.HConnectionImplementation) { + rl = ((ConnectionManager.HConnectionImplementation)cConnection).getCachedLocation( + tableName, get.getRow()); + if (rl == null) { + // No cached locations + throw e; + } + + // Primary replica location is not known, skip primary replica + skipPrimary = true; + } else { + // For completeness + throw e; + } + } + } + + final ResultBoundedCompletionService<Result> cs = + new ResultBoundedCompletionService<>(this.rpcRetryingCallerFactory, pool, rl.size()); + int startIndex = 0; int endIndex = rl.size(); @@ -206,25 +235,30 @@ public class RpcRetryingCallerWithReadReplicas { addCallsForReplica(cs, rl, get.getReplicaId(), get.getReplicaId()); endIndex = 1; } else { - addCallsForReplica(cs, rl, 0, 0); - try { - // wait for the timeout to see whether the primary responds back - Future<Result> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds - if (f != null) { - return f.get(); //great we got a response - } - } catch (ExecutionException e) { - // We ignore the ExecutionException and continue with the secondary replicas - if (LOG.isDebugEnabled()) { - LOG.debug("Primary replica returns " + e.getCause()); + if (!skipPrimary) { + addCallsForReplica(cs, rl, 0, 0); + try { + // wait for the timeout to see whether the primary responds back + Future<Result> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds + if (f != null) { + return f.get(); //great we got a response + } + } catch (ExecutionException e) { + // We ignore the ExecutionException and continue with the secondary replicas + if (LOG.isDebugEnabled()) { + LOG.debug("Primary replica returns " + e.getCause()); + } + + // Skip the result from the primary as we know that there is something wrong + startIndex = 1; + } catch (CancellationException e) { + throw new InterruptedIOException(); + } catch (InterruptedException e) { + throw new InterruptedIOException(); } - - // Skip the result from the primary as we know that there is something wrong - startIndex = 1; - } catch (CancellationException e) { - throw new InterruptedIOException(); - } catch (InterruptedException e) { - throw new InterruptedIOException(); + } else { + // Since primary replica is skipped, the endIndex needs to be adjusted accordingly + endIndex --; } // submit call for the all of the secondaries at once @@ -324,10 +358,10 @@ public class RpcRetryingCallerWithReadReplicas { } catch (InterruptedIOException e) { throw e; } catch (IOException e) { - throw new RetriesExhaustedException("Can't get the location", e); + throw new RetriesExhaustedException("Can't get the location for replica " + replicaId, e); } if (rl == null) { - throw new RetriesExhaustedException("Can't get the locations"); + throw new RetriesExhaustedException("Can't get the location for replica " + replicaId); } return rl; http://git-wip-us.apache.org/repos/asf/hbase/blob/39e8e2fb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index da685ac..1beb0a9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; @@ -142,10 +143,25 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> { //2. We should close the "losing" scanners (scanners other than the ones we hear back // from first) // - RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true, - RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName, - currentScannerCallable.getRow()); - + RegionLocations rl = null; + try { + rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true, + RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName, + currentScannerCallable.getRow()); + } catch (RetriesExhaustedException | DoNotRetryIOException e) { + // We cannot get the primary replica region location, it is possible that the region server + // hosting meta table is down, it needs to proceed to try cached replicas directly. + if (cConnection instanceof ConnectionManager.HConnectionImplementation) { + rl = ((ConnectionManager.HConnectionImplementation) cConnection) + .getCachedLocation(tableName, currentScannerCallable.getRow()); + if (rl == null) { + throw e; + } + } else { + // For completeness + throw e; + } + } // allocate a boundedcompletion pool of some multiple of number of replicas. // We want to accomodate some RPCs for redundant replica scans (but are still in progress) ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs = http://git-wip-us.apache.org/repos/asf/hbase/blob/39e8e2fb/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java index 45a93a8..d3507c2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java @@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; @@ -157,13 +159,40 @@ public class TestReplicaWithCluster { return null; } + + @Override + public void postGetClosestRowBefore(final ObserverContext<RegionCoprocessorEnvironment> c, + final byte [] row, final byte [] family, final Result result) + throws IOException { + + } } /** * This copro is used to slow down the primary meta region scan a bit */ - public static class RegionServerHostingPrimayMetaRegionSlowCopro extends BaseRegionObserver { + public static class RegionServerHostingPrimayMetaRegionSlowOrStopCopro extends BaseRegionObserver { static boolean slowDownPrimaryMetaScan = false; + static boolean throwException = false; + + @Override + public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, + final Get get, final List<Cell> results) throws IOException { + + int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId(); + + // Fail for the primary replica, but not for meta + if (throwException) { + if (!e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) { + LOG.info("Get, throw Region Server Stopped Exceptoin for region " + e.getEnvironment() + .getRegion().getRegionInfo()); + throw new RegionServerStoppedException("Server " + + e.getEnvironment().getRegionServerServices().getServerName() + " not running"); + } + } else { + LOG.info("Get, We're replica region " + replicaId); + } + } @Override public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e, @@ -172,21 +201,34 @@ public class TestReplicaWithCluster { int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId(); // Slow down with the primary meta region scan - if (slowDownPrimaryMetaScan && (e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() - && (replicaId == 0))) { - LOG.info("Scan with primary meta region, slow down a bit"); - try { - Thread.sleep(META_SCAN_TIMEOUT_IN_MILLISEC - 50); - } catch (InterruptedException ie) { - // Ingore + if (e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) { + if (slowDownPrimaryMetaScan) { + LOG.info("Scan with primary meta region, slow down a bit"); + try { + Thread.sleep(META_SCAN_TIMEOUT_IN_MILLISEC - 50); + } catch (InterruptedException ie) { + // Ingore + } } + // Fail for the primary replica + if (throwException) { + LOG.info("Scan, throw Region Server Stopped Exceptoin for replica " + e.getEnvironment() + .getRegion().getRegionInfo()); + + throw new RegionServerStoppedException("Server " + + e.getEnvironment().getRegionServerServices().getServerName() + " not running"); + } else { + LOG.info("Scan, We're replica region " + replicaId); + } + } else { + LOG.info("Scan, We're replica region " + replicaId); } + return null; } } - @BeforeClass public static void beforeClass() throws Exception { // enable store file refreshing @@ -213,7 +255,7 @@ public class TestReplicaWithCluster { // Set system coprocessor so it can be applied to meta regions HTU.getConfiguration().set("hbase.coprocessor.region.classes", - RegionServerHostingPrimayMetaRegionSlowCopro.class.getName()); + RegionServerHostingPrimayMetaRegionSlowOrStopCopro.class.getName()); HTU.getConfiguration().setInt(HConstants.HBASE_CLIENT_MEAT_REPLICA_SCAN_TIMEOUT, META_SCAN_TIMEOUT_IN_MILLISEC * 1000); @@ -632,14 +674,14 @@ public class TestReplicaWithCluster { HTU.createTable(hdt, new byte[][] { f }, null); - RegionServerHostingPrimayMetaRegionSlowCopro.slowDownPrimaryMetaScan = true; + RegionServerHostingPrimayMetaRegionSlowOrStopCopro.slowDownPrimaryMetaScan = true; // Get user table location, always get it from the primary meta replica RegionLocations url = ((ClusterConnection) HTU.getConnection()) .locateRegion(hdt.getTableName(), row, false, false); } finally { - RegionServerHostingPrimayMetaRegionSlowCopro.slowDownPrimaryMetaScan = false; + RegionServerHostingPrimayMetaRegionSlowOrStopCopro.slowDownPrimaryMetaScan = false; ((ConnectionManager.HConnectionImplementation) HTU.getHBaseAdmin().getConnection()). setUseMetaReplicas(false); HTU.getHBaseAdmin().setBalancerRunning(true, true); @@ -647,4 +689,105 @@ public class TestReplicaWithCluster { HTU.deleteTable(hdt.getTableName()); } } + + + // This test is to simulate the case that the meta region and the primary user region + // are down, hbase client is able to access user replica regions and return stale data. + // Meta replica is enabled to show the case that the meta replica region could be out of sync + // with the primary meta region. + @Test + public void testReplicaGetWithPrimaryAndMetaDown() throws IOException, InterruptedException { + HTU.getHBaseAdmin().setBalancerRunning(false, true); + + ((ConnectionManager.HConnectionImplementation)HTU.getHBaseAdmin().getConnection()). + setUseMetaReplicas(true); + + // Create table then get the single region for our new table. + HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaGetWithPrimaryAndMetaDown"); + hdt.setRegionReplication(2); + try { + + Table table = HTU.createTable(hdt, new byte[][] { f }, null); + + // Get Meta location + RegionLocations mrl = ((ClusterConnection) HTU.getConnection()) + .locateRegion(TableName.META_TABLE_NAME, + HConstants.EMPTY_START_ROW, false, false); + + // Get user table location + RegionLocations url = ((ClusterConnection) HTU.getConnection()) + .locateRegion(hdt.getTableName(), row, false, false); + + // Make sure that user primary region is co-hosted with the meta region + if (!url.getDefaultRegionLocation().getServerName().equals( + mrl.getDefaultRegionLocation().getServerName())) { + HTU.moveRegionAndWait(url.getDefaultRegionLocation().getRegionInfo(), + mrl.getDefaultRegionLocation().getServerName()); + } + + // Make sure that the user replica region is not hosted by the same region server with + // primary + if (url.getRegionLocation(1).getServerName().equals(mrl.getDefaultRegionLocation() + .getServerName())) { + HTU.moveRegionAndWait(url.getRegionLocation(1).getRegionInfo(), + url.getDefaultRegionLocation().getServerName()); + } + + // Wait until the meta table is updated with new location info + while (true) { + mrl = ((ClusterConnection) HTU.getConnection()) + .locateRegion(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW, false, false); + + // Get user table location + url = ((ClusterConnection) HTU.getConnection()) + .locateRegion(hdt.getTableName(), row, false, true); + + LOG.info("meta locations " + mrl); + LOG.info("table locations " + url); + ServerName a = url.getDefaultRegionLocation().getServerName(); + ServerName b = mrl.getDefaultRegionLocation().getServerName(); + if(a.equals(b)) { + break; + } else { + LOG.info("Waiting for new region info to be updated in meta table"); + Thread.sleep(100); + } + } + + Put p = new Put(row); + p.addColumn(f, row, row); + table.put(p); + + // Flush so it can be picked by the replica refresher thread + HTU.flush(table.getName()); + + // Sleep for some time until data is picked up by replicas + try { + Thread.sleep(2 * REFRESH_PERIOD); + } catch (InterruptedException e1) { + LOG.error(e1); + } + + // Simulating the RS down + RegionServerHostingPrimayMetaRegionSlowOrStopCopro.throwException = true; + + // The first Get is supposed to succeed + Get g = new Get(row); + g.setConsistency(Consistency.TIMELINE); + Result r = table.get(g); + Assert.assertTrue(r.isStale()); + + // The second Get will succeed as well + r = table.get(g); + Assert.assertTrue(r.isStale()); + + } finally { + ((ConnectionManager.HConnectionImplementation)HTU.getHBaseAdmin().getConnection()). + setUseMetaReplicas(false); + RegionServerHostingPrimayMetaRegionSlowOrStopCopro.throwException = false; + HTU.getHBaseAdmin().setBalancerRunning(true, true); + HTU.getHBaseAdmin().disableTable(hdt.getTableName()); + HTU.deleteTable(hdt.getTableName()); + } + } }