http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkAssigner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkAssigner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkAssigner.java new file mode 100644 index 0000000..929cd4e --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkAssigner.java @@ -0,0 +1,122 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import java.io.IOException; +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.concurrent.Executors; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.Server; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * Base class used for bulk assigning and unassigning regions. + * Encapsulates a fixed size thread pool of executors to run assignment/unassignment. + * Implement {@link #populatePool(java.util.concurrent.ExecutorService)} and + * {@link #waitUntilDone(long)}. The default implementation of + * {@link #getUncaughtExceptionHandler()} is to abort the hosting + * Server. + */ +@InterfaceAudience.Private +public abstract class BulkAssigner { + protected final Server server; + + /** + * @param server An instance of Server + */ + public BulkAssigner(final Server server) { + this.server = server; + } + + /** + * @return What to use for a thread prefix when executor runs. + */ + protected String getThreadNamePrefix() { + return this.server.getServerName() + "-" + this.getClass().getName(); + } + + protected UncaughtExceptionHandler getUncaughtExceptionHandler() { + return new UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + // Abort if exception of any kind. + server.abort("Uncaught exception in " + t.getName(), e); + } + }; + } + + protected int getThreadCount() { + return this.server.getConfiguration(). + getInt("hbase.bulk.assignment.threadpool.size", 20); + } + + protected long getTimeoutOnRIT() { + return this.server.getConfiguration(). + getLong("hbase.bulk.assignment.waiton.empty.rit", 5 * 60 * 1000); + } + + protected abstract void populatePool( + final java.util.concurrent.ExecutorService pool) throws IOException; + + public boolean bulkAssign() throws InterruptedException, IOException { + return bulkAssign(true); + } + + /** + * Run the bulk assign. + * + * @param sync + * Whether to assign synchronously. + * @throws InterruptedException + * @return True if done.
+ * @throws IOException + */ + public boolean bulkAssign(boolean sync) throws InterruptedException, + IOException { + boolean result = false; + ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); + builder.setDaemon(true); + builder.setNameFormat(getThreadNamePrefix() + "-%1$d"); + builder.setUncaughtExceptionHandler(getUncaughtExceptionHandler()); + int threadCount = getThreadCount(); + java.util.concurrent.ExecutorService pool = + Executors.newFixedThreadPool(threadCount, builder.build()); + try { + populatePool(pool); + // How long to wait on empty regions-in-transition. If we timeout, the + // RIT monitor should do fixup. + if (sync) result = waitUntilDone(getTimeoutOnRIT()); + } finally { + // We're done with the pool. It'll exit when it's done all in queue. + pool.shutdown(); + } + return result; + } + + /** + * Wait until bulk assign is done. + * @param timeout How long to wait. + * @throws InterruptedException + * @return True if the condition we were waiting on happened. + */ + protected abstract boolean waitUntilDone(final long timeout) + throws InterruptedException; +}
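For orientation: a concrete assigner only needs to supply populatePool(...) (queue one Runnable per unit of work) and waitUntilDone(...) (block until that work is complete); the daemon thread factory, pool sizing, and shutdown all come from the base class above. A minimal sketch of such a subclass follows; the class name, the region list, and the latch-based completion tracking are illustrative assumptions, not part of this patch.

  import java.util.List;
  import java.util.concurrent.CountDownLatch;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.TimeUnit;

  import org.apache.hadoop.hbase.HRegionInfo;
  import org.apache.hadoop.hbase.Server;

  public class DemoBulkAssigner extends BulkAssigner {
    private final List<HRegionInfo> regions;
    private final CountDownLatch done;

    public DemoBulkAssigner(Server server, List<HRegionInfo> regions) {
      super(server);
      this.regions = regions;
      this.done = new CountDownLatch(regions.size());
    }

    @Override
    protected void populatePool(ExecutorService pool) {
      // Queue one task per region; the pool is sized by getThreadCount().
      for (final HRegionInfo hri : regions) {
        pool.execute(new Runnable() {
          @Override
          public void run() {
            try {
              // Per-region assign/unassign work would go here (hypothetical).
            } finally {
              done.countDown();
            }
          }
        });
      }
    }

    @Override
    protected boolean waitUntilDone(long timeout) throws InterruptedException {
      // True only if every queued task finished before the timeout elapsed.
      return done.await(timeout, TimeUnit.MILLISECONDS);
    }
  }

A caller would then invoke new DemoBulkAssigner(server, regions).bulkAssign(), which builds the daemon-thread pool, runs populatePool(...), and blocks on waitUntilDone(getTimeoutOnRIT()).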
http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkReOpen.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkReOpen.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkReOpen.java new file mode 100644 index 0000000..d8c511e --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkReOpen.java @@ -0,0 +1,136 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; + +/** + * Performs bulk reopen of the list of regions provided to it. + */ +@InterfaceAudience.Private +public class BulkReOpen extends BulkAssigner { + private final Map<ServerName, List<HRegionInfo>> rsToRegions; + private final AssignmentManager assignmentManager; + private static final Log LOG = LogFactory.getLog(BulkReOpen.class); + + public BulkReOpen(final Server server, + final Map<ServerName, List<HRegionInfo>> serverToRegions, + final AssignmentManager am) { + super(server); + this.assignmentManager = am; + this.rsToRegions = serverToRegions; + } + + /** + * Unassign all regions, so that they go through the regular region + * assignment flow (in assignment manager) and are re-opened. + */ + @Override + protected void populatePool(ExecutorService pool) { + LOG.debug("Creating threads for each region server "); + for (Map.Entry<ServerName, List<HRegionInfo>> e : rsToRegions + .entrySet()) { + final List<HRegionInfo> hris = e.getValue(); + // add plans for the regions that need to be reopened + Map<String, RegionPlan> plans = new HashMap<>(); + for (HRegionInfo hri : hris) { + RegionPlan reOpenPlan = assignmentManager.getRegionReopenPlan(hri); + plans.put(hri.getEncodedName(), reOpenPlan); + } + assignmentManager.addPlans(plans); + pool.execute(new Runnable() { + public void run() { + try { + unassign(hris); + } catch (Throwable t) { + LOG.warn("Failed bulk re-open of " + hris.size() + + " region(s)", t); + } + } + }); + } + } + + /** + * Reopen the regions asynchronously, so always returns true immediately.
+ * @return true + */ + @Override + protected boolean waitUntilDone(long timeout) { + return true; + } + + /** + * The configuration knob "hbase.bulk.reopen.threadpool.size" controls the + * number of regions that can be reopened concurrently. The maximum number of + * threads the master creates is never more than the number of region servers. + * If the property is not defined, it defaults to 20. + */ + protected int getThreadCount() { + int defaultThreadCount = super.getThreadCount(); + return this.server.getConfiguration().getInt( + "hbase.bulk.reopen.threadpool.size", defaultThreadCount); + } + + public boolean bulkReOpen() throws InterruptedException, IOException { + return bulkAssign(); + } + + /** + * Unassign the list of regions. Configuration knobs: + * hbase.bulk.waitbetween.reopen indicates the number of milliseconds to + * wait before unassigning another region from this region server + * + * @param regions + * @throws InterruptedException + */ + private void unassign( + List<HRegionInfo> regions) throws InterruptedException { + int waitTime = this.server.getConfiguration().getInt( + "hbase.bulk.waitbetween.reopen", 0); + RegionStates regionStates = assignmentManager.getRegionStates(); + for (HRegionInfo region : regions) { + if (server.isStopped()) { + return; + } + if (regionStates.isRegionInTransition(region)) { + continue; + } + assignmentManager.unassign(region); + while (regionStates.isRegionInTransition(region) + && !server.isStopped()) { + regionStates.waitForUpdate(100); + } + if (waitTime > 0 && !server.isStopped()) { + Thread.sleep(waitTime); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java index 4775a0a..affd44c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java @@ -27,6 +27,7 @@ import java.util.TreeMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; @@ -38,15 +39,11 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.HFileArchiver; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.master.assignment.AssignmentManager; -import org.apache.hadoop.hbase.master.assignment.GCMergedRegionsProcedure; -import org.apache.hadoop.hbase.master.assignment.GCRegionProcedure; -import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; -import org.apache.hadoop.hbase.procedure2.Procedure; -import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.favored.FavoredNodesManager; import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; @@ -55,8 +52,6 @@ import
org.apache.hadoop.hbase.util.PairOfSameType; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Triple; -import com.google.common.annotations.VisibleForTesting; - /** * A janitor for the catalog tables. Scans the <code>hbase:meta</code> catalog * table on a period looking for unused regions to garbage collect. @@ -69,7 +64,6 @@ public class CatalogJanitor extends ScheduledChore { private final AtomicBoolean enabled = new AtomicBoolean(true); private final MasterServices services; private final Connection connection; - // PID of the last Procedure launched herein. Keep around for Tests. CatalogJanitor(final MasterServices services) { super("CatalogJanitor-" + services.getServerName().toShortString(), services, @@ -118,13 +112,10 @@ public class CatalogJanitor extends ScheduledChore { && !this.services.isInMaintenanceMode() && am != null && am.isFailoverCleanupDone() - && !am.hasRegionsInTransition()) { + && am.getRegionStates().getRegionsInTransition().isEmpty()) { scan(); } else { - LOG.warn("CatalogJanitor is disabled! Enabled=" + this.enabled.get() + - ", maintenanceMode=" + this.services.isInMaintenanceMode() + - ", am=" + am + ", failoverCleanupDone=" + (am != null && am.isFailoverCleanupDone()) + - ", hasRIT=" + (am != null && am.hasRegionsInTransition())); + LOG.warn("CatalogJanitor disabled! Not running scan."); } } catch (IOException e) { LOG.warn("Failed scan of catalog table", e); @@ -176,7 +167,6 @@ public class CatalogJanitor extends ScheduledChore { // Another table, stop scanning return false; } - if (LOG.isTraceEnabled()) LOG.trace("" + info + " IS-SPLIT_PARENT=" + info.isSplitParent()); if (info.isSplitParent()) splitParents.put(info, r); if (r.getValue(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER) != null) { mergedRegions.put(info, r); @@ -197,6 +187,8 @@ public class CatalogJanitor extends ScheduledChore { * If merged region no longer holds reference to the merge regions, archive * merge region on hdfs and perform deleting references in hbase:meta * @param mergedRegion + * @param regionA + * @param regionB * @return true if we delete references in merged region on hbase:meta and archive * the files on the file system * @throws IOException @@ -215,12 +207,18 @@ public class CatalogJanitor extends ScheduledChore { LOG.warn("Merged region does not exist: " + mergedRegion.getEncodedName()); } if (regionFs == null || !regionFs.hasReferences(htd)) { - LOG.debug("Deleting region " + regionA.getShortNameToLog() + " and " - + regionB.getShortNameToLog() + LOG.debug("Deleting region " + regionA.getRegionNameAsString() + " and " + + regionB.getRegionNameAsString() + " from fs because merged region no longer holds references"); - ProcedureExecutor<MasterProcedureEnv> pe = this.services.getMasterProcedureExecutor(); - pe.submitProcedure(new GCMergedRegionsProcedure(pe.getEnvironment(), - mergedRegion, regionA, regionB)); + HFileArchiver.archiveRegion(this.services.getConfiguration(), fs, regionA); + HFileArchiver.archiveRegion(this.services.getConfiguration(), fs, regionB); + MetaTableAccessor.deleteMergeQualifiers(services.getConnection(), mergedRegion); + services.getServerManager().removeRegion(regionA); + services.getServerManager().removeRegion(regionB); + FavoredNodesManager fnm = this.services.getFavoredNodesManager(); + if (fnm != null) { + fnm.deleteFavoredNodesForRegions(Lists.newArrayList(regionA, regionB)); + } return true; } return false; @@ -229,21 +227,22 @@ public class CatalogJanitor extends ScheduledChore { /** * Run janitorial 
scan of catalog <code>hbase:meta</code> table looking for * garbage to collect. - * @return number of archiving jobs started. + * @return number of cleaned regions * @throws IOException */ int scan() throws IOException { - int result = 0; try { if (!alreadyRunning.compareAndSet(false, true)) { LOG.debug("CatalogJanitor already running"); - return result; + return 0; } Triple<Integer, Map<HRegionInfo, Result>, Map<HRegionInfo, Result>> scanTriple = getMergedRegionsAndSplitParents(); + int count = scanTriple.getFirst(); /** * clean merge regions first */ + int mergeCleaned = 0; Map<HRegionInfo, Result> mergedRegions = scanTriple.getSecond(); for (Map.Entry<HRegionInfo, Result> e : mergedRegions.entrySet()) { if (this.services.isInMaintenanceMode()) { @@ -256,13 +255,13 @@ public class CatalogJanitor extends ScheduledChore { HRegionInfo regionB = p.getSecond(); if (regionA == null || regionB == null) { LOG.warn("Unexpected references regionA=" - + (regionA == null ? "null" : regionA.getShortNameToLog()) + + (regionA == null ? "null" : regionA.getRegionNameAsString()) + ",regionB=" - + (regionB == null ? "null" : regionB.getShortNameToLog()) - + " in merged region " + e.getKey().getShortNameToLog()); + + (regionB == null ? "null" : regionB.getRegionNameAsString()) + + " in merged region " + e.getKey().getRegionNameAsString()); } else { if (cleanMergeRegion(e.getKey(), regionA, regionB)) { - result++; + mergeCleaned++; } } } @@ -272,6 +271,7 @@ public class CatalogJanitor extends ScheduledChore { Map<HRegionInfo, Result> splitParents = scanTriple.getThird(); // Now work on our list of found parents. See if any we can clean up. + int splitCleaned = 0; // regions whose parents are still around HashSet<String> parentNotCleaned = new HashSet<>(); for (Map.Entry<HRegionInfo, Result> e : splitParents.entrySet()) { @@ -281,8 +281,8 @@ public class CatalogJanitor extends ScheduledChore { } if (!parentNotCleaned.contains(e.getKey().getEncodedName()) && - cleanParent(e.getKey(), e.getValue())) { - result++; + cleanParent(e.getKey(), e.getValue())) { + splitCleaned++; } else { // We could not clean the parent, so it's daughters should not be // cleaned either (HBASE-6160) @@ -292,7 +292,16 @@ public class CatalogJanitor extends ScheduledChore { parentNotCleaned.add(daughters.getSecond().getEncodedName()); } } - return result; + if ((mergeCleaned + splitCleaned) != 0) { + LOG.info("Scanned " + count + " catalog row(s), gc'd " + mergeCleaned + + " unreferenced merged region(s) and " + splitCleaned + + " unreferenced parent region(s)"); + } else if (LOG.isTraceEnabled()) { + LOG.trace("Scanned " + count + " catalog row(s), gc'd " + mergeCleaned + + " unreferenced merged region(s) and " + splitCleaned + + " unreferenced parent region(s)"); + } + return mergeCleaned + splitCleaned; } finally { alreadyRunning.set(false); } @@ -334,30 +343,34 @@ public class CatalogJanitor extends ScheduledChore { */ boolean cleanParent(final HRegionInfo parent, Result rowContent) throws IOException { + boolean result = false; // Check whether it is a merged region and not clean reference // No necessary to check MERGEB_QUALIFIER because these two qualifiers will // be inserted/deleted together - if (rowContent.getValue(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER) != null) { + if (rowContent.getValue(HConstants.CATALOG_FAMILY, + HConstants.MERGEA_QUALIFIER) != null) { // wait cleaning merge region first - return false; + return result; } // Run checks on each daughter split. 
PairOfSameType<HRegionInfo> daughters = MetaTableAccessor.getDaughterRegions(rowContent); Pair<Boolean, Boolean> a = checkDaughterInFs(parent, daughters.getFirst()); Pair<Boolean, Boolean> b = checkDaughterInFs(parent, daughters.getSecond()); if (hasNoReferences(a) && hasNoReferences(b)) { - String daughterA = daughters.getFirst() != null? - daughters.getFirst().getShortNameToLog(): "null"; - String daughterB = daughters.getSecond() != null? - daughters.getSecond().getShortNameToLog(): "null"; - LOG.debug("Deleting region " + parent.getShortNameToLog() + - " because daughters -- " + daughterA + ", " + daughterB + - " -- no longer hold references"); - ProcedureExecutor<MasterProcedureEnv> pe = this.services.getMasterProcedureExecutor(); - pe.submitProcedure(new GCRegionProcedure(pe.getEnvironment(), parent)); - return true; + LOG.debug("Deleting region " + parent.getRegionNameAsString() + + " because daughter splits no longer hold references"); + FileSystem fs = this.services.getMasterFileSystem().getFileSystem(); + if (LOG.isTraceEnabled()) LOG.trace("Archiving parent region: " + parent); + HFileArchiver.archiveRegion(this.services.getConfiguration(), fs, parent); + MetaTableAccessor.deleteRegion(this.connection, parent); + services.getServerManager().removeRegion(parent); + FavoredNodesManager fnm = this.services.getFavoredNodesManager(); + if (fnm != null) { + fnm.deleteFavoredNodesForRegions(Lists.newArrayList(parent)); + } + result = true; } - return false; + return result; } /** @@ -456,4 +469,4 @@ public class CatalogJanitor extends ScheduledChore { return cleanMergeRegion(region, mergeRegions.getFirst(), mergeRegions.getSecond()); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java index 34a7633..faceba2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java @@ -61,7 +61,7 @@ public class DeadServer { /** * Whether a dead server is being processed currently. */ - private volatile boolean processing = false; + private boolean processing = false; /** * A dead server that comes back alive has a different start code. The new start code should be @@ -123,14 +123,14 @@ public class DeadServer { * @param sn ServerName for the dead server. 
*/ public synchronized void notifyServer(ServerName sn) { - if (LOG.isTraceEnabled()) { LOG.trace("Started processing " + sn); } + if (LOG.isDebugEnabled()) { LOG.debug("Started processing " + sn); } processing = true; numProcessing++; } public synchronized void finish(ServerName sn) { numProcessing--; - if (LOG.isTraceEnabled()) LOG.trace("Finished " + sn + "; numProcessing=" + numProcessing); + if (LOG.isDebugEnabled()) LOG.debug("Finished " + sn + "; numProcessing=" + numProcessing); assert numProcessing >= 0: "Number of dead servers in processing should always be non-negative"; http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java new file mode 100644 index 0000000..fc3607f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; + +/** + * Run bulk assign. Does one RPC per regionserver passing a + * batch of regions using {@link GeneralBulkAssigner.SingleServerBulkAssigner}.
+ */ [email protected] +public class GeneralBulkAssigner extends BulkAssigner { + private static final Log LOG = LogFactory.getLog(GeneralBulkAssigner.class); + + private Map<ServerName, List<HRegionInfo>> failedPlans = new ConcurrentHashMap<>(); + private ExecutorService pool; + + final Map<ServerName, List<HRegionInfo>> bulkPlan; + final AssignmentManager assignmentManager; + final boolean waitTillAllAssigned; + + public GeneralBulkAssigner(final Server server, + final Map<ServerName, List<HRegionInfo>> bulkPlan, + final AssignmentManager am, final boolean waitTillAllAssigned) { + super(server); + this.bulkPlan = bulkPlan; + this.assignmentManager = am; + this.waitTillAllAssigned = waitTillAllAssigned; + } + + @Override + protected String getThreadNamePrefix() { + return this.server.getServerName() + "-GeneralBulkAssigner"; + } + + @Override + protected void populatePool(ExecutorService pool) { + this.pool = pool; // shut it down later in case some assigner hangs + for (Map.Entry<ServerName, List<HRegionInfo>> e: this.bulkPlan.entrySet()) { + pool.execute(new SingleServerBulkAssigner(e.getKey(), e.getValue(), + this.assignmentManager, this.failedPlans)); + } + } + + /** + * + * @param timeout How long to wait. + * @return true if done. + */ + @Override + protected boolean waitUntilDone(final long timeout) + throws InterruptedException { + Set<HRegionInfo> regionSet = new HashSet<>(); + for (List<HRegionInfo> regionList : bulkPlan.values()) { + regionSet.addAll(regionList); + } + + pool.shutdown(); // no more task allowed + int serverCount = bulkPlan.size(); + int regionCount = regionSet.size(); + long startTime = System.currentTimeMillis(); + long rpcWaitTime = startTime + timeout; + while (!server.isStopped() && !pool.isTerminated() + && rpcWaitTime > System.currentTimeMillis()) { + if (failedPlans.isEmpty()) { + pool.awaitTermination(100, TimeUnit.MILLISECONDS); + } else { + reassignFailedPlans(); + } + } + if (!pool.isTerminated()) { + LOG.warn("bulk assigner is still running after " + + (System.currentTimeMillis() - startTime) + "ms, shut it down now"); + // some assigner hangs, can't wait any more, shutdown the pool now + List<Runnable> notStarted = pool.shutdownNow(); + if (notStarted != null && !notStarted.isEmpty()) { + server.abort("some single server assigner hasn't started yet" + + " when the bulk assigner timed out", null); + return false; + } + } + + int reassigningRegions = 0; + if (!failedPlans.isEmpty() && !server.isStopped()) { + reassigningRegions = reassignFailedPlans(); + } + assignmentManager.waitForAssignment(regionSet, waitTillAllAssigned, + reassigningRegions, Math.max(System.currentTimeMillis(), rpcWaitTime)); + + if (LOG.isDebugEnabled()) { + long elapsedTime = System.currentTimeMillis() - startTime; + String status = "successfully"; + if (!regionSet.isEmpty()) { + status = "with " + regionSet.size() + " regions still in transition"; + } + LOG.debug("bulk assigning total " + regionCount + " regions to " + + serverCount + " servers, took " + elapsedTime + "ms, " + status); + } + return regionSet.isEmpty(); + } + + @Override + protected long getTimeoutOnRIT() { + // Guess timeout. Multiply the max number of regions on a server + // by how long we think one region takes opening. 
+ Configuration conf = server.getConfiguration(); + long perRegionOpenTimeGuesstimate = + conf.getLong("hbase.bulk.assignment.perregion.open.time", 1000); + int maxRegionsPerServer = 1; + for (List<HRegionInfo> regionList : bulkPlan.values()) { + int size = regionList.size(); + if (size > maxRegionsPerServer) { + maxRegionsPerServer = size; + } + } + long timeout = perRegionOpenTimeGuesstimate * maxRegionsPerServer + + conf.getLong("hbase.regionserver.rpc.startup.waittime", 60000) + + conf.getLong("hbase.bulk.assignment.perregionserver.rpc.waittime", + 30000) * bulkPlan.size(); + LOG.debug("Timeout-on-RIT=" + timeout); + return timeout; + } + + @Override + protected UncaughtExceptionHandler getUncaughtExceptionHandler() { + return new UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + LOG.warn("Assigning regions in " + t.getName(), e); + } + }; + } + + private int reassignFailedPlans() { + List<HRegionInfo> reassigningRegions = new ArrayList<>(); + for (Map.Entry<ServerName, List<HRegionInfo>> e : failedPlans.entrySet()) { + LOG.info("Failed assigning " + e.getValue().size() + + " regions to server " + e.getKey() + ", reassigning them"); + reassigningRegions.addAll(failedPlans.remove(e.getKey())); + } + RegionStates regionStates = assignmentManager.getRegionStates(); + for (HRegionInfo region : reassigningRegions) { + if (!regionStates.isRegionOnline(region)) { + assignmentManager.invokeAssign(region); + } + } + return reassigningRegions.size(); + } + + /** + * Manage bulk assigning to a server. + */ + static class SingleServerBulkAssigner implements Runnable { + private final ServerName regionserver; + private final List<HRegionInfo> regions; + private final AssignmentManager assignmentManager; + private final Map<ServerName, List<HRegionInfo>> failedPlans; + + SingleServerBulkAssigner(final ServerName regionserver, + final List<HRegionInfo> regions, final AssignmentManager am, + final Map<ServerName, List<HRegionInfo>> failedPlans) { + this.regionserver = regionserver; + this.regions = regions; + this.assignmentManager = am; + this.failedPlans = failedPlans; + } + + @Override + public void run() { + try { + if (!assignmentManager.assign(regionserver, regions)) { + failedPlans.put(regionserver, regions); + } + } catch (Throwable t) { + LOG.warn("Failed bulk assigning " + regions.size() + + " region(s) to " + regionserver.getServerName() + + ", and continuing to bulk assign others", t); + failedPlans.put(regionserver, regions); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 83f5a1c..4dd6353 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -36,8 +36,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -68,6 +66,7 @@ import org.apache.hadoop.hbase.MetaTableAccessor; import
org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.PleaseHoldException; import org.apache.hadoop.hbase.ProcedureInfo; +import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.ServerLoad; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableDescriptors; @@ -91,10 +90,6 @@ import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.master.MasterRpcServices.BalanceSwitchMode; -import org.apache.hadoop.hbase.master.assignment.AssignmentManager; -import org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure; -import org.apache.hadoop.hbase.master.assignment.RegionStates; -import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode; import org.apache.hadoop.hbase.master.balancer.BalancerChore; import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer; import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore; @@ -115,15 +110,16 @@ import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure; import org.apache.hadoop.hbase.master.procedure.DeleteColumnFamilyProcedure; import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure; import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure; -import org.apache.hadoop.hbase.master.procedure.DispatchMergingRegionsProcedure; import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure; import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil; +import org.apache.hadoop.hbase.master.procedure.MergeTableRegionsProcedure; import org.apache.hadoop.hbase.master.procedure.ModifyColumnFamilyProcedure; import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure; import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; +import org.apache.hadoop.hbase.master.procedure.SplitTableRegionProcedure; import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure; import org.apache.hadoop.hbase.master.replication.ReplicationManager; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; @@ -346,6 +342,7 @@ public class HMaster extends HRegionServer implements MasterServices { private RegionNormalizerChore normalizerChore; private ClusterStatusChore clusterStatusChore; private ClusterStatusPublisher clusterStatusPublisherChore = null; + private PeriodicDoMetrics periodicDoMetricsChore = null; CatalogJanitor catalogJanitorChore; private ReplicationMetaCleaner replicationMetaCleaner; @@ -446,6 +443,19 @@ public class HMaster extends HRegionServer implements MasterServices { } } + private static class PeriodicDoMetrics extends ScheduledChore { + private final HMaster server; + public PeriodicDoMetrics(int doMetricsInterval, final HMaster server) { + super(server.getServerName() + "-DoMetricsChore", server, doMetricsInterval); + this.server = server; + } + + @Override + protected void chore() { + server.doMetrics(); + } + } + /** * Initializes the HMaster. The steps are as follows: * <p> @@ -648,6 +658,20 @@ public class HMaster extends HRegionServer implements MasterServices { return MasterDumpServlet.class; } + /** + * Emit the HMaster metrics, such as region in transition metrics. 
+ * Surrounded in a try block to be sure metrics don't abort HMaster. + */ + private void doMetrics() { + try { + if (assignmentManager != null) { + assignmentManager.updateRegionsInTransitionMetrics(); + } + } catch (Throwable e) { + LOG.error("Couldn't update metrics: " + e.getMessage()); + } + } + MetricsMaster getMasterMetrics() { return metricsMaster; } @@ -670,9 +694,8 @@ this.splitOrMergeTracker = new SplitOrMergeTracker(zooKeeper, conf, this); this.splitOrMergeTracker.start(); - // Create Assignment Manager - this.assignmentManager = new AssignmentManager(this); - this.assignmentManager.start(); + this.assignmentManager = new AssignmentManager(this, serverManager, + this.balancer, this.service, this.metricsMaster, tableStateManager); this.replicationManager = new ReplicationManager(conf, zooKeeper, this); @@ -863,6 +886,10 @@ this.catalogJanitorChore = new CatalogJanitor(this); getChoreService().scheduleChore(catalogJanitorChore); + // Do Metrics periodically + periodicDoMetricsChore = new PeriodicDoMetrics(msgInterval, this); + getChoreService().scheduleChore(periodicDoMetricsChore); + status.setStatus("Starting cluster schema service"); initClusterSchemaService(); @@ -875,8 +902,7 @@ } status.markComplete("Initialization successful"); - LOG.info(String.format("Master has completed initialization %.3fsec", - (System.currentTimeMillis() - masterActiveTime) / 1000.0f)); + LOG.info("Master has completed initialization"); configurationManager.registerObserver(this.balancer); configurationManager.registerObserver(this.hfileCleaner); @@ -985,8 +1011,8 @@ // Check zk for region servers that are up but didn't register for (ServerName sn: this.regionServerTracker.getOnlineServers()) { // The isServerOnline check is opportunistic, correctness is handled inside - if (!this.serverManager.isServerOnline(sn) && - serverManager.checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) { + if (!this.serverManager.isServerOnline(sn) + && serverManager.checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) { LOG.info("Registered server found up in zk but who has not yet reported in: " + sn); } } @@ -1119,6 +1145,12 @@ } @Override + protected void sendShutdownInterrupt() { + super.sendShutdownInterrupt(); + stopProcedureExecutor(); + } + + @Override protected void stopServiceThreads() { if (masterJettyServer != null) { LOG.info("Stopping master jetty server"); @@ -1140,20 +1172,15 @@ if (LOG.isDebugEnabled()) { LOG.debug("Stopping service threads"); } - // Clean up and close up shop if (this.logCleaner != null) this.logCleaner.cancel(true); if (this.hfileCleaner != null) this.hfileCleaner.cancel(true); if (this.replicationZKNodeCleanerChore != null) this.replicationZKNodeCleanerChore.cancel(true); if (this.replicationMetaCleaner != null) this.replicationMetaCleaner.cancel(true); if (this.quotaManager != null) this.quotaManager.stop(); - if (this.activeMasterManager != null) this.activeMasterManager.stop(); if (this.serverManager != null) this.serverManager.stop(); if (this.assignmentManager != null) this.assignmentManager.stop(); - - stopProcedureExecutor(); - if
(this.walManager != null) this.walManager.stop(); if (this.fileSystemManager != null) this.fileSystemManager.stop(); if (this.mpmHost != null) this.mpmHost.stop("server shutting down."); @@ -1163,9 +1190,6 @@ public class HMaster extends HRegionServer implements MasterServices { final MasterProcedureEnv procEnv = new MasterProcedureEnv(this); final Path walDir = new Path(FSUtils.getWALRootDir(this.conf), MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR); - // TODO: No cleaner currently! - final Path walArchiveDir = new Path(HFileArchiveUtil.getArchivePath(this.conf), - MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR); final FileSystem walFs = walDir.getFileSystem(conf); @@ -1179,7 +1203,7 @@ public class HMaster extends HRegionServer implements MasterServices { FSUtils.setStoragePolicy(walFs, conf, walDir, HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY); - procedureStore = new WALProcedureStore(conf, walDir.getFileSystem(conf), walDir, walArchiveDir, + procedureStore = new WALProcedureStore(conf, walFs, walDir, new MasterProcedureEnv.WALStoreLeaseRecovery(this)); procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this)); MasterProcedureScheduler procedureScheduler = procEnv.getProcedureScheduler(); @@ -1194,20 +1218,16 @@ public class HMaster extends HRegionServer implements MasterServices { MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION); procedureStore.start(numThreads); procedureExecutor.start(numThreads, abortOnCorruption); - procEnv.getRemoteDispatcher().start(); } private void stopProcedureExecutor() { if (procedureExecutor != null) { configurationManager.deregisterObserver(procedureExecutor.getEnvironment()); - procedureExecutor.getEnvironment().getRemoteDispatcher().stop(); procedureExecutor.stop(); - procedureExecutor = null; } if (procedureStore != null) { procedureStore.stop(isAborted()); - procedureStore = null; } } @@ -1237,6 +1257,9 @@ public class HMaster extends HRegionServer implements MasterServices { this.mobCompactThread.close(); } + if (this.periodicDoMetricsChore != null) { + periodicDoMetricsChore.cancel(); + } if (this.quotaObserverChore != null) { quotaObserverChore.cancel(); } @@ -1297,7 +1320,7 @@ public class HMaster extends HRegionServer implements MasterServices { // Sleep to next balance plan start time // But if there are zero regions in transition, it can skip sleep to speed up. while (!interrupted && System.currentTimeMillis() < nextBalanceStartTime - && this.assignmentManager.getRegionStates().hasRegionsInTransition()) { + && this.assignmentManager.getRegionStates().getRegionsInTransitionCount() != 0) { try { Thread.sleep(100); } catch (InterruptedException ie) { @@ -1308,7 +1331,7 @@ public class HMaster extends HRegionServer implements MasterServices { // Throttling by max number regions in transition while (!interrupted && maxRegionsInTransition > 0 - && this.assignmentManager.getRegionStates().getRegionsInTransition().size() + && this.assignmentManager.getRegionStates().getRegionsInTransitionCount() >= maxRegionsInTransition && System.currentTimeMillis() <= cutoffTime) { try { // sleep if the number of regions in transition exceeds the limit @@ -1341,26 +1364,21 @@ public class HMaster extends HRegionServer implements MasterServices { synchronized (this.balancer) { // If balance not true, don't run balancer. if (!this.loadBalancerTracker.isBalancerOn()) return false; - // Only allow one balance run at at time. 
- if (this.assignmentManager.hasRegionsInTransition()) { - List<RegionStateNode> regionsInTransition = assignmentManager.getRegionsInTransition(); + // Only allow one balance run at at time. + if (this.assignmentManager.getRegionStates().isRegionsInTransition()) { + Set<RegionState> regionsInTransition = + this.assignmentManager.getRegionStates().getRegionsInTransition(); // if hbase:meta region is in transition, result of assignment cannot be recorded // ignore the force flag in that case - boolean metaInTransition = assignmentManager.isMetaRegionInTransition(); + boolean metaInTransition = assignmentManager.getRegionStates().isMetaRegionInTransition(); String prefix = force && !metaInTransition ? "R" : "Not r"; - List<RegionStateNode> toPrint = regionsInTransition; - int max = 5; - boolean truncated = false; - if (regionsInTransition.size() > max) { - toPrint = regionsInTransition.subList(0, max); - truncated = true; - } - LOG.info(prefix + "unning balancer because " + regionsInTransition.size() + - " region(s) in transition: " + toPrint + (truncated? "(truncated list)": "")); + LOG.debug(prefix + "unning balancer because " + regionsInTransition.size() + + " region(s) in transition: " + org.apache.commons.lang.StringUtils. + abbreviate(regionsInTransition.toString(), 256)); if (!force || metaInTransition) return false; } if (this.serverManager.areDeadServersInProgress()) { - LOG.info("Not running balancer because processing dead regionserver(s): " + + LOG.debug("Not running balancer because processing dead regionserver(s): " + this.serverManager.getDeadServers()); return false; } @@ -1385,7 +1403,7 @@ public class HMaster extends HRegionServer implements MasterServices { //Give the balancer the current cluster state. this.balancer.setClusterStatus(getClusterStatus()); this.balancer.setClusterLoad( - this.assignmentManager.getRegionStates().getAssignmentsByTable()); + this.assignmentManager.getRegionStates().getAssignmentsByTable(true)); for (Entry<TableName, Map<ServerName, List<HRegionInfo>>> e : assignmentsByTable.entrySet()) { List<RegionPlan> partialPlans = this.balancer.balanceCluster(e.getKey(), e.getValue()); @@ -1404,7 +1422,7 @@ public class HMaster extends HRegionServer implements MasterServices { for (RegionPlan plan: plans) { LOG.info("balance " + plan); //TODO: bulk assign - this.assignmentManager.moveAsync(plan); + this.assignmentManager.balance(plan); rpCount++; balanceThrottling(balanceStartTime + rpCount * balanceInterval, maxRegionsInTransition, @@ -1520,59 +1538,6 @@ public class HMaster extends HRegionServer implements MasterServices { } @Override - public long dispatchMergingRegions( - final HRegionInfo regionInfoA, - final HRegionInfo regionInfoB, - final boolean forcible, - final long nonceGroup, - final long nonce) throws IOException { - checkInitialized(); - - TableName tableName = regionInfoA.getTable(); - if (tableName == null || regionInfoB.getTable() == null) { - throw new UnknownRegionException ("Can't merge regions without table associated"); - } - - if (!tableName.equals(regionInfoB.getTable())) { - throw new IOException ("Cannot merge regions from two different tables"); - } - - if (regionInfoA.compareTo(regionInfoB) == 0) { - throw new MergeRegionException( - "Cannot merge a region to itself " + regionInfoA + ", " + regionInfoB); - } - - final HRegionInfo [] regionsToMerge = new HRegionInfo[2]; - regionsToMerge [0] = regionInfoA; - regionsToMerge [1] = regionInfoB; - - return MasterProcedureUtil.submitProcedure( - new 
MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { - @Override - protected void run() throws IOException { - MasterCoprocessorHost mcph = getMaster().getMasterCoprocessorHost(); - if (mcph != null) { - mcph.preDispatchMerge(regionInfoA, regionInfoB); - } - - LOG.info(getClientIdAuditPrefix() + " Dispatch merge regions " + - regionsToMerge[0].getEncodedName() + " and " + regionsToMerge[1].getEncodedName()); - - submitProcedure(new DispatchMergingRegionsProcedure( - procedureExecutor.getEnvironment(), tableName, regionsToMerge, forcible)); - if (mcph != null) { - mcph.postDispatchMerge(regionInfoA, regionInfoB); - } - } - - @Override - protected String getDescription() { - return "DispatchMergingRegionsProcedure"; - } - }); - } - - @Override public long mergeRegions( final HRegionInfo[] regionsToMerge, final boolean forcible, @@ -1615,38 +1580,40 @@ public class HMaster extends HRegionServer implements MasterServices { @Override protected String getDescription() { - return "MergeTableProcedure"; + return "DisableTableProcedure"; } }); } @Override - public long splitRegion(final HRegionInfo regionInfo, final byte[] splitRow, - final long nonceGroup, final long nonce) - throws IOException { + public long splitRegion( + final HRegionInfo regionInfo, + final byte[] splitRow, + final long nonceGroup, + final long nonce) throws IOException { checkInitialized(); + return MasterProcedureUtil.submitProcedure( new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { @Override protected void run() throws IOException { getMaster().getMasterCoprocessorHost().preSplitRegion(regionInfo.getTable(), splitRow); - LOG.info(getClientIdAuditPrefix() + " split " + regionInfo.getRegionNameAsString()); + + LOG.info(getClientIdAuditPrefix() + " Split region " + regionInfo); // Execute the operation asynchronously - submitProcedure(getAssignmentManager().createSplitProcedure(regionInfo, splitRow)); + submitProcedure(new SplitTableRegionProcedure(procedureExecutor.getEnvironment(), + regionInfo, splitRow)); } @Override protected String getDescription() { - return "SplitTableProcedure"; + return "DisableTableProcedure"; } }); } - // Public so can be accessed by tests. Blocks until move is done. - // Replace with an async implementation from which you can get - // a success/failure result. - @VisibleForTesting + @VisibleForTesting // Public so can be accessed by tests. public void move(final byte[] encodedRegionName, final byte[] destServerName) throws HBaseIOException { RegionState regionState = assignmentManager.getRegionStates(). @@ -1697,8 +1664,6 @@ public class HMaster extends HRegionServer implements MasterServices { // Now we can do the move RegionPlan rp = new RegionPlan(hri, regionState.getServerName(), dest); - assert rp.getDestination() != null: rp.toString() + " " + dest; - assert rp.getSource() != null: rp.toString(); try { checkInitialized(); @@ -1707,20 +1672,13 @@ public class HMaster extends HRegionServer implements MasterServices { return; } } - // Warmup the region on the destination before initiating the move. this call + // warmup the region on the destination before initiating the move. this call // is synchronous and takes some time. doing it before the source region gets // closed serverManager.sendRegionWarmup(rp.getDestination(), hri); LOG.info(getClientIdAuditPrefix() + " move " + rp + ", running balancer"); - Future<byte []> future = this.assignmentManager.moveAsync(rp); - try { - // Is this going to work? Will we throw exception on error? 
- // TODO: CompletableFuture rather than this stunted Future. - future.get(); - } catch (InterruptedException | ExecutionException e) { - throw new HBaseIOException(e); - } + this.assignmentManager.balance(rp); if (this.cpHost != null) { this.cpHost.postMove(hri, rp.getSource(), rp.getDestination()); } @@ -2059,7 +2017,7 @@ public class HMaster extends HRegionServer implements MasterServices { status.cleanup(); } } - }, getServerName().toShortString() + ".masterManager")); + }, getServerName().toShortString() + ".activeMasterManager")); } private void checkCompression(final HTableDescriptor htd) @@ -2512,9 +2470,8 @@ public class HMaster extends HRegionServer implements MasterServices { String clusterId = fileSystemManager != null ? fileSystemManager.getClusterId().toString() : null; - List<RegionState> regionsInTransition = assignmentManager != null ? - assignmentManager.getRegionStates().getRegionsStateInTransition() : null; - + Set<RegionState> regionsInTransition = assignmentManager != null ? + assignmentManager.getRegionStates().getRegionsInTransition() : null; String[] coprocessors = cpHost != null ? getMasterCoprocessors() : null; boolean balancerOn = loadBalancerTracker != null ? loadBalancerTracker.isBalancerOn() : false; @@ -2722,7 +2679,6 @@ public class HMaster extends HRegionServer implements MasterServices { procedureExecutor.getEnvironment().setEventReady(initialized, isInitialized); } - @Override public ProcedureEvent getInitializedEvent() { return initialized; } @@ -2833,7 +2789,7 @@ public class HMaster extends HRegionServer implements MasterServices { * @see org.apache.hadoop.hbase.master.HMasterCommandLine */ public static void main(String [] args) { - LOG.info("STARTING service '" + HMaster.class.getSimpleName()); + LOG.info("***** STARTING service '" + HMaster.class.getSimpleName() + "' *****"); VersionInfo.logVersion(); new HMasterCommandLine(HMaster.class).doMain(args); } @@ -3278,7 +3234,6 @@ public class HMaster extends HRegionServer implements MasterServices { * @param switchType see {@link org.apache.hadoop.hbase.client.MasterSwitchType} * @return The state of the switch */ - @Override public boolean isSplitOrMergeEnabled(MasterSwitchType switchType) { if (null == splitOrMergeTracker || isInMaintenanceMode()) { return false; http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java index 4611982..129fa7a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java @@ -45,7 +45,7 @@ import edu.umd.cs.findbugs.annotations.Nullable; * locations for all Regions in a cluster. * * <p>This class produces plans for the - * {@link org.apache.hadoop.hbase.master.assignment.AssignmentManager} + * {@link org.apache.hadoop.hbase.master.AssignmentManager} * to execute. 
*/ @InterfaceAudience.Private http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java index 4d18ac9..6064f9b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java @@ -810,28 +810,6 @@ public class MasterCoprocessorHost }); } - public void preDispatchMerge(final HRegionInfo regionInfoA, final HRegionInfo regionInfoB) - throws IOException { - execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() { - @Override - public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx) - throws IOException { - oserver.preDispatchMerge(ctx, regionInfoA, regionInfoB); - } - }); - } - - public void postDispatchMerge(final HRegionInfo regionInfoA, final HRegionInfo regionInfoB) - throws IOException { - execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() { - @Override - public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx) - throws IOException { - oserver.postDispatchMerge(ctx, regionInfoA, regionInfoB); - } - }); - } - public void preMergeRegions(final HRegionInfo[] regionsToMerge) throws IOException { execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() { http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterDumpServlet.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterDumpServlet.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterDumpServlet.java index a48444c..a921ab5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterDumpServlet.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterDumpServlet.java @@ -24,6 +24,7 @@ import java.io.PrintStream; import java.io.PrintWriter; import java.util.Date; import java.util.Map; +import java.util.Set; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -32,8 +33,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ServerLoad; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.master.assignment.AssignmentManager; -import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode; import org.apache.hadoop.hbase.monitoring.LogMonitoring; import org.apache.hadoop.hbase.monitoring.StateDumpServlet; import org.apache.hadoop.hbase.monitoring.TaskMonitor; @@ -118,8 +117,9 @@ public class MasterDumpServlet extends StateDumpServlet { return; } - for (RegionStateNode rs : am.getRegionsInTransition()) { - String rid = rs.getRegionInfo().getEncodedName(); + Set<RegionState> regionsInTransition = am.getRegionStates().getRegionsInTransition(); + for (RegionState rs : regionsInTransition) { + String rid = rs.getRegion().getRegionNameAsString(); out.println("Region " + rid + ": " + rs.toDescriptiveString()); } } 
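The PeriodicDoMetrics chore added to HMaster above follows the same ScheduledChore pattern that CatalogJanitor uses: pass a name, a Stoppable, and a period (milliseconds by default) to the super constructor and put the periodic work in chore(). A minimal sketch; the class name and the chore body are illustrative assumptions, not part of this patch.

  import org.apache.hadoop.hbase.ScheduledChore;
  import org.apache.hadoop.hbase.Stoppable;

  public class DemoMetricsChore extends ScheduledChore {
    public DemoMetricsChore(Stoppable stopper, int periodMillis) {
      // The chore service re-runs chore() every periodMillis until stopper stops.
      super("DemoMetricsChore", stopper, periodMillis);
    }

    @Override
    protected void chore() {
      // Periodic work goes here; PeriodicDoMetrics calls server.doMetrics().
    }
  }

As in the HMaster diff, an instance is scheduled with getChoreService().scheduleChore(...) and cancelled with cancel() during shutdown.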
http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java index 049e659..1988e2d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master; import java.io.IOException; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -32,8 +33,8 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.TableState; -import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -107,7 +108,14 @@ public class MasterMetaBootstrap { } private void splitMetaLogBeforeAssignment(ServerName currentMetaServer) throws IOException { - master.getMasterWalManager().splitMetaLog(currentMetaServer); + if (RecoveryMode.LOG_REPLAY == master.getMasterWalManager().getLogRecoveryMode()) { + // In log replay mode, we mark hbase:meta region as recovering in ZK + master.getMasterWalManager().prepareLogReplay(currentMetaServer, + Collections.<HRegionInfo>singleton(HRegionInfo.FIRST_META_REGIONINFO)); + } else { + // In recovered.edits mode: create recovered edits file for hbase:meta server + master.getMasterWalManager().splitMetaLog(currentMetaServer); + } } private void unassignExcessMetaReplica(int numMetaReplicasConfigured) { @@ -143,9 +151,7 @@ public class MasterMetaBootstrap { // Work on meta region int assigned = 0; - // TODO: Unimplemented - // long timeout = - // master.getConfiguration().getLong("hbase.catalog.verification.timeout", 1000); + long timeout = master.getConfiguration().getLong("hbase.catalog.verification.timeout", 1000); if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) { status.setStatus("Assigning hbase:meta region"); } else { @@ -154,10 +160,37 @@ public class MasterMetaBootstrap { // Get current meta state from zk. 
RegionState metaState = MetaTableLocator.getMetaRegionState(master.getZooKeeper(), replicaId); - LOG.debug("meta state from zookeeper: " + metaState); - HRegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica( - HRegionInfo.FIRST_META_REGIONINFO, replicaId); - assignmentManager.assignMeta(hri, metaState.getServerName()); + HRegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO, + replicaId); + RegionStates regionStates = assignmentManager.getRegionStates(); + regionStates.createRegionState(hri, metaState.getState(), + metaState.getServerName(), null); + + if (!metaState.isOpened() || !master.getMetaTableLocator().verifyMetaRegionLocation( + master.getClusterConnection(), master.getZooKeeper(), timeout, replicaId)) { + ServerName currentMetaServer = metaState.getServerName(); + if (master.getServerManager().isServerOnline(currentMetaServer)) { + if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) { + LOG.info("Meta was in transition on " + currentMetaServer); + } else { + LOG.info("Meta with replicaId " + replicaId + " was in transition on " + + currentMetaServer); + } + assignmentManager.processRegionsInTransition(Collections.singletonList(metaState)); + } else { + if (currentMetaServer != null) { + if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) { + splitMetaLogBeforeAssignment(currentMetaServer); + regionStates.logSplit(HRegionInfo.FIRST_META_REGIONINFO); + previouslyFailedMetaRSs.add(currentMetaServer); + } + } + LOG.info("Re-assigning hbase:meta with replicaId=" + replicaId + + "; it was on " + currentMetaServer); + assignmentManager.assignMeta(hri); + } + assigned++; + } if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) { // TODO: should we prevent from using state manager before meta was initialized? @@ -166,6 +199,14 @@ .setTableState(TableName.META_TABLE_NAME, TableState.State.ENABLED); } + if ((RecoveryMode.LOG_REPLAY == master.getMasterWalManager().getLogRecoveryMode()) + && (!previouslyFailedMetaRSs.isEmpty())) { + // Log replay mode requires that a new hbase:meta RS be assigned before the logs are replayed + status.setStatus("replaying log for Meta Region"); + master.getMasterWalManager().splitMetaLog(previouslyFailedMetaRSs); + } + + assignmentManager.setEnabledTable(TableName.META_TABLE_NAME); master.getTableStateManager().start(); // Make sure a hbase:meta location is set. We need to enable SSH here since @@ -173,7 +214,7 @@ // by SSH so that system tables can be assigned. // No need to wait for meta is assigned = 0 when meta is just verified. 
if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) enableCrashedServerProcessing(assigned != 0); - LOG.info("hbase:meta with replicaId " + replicaId + ", location=" + LOG.info("hbase:meta with replicaId " + replicaId + " assigned=" + assigned + ", location=" + master.getMetaTableLocator().getMetaRegionLocation(master.getZooKeeper(), replicaId)); status.setStatus("META assigned."); } http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index c43a4d1..296d4d7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.PleaseHoldException; import org.apache.hadoop.hbase.ProcedureInfo; import org.apache.hadoop.hbase.ServerLoad; import org.apache.hadoop.hbase.ServerName; @@ -45,7 +46,6 @@ import org.apache.hadoop.hbase.UnknownRegionException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.MasterSwitchType; import org.apache.hadoop.hbase.client.TableState; -import org.apache.hadoop.hbase.client.VersionInfoUtil; import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper; import org.apache.hadoop.hbase.errorhandling.ForeignException; import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; @@ -54,7 +54,6 @@ import org.apache.hadoop.hbase.ipc.PriorityFunction; import org.apache.hadoop.hbase.ipc.QosPriority; import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.ipc.ServerRpcController; -import org.apache.hadoop.hbase.master.assignment.RegionStates; import org.apache.hadoop.hbase.master.locking.LockProcedure; import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil; import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil.NonceProcedureRunnable; @@ -86,6 +85,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.*; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatResponse; @@ -136,6 +136,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.SplitTableRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.SplitTableRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest; @@ -304,11 +306,7 @@ public class MasterRpcServices extends RSRpcServices ClusterStatusProtos.ServerLoad sl = request.getLoad(); ServerName serverName = ProtobufUtil.toServerName(request.getServer()); ServerLoad oldLoad = master.getServerManager().getLoad(serverName); - ServerLoad newLoad = new ServerLoad(sl); - master.getServerManager().regionServerReport(serverName, newLoad); - int version = VersionInfoUtil.getCurrentClientVersionNumber(); - master.getAssignmentManager().reportOnlineRegions(serverName, - version, newLoad.getRegionsLoad().keySet()); + master.getServerManager().regionServerReport(serverName, new ServerLoad(sl)); if (sl != null && master.metricsMaster != null) { // Up our metrics. master.metricsMaster.incrementRequests(sl.getTotalNumberOfRequests() @@ -381,25 +379,25 @@ public class MasterRpcServices extends RSRpcServices public AssignRegionResponse assignRegion(RpcController controller, AssignRegionRequest req) throws ServiceException { try { - master.checkInitialized(); + final byte [] regionName = req.getRegion().getValue().toByteArray(); + RegionSpecifierType type = req.getRegion().getType(); + AssignRegionResponse arr = AssignRegionResponse.newBuilder().build(); - final RegionSpecifierType type = req.getRegion().getType(); + master.checkInitialized(); if (type != RegionSpecifierType.REGION_NAME) { LOG.warn("assignRegion specifier type: expected: " + RegionSpecifierType.REGION_NAME + " actual: " + type); } - - final byte[] regionName = req.getRegion().getValue().toByteArray(); - final HRegionInfo regionInfo = master.getAssignmentManager().getRegionInfo(regionName); - if (regionInfo == null) throw new UnknownRegionException(Bytes.toStringBinary(regionName)); - - final AssignRegionResponse arr = AssignRegionResponse.newBuilder().build(); + RegionStates regionStates = master.getAssignmentManager().getRegionStates(); + HRegionInfo regionInfo = regionStates.getRegionInfo(regionName); + if (regionInfo == null) throw new UnknownRegionException(Bytes.toString(regionName)); if (master.cpHost != null) { if (master.cpHost.preAssign(regionInfo)) { return arr; } } - LOG.info(master.getClientIdAuditPrefix() + " assign " + regionInfo.getRegionNameAsString()); + LOG.info(master.getClientIdAuditPrefix() + + " assign " + regionInfo.getRegionNameAsString()); master.getAssignmentManager().assign(regionInfo, true); if (master.cpHost != null) { master.cpHost.postAssign(regionInfo); @@ -410,7 +408,6 @@ public class MasterRpcServices extends RSRpcServices } } - @Override public BalanceResponse balance(RpcController controller, BalanceRequest request) throws ServiceException { @@ -630,7 +627,8 @@ public class MasterRpcServices extends RSRpcServices } @Override - public SplitTableRegionResponse splitRegion(final RpcController controller, + public SplitTableRegionResponse splitRegion( + final RpcController controller, final SplitTableRegionRequest request) throws 
ServiceException { try { long procId = master.splitRegion( @@ -1217,24 +1215,24 @@ public class MasterRpcServices extends RSRpcServices @Override public OfflineRegionResponse offlineRegion(RpcController controller, OfflineRegionRequest request) throws ServiceException { + final byte [] regionName = request.getRegion().getValue().toByteArray(); + RegionSpecifierType type = request.getRegion().getType(); + if (type != RegionSpecifierType.REGION_NAME) { + LOG.warn("moveRegion specifier type: expected: " + RegionSpecifierType.REGION_NAME + + " actual: " + type); + } + try { master.checkInitialized(); - - final RegionSpecifierType type = request.getRegion().getType(); - if (type != RegionSpecifierType.REGION_NAME) { - LOG.warn("moveRegion specifier type: expected: " + RegionSpecifierType.REGION_NAME - + " actual: " + type); - } - - final byte[] regionName = request.getRegion().getValue().toByteArray(); - final HRegionInfo hri = master.getAssignmentManager().getRegionInfo(regionName); - if (hri == null) throw new UnknownRegionException(Bytes.toStringBinary(regionName)); - + Pair<HRegionInfo, ServerName> pair = + MetaTableAccessor.getRegion(master.getConnection(), regionName); + if (pair == null) throw new UnknownRegionException(Bytes.toStringBinary(regionName)); + HRegionInfo hri = pair.getFirst(); if (master.cpHost != null) { master.cpHost.preRegionOffline(hri); } LOG.info(master.getClientIdAuditPrefix() + " offline " + hri.getRegionNameAsString()); - master.getAssignmentManager().offlineRegion(hri); + master.getAssignmentManager().regionOffline(hri); if (master.cpHost != null) { master.cpHost.postRegionOffline(hri); } @@ -1419,7 +1417,26 @@ public class MasterRpcServices extends RSRpcServices ReportRegionStateTransitionRequest req) throws ServiceException { try { master.checkServiceStarted(); - return master.getAssignmentManager().reportRegionStateTransition(req); + RegionStateTransition rt = req.getTransition(0); + RegionStates regionStates = master.getAssignmentManager().getRegionStates(); + for (RegionInfo ri : rt.getRegionInfoList()) { + TableName tableName = ProtobufUtil.toTableName(ri.getTableName()); + if (!(TableName.META_TABLE_NAME.equals(tableName) + && regionStates.getRegionState(HRegionInfo.FIRST_META_REGIONINFO) != null) + && !master.getAssignmentManager().isFailoverCleanupDone()) { + // Meta region is assigned before master finishes the + // failover cleanup. 
So this check is not needed for meta. + throw new PleaseHoldException("Master is rebuilding user regions"); + } + } + ServerName sn = ProtobufUtil.toServerName(req.getServer()); + String error = master.getAssignmentManager().onRegionTransition(sn, rt); + ReportRegionStateTransitionResponse.Builder rrtr = + ReportRegionStateTransitionResponse.newBuilder(); + if (error != null) { + rrtr.setErrorMessage(error); + } + return rrtr.build(); } catch (IOException ioe) { throw new ServiceException(ioe); } @@ -2008,34 +2025,4 @@ public class MasterRpcServices extends RSRpcServices throw new ServiceException(e); } } - - @Override - public DispatchMergingRegionsResponse dispatchMergingRegions(RpcController controller, - DispatchMergingRegionsRequest request) throws ServiceException { - final byte[] encodedNameOfRegionA = request.getRegionA().getValue().toByteArray(); - final byte[] encodedNameOfRegionB = request.getRegionB().getValue().toByteArray(); - if (request.getRegionA().getType() != RegionSpecifierType.ENCODED_REGION_NAME || - request.getRegionB().getType() != RegionSpecifierType.ENCODED_REGION_NAME) { - LOG.warn("mergeRegions specifier type: expected: " + RegionSpecifierType.ENCODED_REGION_NAME + - " actual: region_a=" + - request.getRegionA().getType() + ", region_b=" + - request.getRegionB().getType()); - } - RegionStates regionStates = master.getAssignmentManager().getRegionStates(); - RegionState regionStateA = regionStates.getRegionState(Bytes.toString(encodedNameOfRegionA)); - RegionState regionStateB = regionStates.getRegionState(Bytes.toString(encodedNameOfRegionB)); - if (regionStateA == null || regionStateB == null) { - throw new ServiceException(new UnknownRegionException( - Bytes.toStringBinary(regionStateA == null? encodedNameOfRegionA: encodedNameOfRegionB))); - } - final HRegionInfo regionInfoA = regionStateA.getRegion(); - final HRegionInfo regionInfoB = regionStateB.getRegion(); - try { - long procId = master.dispatchMergingRegions(regionInfoA, regionInfoB, request.getForcible(), - request.getNonceGroup(), request.getNonce()); - return DispatchMergingRegionsResponse.newBuilder().setProcId(procId).build(); - } catch (IOException ioe) { - throw new ServiceException(ioe); - } - } } http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index 781e907..4924d72 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -32,9 +32,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotDisabledException; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.MasterSwitchType; import org.apache.hadoop.hbase.executor.ExecutorService; -import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.master.locking.LockManager; import org.apache.hadoop.hbase.favored.FavoredNodesManager; import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer; @@ -42,14 +40,11 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import 
org.apache.hadoop.hbase.master.snapshot.SnapshotManager; import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost; import org.apache.hadoop.hbase.procedure2.LockInfo; -import org.apache.hadoop.hbase.procedure2.ProcedureEvent; import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.quotas.MasterQuotaManager; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; - -import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.Service; /** @@ -128,12 +123,6 @@ public interface MasterServices extends Server { ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor(); /** - * @return Tripped when Master has finished initialization. - */ - @VisibleForTesting - public ProcedureEvent getInitializedEvent(); - - /** * Check table is modifiable; i.e. exists and is offline. * @param tableName Name of table to check. * @throws TableNotDisabledException @@ -277,23 +266,6 @@ public interface MasterServices extends Server { throws IOException; /** - * Merge two regions. The real implementation is on the regionserver, master - * just move the regions together and send MERGE RPC to regionserver - * @param region_a region to merge - * @param region_b region to merge - * @param forcible true if do a compulsory merge, otherwise we will only merge - * two adjacent regions - * @return procedure Id - * @throws IOException - */ - long dispatchMergingRegions( - final HRegionInfo region_a, - final HRegionInfo region_b, - final boolean forcible, - final long nonceGroup, - final long nonce) throws IOException; - - /** * Merge regions in a table. * @param regionsToMerge daughter regions to merge * @param forcible whether to force to merge even two regions are not adjacent @@ -429,8 +401,6 @@ public interface MasterServices extends Server { */ boolean isStopping(); - boolean isSplitOrMergeEnabled(MasterSwitchType switchType); - /** * @return Favored Nodes Manager */ http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java index 928702e..105fa29 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java @@ -18,6 +18,8 @@ */ package org.apache.hadoop.hbase.master; +import com.google.common.annotations.VisibleForTesting; + import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; @@ -39,13 +41,12 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WALSplitter; -import com.google.common.annotations.VisibleForTesting; - /** * This class abstracts a bunch of operations the HMaster needs * when 
splitting log files e.g. finding log files, dirs etc. @@ -331,4 +332,16 @@ public class MasterWalManager { } } } + + /** + * The function is used in SSH to set recovery mode based on configuration after all outstanding + * log split tasks are drained. + */ + public void setLogRecoveryMode() throws IOException { + this.splitLogManager.setRecoveryMode(false); + } + + public RecoveryMode getLogRecoveryMode() { + return this.splitLogManager.getRecoveryMode(); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManager.java index c7ce9a9..40e79ae 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManager.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.master; import org.apache.hadoop.hbase.CompatibilitySingletonFactory; public class MetricsAssignmentManager { + private final MetricsAssignmentManagerSource assignmentManagerSource; public MetricsAssignmentManager() { @@ -32,11 +33,19 @@ public class MetricsAssignmentManager { return assignmentManagerSource; } + public void updateAssignmentTime(long time) { + assignmentManagerSource.updateAssignmentTime(time); + } + + public void updateBulkAssignTime(long time) { + assignmentManagerSource.updateBulkAssignTime(time); + } + /** * set new value for number of regions in transition. * @param ritCount */ - public void updateRITCount(final int ritCount) { + public void updateRITCount(int ritCount) { assignmentManagerSource.setRIT(ritCount); } @@ -45,15 +54,14 @@ public class MetricsAssignmentManager { * as defined by the property rit.metrics.threshold.time. * @param ritCountOverThreshold */ - public void updateRITCountOverThreshold(final int ritCountOverThreshold) { + public void updateRITCountOverThreshold(int ritCountOverThreshold) { assignmentManagerSource.setRITCountOverThreshold(ritCountOverThreshold); } - /** * update the timestamp for oldest region in transition metrics. * @param timestamp */ - public void updateRITOldestAge(final long timestamp) { + public void updateRITOldestAge(long timestamp) { assignmentManagerSource.setRITOldestAge(timestamp); } @@ -64,27 +72,4 @@ public class MetricsAssignmentManager { public void updateRitDuration(long duration) { assignmentManagerSource.updateRitDuration(duration); } - - /* - * Increment the count of assignment operation (assign/unassign). 
- */ - public void incrementOperationCounter() { - assignmentManagerSource.incrementOperationCounter(); - } - - /** - * Add the time took to perform the last assign operation - * @param time - */ - public void updateAssignTime(final long time) { - assignmentManagerSource.updateAssignTime(time); - } - - /** - * Add the time took to perform the last unassign operation - * @param time - */ - public void updateUnassignTime(final long time) { - assignmentManagerSource.updateUnassignTime(time); - } } http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/NoSuchProcedureException.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/NoSuchProcedureException.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/NoSuchProcedureException.java deleted file mode 100644 index e119e88..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/NoSuchProcedureException.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.master; - -import org.apache.hadoop.hbase.HBaseIOException; -import org.apache.hadoop.hbase.classification.InterfaceAudience; - [email protected] -// Based on HBaseIOE rather than PE because easier to integrate when an IOE. -public class NoSuchProcedureException extends HBaseIOException { - public NoSuchProcedureException() { - super(); - } - - public NoSuchProcedureException(String s) { - super(s); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlan.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlan.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlan.java index 17eb346..cd6b313 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlan.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlan.java @@ -135,8 +135,8 @@ public class RegionPlan implements Comparable<RegionPlan> { @Override public String toString() { - return "hri=" + this.hri.getRegionNameAsString() + ", source=" + + return "hri=" + this.hri.getRegionNameAsString() + ", src=" + (this.source == null? "": this.source.toString()) + - ", destination=" + (this.dest == null? "": this.dest.toString()); + ", dest=" + (this.dest == null? "": this.dest.toString()); } }
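Note: with incrementOperationCounter, updateAssignTime and updateUnassignTime removed, callers report durations through the updateAssignmentTime/updateBulkAssignTime delegations added in the MetricsAssignmentManager hunk above. A rough, self-contained sketch of how a caller might feed them (MetricsSource is a local stand-in for MetricsAssignmentManagerSource, and the timedBulkAssign helper is hypothetical):

public class AssignmentTimingSketch {
  /** Stand-in for MetricsAssignmentManagerSource; only these two hooks. */
  interface MetricsSource {
    void updateAssignmentTime(long millis);
    void updateBulkAssignTime(long millis);
  }

  /** Mirrors the thin delegation visible in the hunk above. */
  static final class MetricsAssignmentManager {
    private final MetricsSource assignmentManagerSource;
    MetricsAssignmentManager(MetricsSource source) {
      this.assignmentManagerSource = source;
    }
    void updateAssignmentTime(long time) {
      assignmentManagerSource.updateAssignmentTime(time);
    }
    void updateBulkAssignTime(long time) {
      assignmentManagerSource.updateBulkAssignTime(time);
    }
  }

  /** Hypothetical helper: time a bulk assignment and report its duration. */
  static void timedBulkAssign(MetricsAssignmentManager metrics, Runnable bulkAssign) {
    long start = System.currentTimeMillis();
    try {
      bulkAssign.run();
    } finally {
      // Report wall-clock time whether or not the assignment succeeded.
      metrics.updateBulkAssignTime(System.currentTimeMillis() - start);
    }
  }

  public static void main(String[] args) {
    MetricsAssignmentManager metrics = new MetricsAssignmentManager(new MetricsSource() {
      @Override public void updateAssignmentTime(long ms) {
        System.out.println("assignment took " + ms + " ms");
      }
      @Override public void updateBulkAssignTime(long ms) {
        System.out.println("bulk assign took " + ms + " ms");
      }
    });
    timedBulkAssign(metrics, () -> { /* region assignment work would run here */ });
  }
}

Putting the report in a finally block keeps the timing metric populated even when the underlying assignment throws.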
