Repository: hbase Updated Branches: refs/heads/branch-1 926aaed11 -> b8e969be7
HBASE-12891 Parallel execution for Hbck checkRegionConsistency Signed-off-by: Enis Soztutar <e...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b8e969be Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b8e969be Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b8e969be Branch: refs/heads/branch-1 Commit: b8e969be7a17d6637c2e0c48473a4b78ff795977 Parents: 926aaed Author: Dave Latham <davelat...@yahoo-inc.com> Authored: Thu Apr 2 08:32:15 2015 -0700 Committer: Enis Soztutar <e...@apache.org> Committed: Thu Apr 2 11:24:14 2015 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/util/HBaseFsck.java | 92 +++++++++++++++++--- 1 file changed, 78 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/b8e969be/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java index dcbb8f1..65d4a42 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java @@ -56,8 +56,10 @@ import java.util.concurrent.atomic.AtomicInteger; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Multimap; +import com.google.common.collect.Ordering; import com.google.common.collect.TreeMultimap; import com.google.protobuf.ServiceException; @@ -1767,23 +1769,75 @@ public class HBaseFsck extends Configured implements Closeable { throws IOException, KeeperException, InterruptedException { // Divide the checks in two phases. One for default/primary replicas and another // for the non-primary ones. Keeps code cleaner this way. + List<CheckRegionConsistencyWorkItem> workItems = + new ArrayList<CheckRegionConsistencyWorkItem>(regionInfoMap.size()); for (java.util.Map.Entry<String, HbckInfo> e: regionInfoMap.entrySet()) { if (e.getValue().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) { - checkRegionConsistency(e.getKey(), e.getValue()); + workItems.add(new CheckRegionConsistencyWorkItem(e.getKey(), e.getValue())); } } + checkRegionConsistencyConcurrently(workItems); + boolean prevHdfsCheck = shouldCheckHdfs(); setCheckHdfs(false); //replicas don't have any hdfs data // Run a pass over the replicas and fix any assignment issues that exist on the currently // deployed/undeployed replicas. + List<CheckRegionConsistencyWorkItem> replicaWorkItems = + new ArrayList<CheckRegionConsistencyWorkItem>(regionInfoMap.size()); for (java.util.Map.Entry<String, HbckInfo> e: regionInfoMap.entrySet()) { if (e.getValue().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) { - checkRegionConsistency(e.getKey(), e.getValue()); + replicaWorkItems.add(new CheckRegionConsistencyWorkItem(e.getKey(), e.getValue())); } } + checkRegionConsistencyConcurrently(replicaWorkItems); setCheckHdfs(prevHdfsCheck); } + /** + * Check consistency of all regions using mulitple threads concurrently. + */ + private void checkRegionConsistencyConcurrently( + final List<CheckRegionConsistencyWorkItem> workItems) + throws IOException, KeeperException, InterruptedException { + if (workItems.isEmpty()) { + return; // nothing to check + } + + List<Future<Void>> workFutures = executor.invokeAll(workItems); + for(Future<Void> f: workFutures) { + try { + f.get(); + } catch(ExecutionException e1) { + LOG.warn("Could not check region consistency " , e1.getCause()); + if (e1.getCause() instanceof IOException) { + throw (IOException)e1.getCause(); + } else if (e1.getCause() instanceof KeeperException) { + throw (KeeperException)e1.getCause(); + } else if (e1.getCause() instanceof InterruptedException) { + throw (InterruptedException)e1.getCause(); + } else { + throw new IOException(e1.getCause()); + } + } + } + } + + class CheckRegionConsistencyWorkItem implements Callable<Void> { + private final String key; + private final HbckInfo hbi; + + CheckRegionConsistencyWorkItem(String key, HbckInfo hbi) { + this.key = key; + this.hbi = hbi; + } + + @Override + public synchronized Void call() throws Exception { + checkRegionConsistency(key, hbi); + return null; + } + } + private void preCheckPermission() throws IOException, AccessDeniedException { if (shouldIgnorePreCheckPermission()) { return; @@ -2079,16 +2133,8 @@ public class HBaseFsck extends Configured implements Closeable { HRegionInfo hri = hbi.getHdfsHRI(); TableInfo tableInfo = tablesInfo.get(hri.getTable()); - if (tableInfo.regionsFromMeta.isEmpty()) { - for (HbckInfo h : regionInfoMap.values()) { - if (hri.getTable().equals(h.getTableName())) { - if (h.metaEntry != null) tableInfo.regionsFromMeta - .add((HRegionInfo) h.metaEntry); - } - } - Collections.sort(tableInfo.regionsFromMeta); - } - for (HRegionInfo region : tableInfo.regionsFromMeta) { + + for (HRegionInfo region : tableInfo.getRegionsFromMeta()) { if (Bytes.compareTo(region.getStartKey(), hri.getStartKey()) <= 0 && (region.getEndKey().length == 0 || Bytes.compareTo(region.getEndKey(), hri.getEndKey()) >= 0) @@ -2445,7 +2491,7 @@ public class HBaseFsck extends Configured implements Closeable { TreeMultimap.create(RegionSplitCalculator.BYTES_COMPARATOR, cmp); // list of regions derived from meta entries. - final List<HRegionInfo> regionsFromMeta = new ArrayList<HRegionInfo>(); + private ImmutableList<HRegionInfo> regionsFromMeta = null; TableInfo(TableName name) { this.tableName = name; @@ -2502,7 +2548,25 @@ public class HBaseFsck extends Configured implements Closeable { return sc.getStarts().size() + backwards.size(); } - private class IntegrityFixSuggester extends TableIntegrityErrorHandlerImpl { + public synchronized ImmutableList<HRegionInfo> getRegionsFromMeta() { + // lazy loaded, synchronized to ensure a single load + if (regionsFromMeta == null) { + List<HRegionInfo> regions = new ArrayList<HRegionInfo>(); + for (HbckInfo h : HBaseFsck.this.regionInfoMap.values()) { + if (tableName.equals(h.getTableName())) { + if (h.metaEntry != null) { + regions.add((HRegionInfo) h.metaEntry); + } + } + } + regionsFromMeta = Ordering.natural().immutableSortedCopy(regions); + } + + return regionsFromMeta; + } + + + private class IntegrityFixSuggester extends TableIntegrityErrorHandlerImpl { ErrorReporter errors; IntegrityFixSuggester(TableInfo ti, ErrorReporter errors) {