http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkAssigner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkAssigner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkAssigner.java deleted file mode 100644 index 929cd4e..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkAssigner.java +++ /dev/null @@ -1,122 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.master; - -import java.io.IOException; -import java.lang.Thread.UncaughtExceptionHandler; -import java.util.concurrent.Executors; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.Server; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -/** - * Base class used bulk assigning and unassigning regions. - * Encapsulates a fixed size thread pool of executors to run assignment/unassignment. - * Implement {@link #populatePool(java.util.concurrent.ExecutorService)} and - * {@link #waitUntilDone(long)}. The default implementation of - * the {@link #getUncaughtExceptionHandler()} is to abort the hosting - * Server. - */ [email protected] -public abstract class BulkAssigner { - protected final Server server; - - /** - * @param server An instance of Server - */ - public BulkAssigner(final Server server) { - this.server = server; - } - - /** - * @return What to use for a thread prefix when executor runs. - */ - protected String getThreadNamePrefix() { - return this.server.getServerName() + "-" + this.getClass().getName(); - } - - protected UncaughtExceptionHandler getUncaughtExceptionHandler() { - return new UncaughtExceptionHandler() { - @Override - public void uncaughtException(Thread t, Throwable e) { - // Abort if exception of any kind. - server.abort("Uncaught exception in " + t.getName(), e); - } - }; - } - - protected int getThreadCount() { - return this.server.getConfiguration(). - getInt("hbase.bulk.assignment.threadpool.size", 20); - } - - protected long getTimeoutOnRIT() { - return this.server.getConfiguration(). - getLong("hbase.bulk.assignment.waiton.empty.rit", 5 * 60 * 1000); - } - - protected abstract void populatePool( - final java.util.concurrent.ExecutorService pool) throws IOException; - - public boolean bulkAssign() throws InterruptedException, IOException { - return bulkAssign(true); - } - - /** - * Run the bulk assign. - * - * @param sync - * Whether to assign synchronously. - * @throws InterruptedException - * @return True if done. - * @throws IOException - */ - public boolean bulkAssign(boolean sync) throws InterruptedException, - IOException { - boolean result = false; - ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); - builder.setDaemon(true); - builder.setNameFormat(getThreadNamePrefix() + "-%1$d"); - builder.setUncaughtExceptionHandler(getUncaughtExceptionHandler()); - int threadCount = getThreadCount(); - java.util.concurrent.ExecutorService pool = - Executors.newFixedThreadPool(threadCount, builder.build()); - try { - populatePool(pool); - // How long to wait on empty regions-in-transition. If we timeout, the - // RIT monitor should do fixup. - if (sync) result = waitUntilDone(getTimeoutOnRIT()); - } finally { - // We're done with the pool. It'll exit when its done all in queue. - pool.shutdown(); - } - return result; - } - - /** - * Wait until bulk assign is done. - * @param timeout How long to wait. - * @throws InterruptedException - * @return True if the condition we were waiting on happened. - */ - protected abstract boolean waitUntilDone(final long timeout) - throws InterruptedException; -}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkReOpen.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkReOpen.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkReOpen.java deleted file mode 100644 index d8c511e..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkReOpen.java +++ /dev/null @@ -1,136 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.master; - -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutorService; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.Server; -import org.apache.hadoop.hbase.ServerName; - -/** - * Performs bulk reopen of the list of regions provided to it. - */ [email protected] -public class BulkReOpen extends BulkAssigner { - private final Map<ServerName, List<HRegionInfo>> rsToRegions; - private final AssignmentManager assignmentManager; - private static final Log LOG = LogFactory.getLog(BulkReOpen.class); - - public BulkReOpen(final Server server, - final Map<ServerName, List<HRegionInfo>> serverToRegions, - final AssignmentManager am) { - super(server); - this.assignmentManager = am; - this.rsToRegions = serverToRegions; - } - - /** - * Unassign all regions, so that they go through the regular region - * assignment flow (in assignment manager) and are re-opened. - */ - @Override - protected void populatePool(ExecutorService pool) { - LOG.debug("Creating threads for each region server "); - for (Map.Entry<ServerName, List<HRegionInfo>> e : rsToRegions - .entrySet()) { - final List<HRegionInfo> hris = e.getValue(); - // add plans for the regions that need to be reopened - Map<String, RegionPlan> plans = new HashMap<>(); - for (HRegionInfo hri : hris) { - RegionPlan reOpenPlan = assignmentManager.getRegionReopenPlan(hri); - plans.put(hri.getEncodedName(), reOpenPlan); - } - assignmentManager.addPlans(plans); - pool.execute(new Runnable() { - public void run() { - try { - unassign(hris); - } catch (Throwable t) { - LOG.warn("Failed bulking re-open " + hris.size() - + " region(s)", t); - } - } - }); - } - } - - /** - * Reopen the regions asynchronously, so always returns true immediately. - * @return true - */ - @Override - protected boolean waitUntilDone(long timeout) { - return true; - } - - /** - * Configuration knobs "hbase.bulk.reopen.threadpool.size" number of regions - * that can be reopened concurrently. The maximum number of threads the master - * creates is never more than the number of region servers. - * If configuration is not defined it defaults to 20 - */ - protected int getThreadCount() { - int defaultThreadCount = super.getThreadCount(); - return this.server.getConfiguration().getInt( - "hbase.bulk.reopen.threadpool.size", defaultThreadCount); - } - - public boolean bulkReOpen() throws InterruptedException, IOException { - return bulkAssign(); - } - - /** - * Unassign the list of regions. Configuration knobs: - * hbase.bulk.waitbetween.reopen indicates the number of milliseconds to - * wait before unassigning another region from this region server - * - * @param regions - * @throws InterruptedException - */ - private void unassign( - List<HRegionInfo> regions) throws InterruptedException { - int waitTime = this.server.getConfiguration().getInt( - "hbase.bulk.waitbetween.reopen", 0); - RegionStates regionStates = assignmentManager.getRegionStates(); - for (HRegionInfo region : regions) { - if (server.isStopped()) { - return; - } - if (regionStates.isRegionInTransition(region)) { - continue; - } - assignmentManager.unassign(region); - while (regionStates.isRegionInTransition(region) - && !server.isStopped()) { - regionStates.waitForUpdate(100); - } - if (waitTime > 0 && !server.isStopped()) { - Thread.sleep(waitTime); - } - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java index affd44c..d682b82 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.backup.HFileArchiver; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.favored.FavoredNodesManager; import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; import org.apache.hadoop.hbase.util.Bytes; @@ -112,7 +113,7 @@ public class CatalogJanitor extends ScheduledChore { && !this.services.isInMaintenanceMode() && am != null && am.isFailoverCleanupDone() - && am.getRegionStates().getRegionsInTransition().isEmpty()) { + && am.hasRegionsInTransition()) { scan(); } else { LOG.warn("CatalogJanitor disabled! Not running scan."); @@ -167,6 +168,7 @@ public class CatalogJanitor extends ScheduledChore { // Another table, stop scanning return false; } + LOG.info("CATALOG INFO " + info + " IS-SPLIT_PARENT=" + info.isSplitParent()); if (info.isSplitParent()) splitParents.put(info, r); if (r.getValue(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER) != null) { mergedRegions.put(info, r); @@ -469,4 +471,4 @@ public class CatalogJanitor extends ScheduledChore { return cleanMergeRegion(region, mergeRegions.getFirst(), mergeRegions.getSecond()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java deleted file mode 100644 index fc3607f..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java +++ /dev/null @@ -1,213 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.master; - -import java.lang.Thread.UncaughtExceptionHandler; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.Server; -import org.apache.hadoop.hbase.ServerName; - -/** - * Run bulk assign. Does one RCP per regionserver passing a - * batch of regions using {@link GeneralBulkAssigner.SingleServerBulkAssigner}. - */ [email protected] -public class GeneralBulkAssigner extends BulkAssigner { - private static final Log LOG = LogFactory.getLog(GeneralBulkAssigner.class); - - private Map<ServerName, List<HRegionInfo>> failedPlans = new ConcurrentHashMap<>(); - private ExecutorService pool; - - final Map<ServerName, List<HRegionInfo>> bulkPlan; - final AssignmentManager assignmentManager; - final boolean waitTillAllAssigned; - - public GeneralBulkAssigner(final Server server, - final Map<ServerName, List<HRegionInfo>> bulkPlan, - final AssignmentManager am, final boolean waitTillAllAssigned) { - super(server); - this.bulkPlan = bulkPlan; - this.assignmentManager = am; - this.waitTillAllAssigned = waitTillAllAssigned; - } - - @Override - protected String getThreadNamePrefix() { - return this.server.getServerName() + "-GeneralBulkAssigner"; - } - - @Override - protected void populatePool(ExecutorService pool) { - this.pool = pool; // shut it down later in case some assigner hangs - for (Map.Entry<ServerName, List<HRegionInfo>> e: this.bulkPlan.entrySet()) { - pool.execute(new SingleServerBulkAssigner(e.getKey(), e.getValue(), - this.assignmentManager, this.failedPlans)); - } - } - - /** - * - * @param timeout How long to wait. - * @return true if done. - */ - @Override - protected boolean waitUntilDone(final long timeout) - throws InterruptedException { - Set<HRegionInfo> regionSet = new HashSet<>(); - for (List<HRegionInfo> regionList : bulkPlan.values()) { - regionSet.addAll(regionList); - } - - pool.shutdown(); // no more task allowed - int serverCount = bulkPlan.size(); - int regionCount = regionSet.size(); - long startTime = System.currentTimeMillis(); - long rpcWaitTime = startTime + timeout; - while (!server.isStopped() && !pool.isTerminated() - && rpcWaitTime > System.currentTimeMillis()) { - if (failedPlans.isEmpty()) { - pool.awaitTermination(100, TimeUnit.MILLISECONDS); - } else { - reassignFailedPlans(); - } - } - if (!pool.isTerminated()) { - LOG.warn("bulk assigner is still running after " - + (System.currentTimeMillis() - startTime) + "ms, shut it down now"); - // some assigner hangs, can't wait any more, shutdown the pool now - List<Runnable> notStarted = pool.shutdownNow(); - if (notStarted != null && !notStarted.isEmpty()) { - server.abort("some single server assigner hasn't started yet" - + " when the bulk assigner timed out", null); - return false; - } - } - - int reassigningRegions = 0; - if (!failedPlans.isEmpty() && !server.isStopped()) { - reassigningRegions = reassignFailedPlans(); - } - assignmentManager.waitForAssignment(regionSet, waitTillAllAssigned, - reassigningRegions, Math.max(System.currentTimeMillis(), rpcWaitTime)); - - if (LOG.isDebugEnabled()) { - long elapsedTime = System.currentTimeMillis() - startTime; - String status = "successfully"; - if (!regionSet.isEmpty()) { - status = "with " + regionSet.size() + " regions still in transition"; - } - LOG.debug("bulk assigning total " + regionCount + " regions to " - + serverCount + " servers, took " + elapsedTime + "ms, " + status); - } - return regionSet.isEmpty(); - } - - @Override - protected long getTimeoutOnRIT() { - // Guess timeout. Multiply the max number of regions on a server - // by how long we think one region takes opening. - Configuration conf = server.getConfiguration(); - long perRegionOpenTimeGuesstimate = - conf.getLong("hbase.bulk.assignment.perregion.open.time", 1000); - int maxRegionsPerServer = 1; - for (List<HRegionInfo> regionList : bulkPlan.values()) { - int size = regionList.size(); - if (size > maxRegionsPerServer) { - maxRegionsPerServer = size; - } - } - long timeout = perRegionOpenTimeGuesstimate * maxRegionsPerServer - + conf.getLong("hbase.regionserver.rpc.startup.waittime", 60000) - + conf.getLong("hbase.bulk.assignment.perregionserver.rpc.waittime", - 30000) * bulkPlan.size(); - LOG.debug("Timeout-on-RIT=" + timeout); - return timeout; - } - - @Override - protected UncaughtExceptionHandler getUncaughtExceptionHandler() { - return new UncaughtExceptionHandler() { - @Override - public void uncaughtException(Thread t, Throwable e) { - LOG.warn("Assigning regions in " + t.getName(), e); - } - }; - } - - private int reassignFailedPlans() { - List<HRegionInfo> reassigningRegions = new ArrayList<>(); - for (Map.Entry<ServerName, List<HRegionInfo>> e : failedPlans.entrySet()) { - LOG.info("Failed assigning " + e.getValue().size() - + " regions to server " + e.getKey() + ", reassigning them"); - reassigningRegions.addAll(failedPlans.remove(e.getKey())); - } - RegionStates regionStates = assignmentManager.getRegionStates(); - for (HRegionInfo region : reassigningRegions) { - if (!regionStates.isRegionOnline(region)) { - assignmentManager.invokeAssign(region); - } - } - return reassigningRegions.size(); - } - - /** - * Manage bulk assigning to a server. - */ - static class SingleServerBulkAssigner implements Runnable { - private final ServerName regionserver; - private final List<HRegionInfo> regions; - private final AssignmentManager assignmentManager; - private final Map<ServerName, List<HRegionInfo>> failedPlans; - - SingleServerBulkAssigner(final ServerName regionserver, - final List<HRegionInfo> regions, final AssignmentManager am, - final Map<ServerName, List<HRegionInfo>> failedPlans) { - this.regionserver = regionserver; - this.regions = regions; - this.assignmentManager = am; - this.failedPlans = failedPlans; - } - - @Override - public void run() { - try { - if (!assignmentManager.assign(regionserver, regions)) { - failedPlans.put(regionserver, regions); - } - } catch (Throwable t) { - LOG.warn("Failed bulking assigning " + regions.size() - + " region(s) to " + regionserver.getServerName() - + ", and continue to bulk assign others", t); - failedPlans.put(regionserver, regions); - } - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index a1cbe53..9812aaf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -65,8 +65,6 @@ import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.PleaseHoldException; import org.apache.hadoop.hbase.ProcedureInfo; -import org.apache.hadoop.hbase.RegionStateListener; -import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.ServerLoad; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableDescriptors; @@ -90,6 +88,10 @@ import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.master.MasterRpcServices.BalanceSwitchMode; +import org.apache.hadoop.hbase.master.assignment.AssignmentManager; +import org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure; +import org.apache.hadoop.hbase.master.assignment.RegionStates; +import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode; import org.apache.hadoop.hbase.master.balancer.BalancerChore; import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer; import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore; @@ -110,15 +112,14 @@ import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure; import org.apache.hadoop.hbase.master.procedure.DeleteColumnFamilyProcedure; import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure; import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure; +import org.apache.hadoop.hbase.master.procedure.DispatchMergingRegionsProcedure; import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure; import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil; -import org.apache.hadoop.hbase.master.procedure.MergeTableRegionsProcedure; import org.apache.hadoop.hbase.master.procedure.ModifyColumnFamilyProcedure; import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure; import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; -import org.apache.hadoop.hbase.master.procedure.SplitTableRegionProcedure; import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure; import org.apache.hadoop.hbase.master.replication.ReplicationManager; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; @@ -332,7 +333,6 @@ public class HMaster extends HRegionServer implements MasterServices { private RegionNormalizerChore normalizerChore; private ClusterStatusChore clusterStatusChore; private ClusterStatusPublisher clusterStatusPublisherChore = null; - private PeriodicDoMetrics periodicDoMetricsChore = null; CatalogJanitor catalogJanitorChore; private ReplicationMetaCleaner replicationMetaCleaner; @@ -431,19 +431,6 @@ public class HMaster extends HRegionServer implements MasterServices { } } - private static class PeriodicDoMetrics extends ScheduledChore { - private final HMaster server; - public PeriodicDoMetrics(int doMetricsInterval, final HMaster server) { - super(server.getServerName() + "-DoMetricsChore", server, doMetricsInterval); - this.server = server; - } - - @Override - protected void chore() { - server.doMetrics(); - } - } - /** * Initializes the HMaster. The steps are as follows: * <p> @@ -646,20 +633,6 @@ public class HMaster extends HRegionServer implements MasterServices { return MasterDumpServlet.class; } - /** - * Emit the HMaster metrics, such as region in transition metrics. - * Surrounding in a try block just to be sure metrics doesn't abort HMaster. - */ - private void doMetrics() { - try { - if (assignmentManager != null) { - assignmentManager.updateRegionsInTransitionMetrics(); - } - } catch (Throwable e) { - LOG.error("Couldn't update metrics: " + e.getMessage()); - } - } - MetricsMaster getMasterMetrics() { return metricsMaster; } @@ -682,8 +655,9 @@ public class HMaster extends HRegionServer implements MasterServices { this.splitOrMergeTracker = new SplitOrMergeTracker(zooKeeper, conf, this); this.splitOrMergeTracker.start(); - this.assignmentManager = new AssignmentManager(this, serverManager, - this.balancer, this.service, this.metricsMaster, tableStateManager); + // Create Assignment Manager + this.assignmentManager = new AssignmentManager(this); + this.assignmentManager.start(); this.replicationManager = new ReplicationManager(conf, zooKeeper, this); @@ -867,10 +841,6 @@ public class HMaster extends HRegionServer implements MasterServices { this.catalogJanitorChore = new CatalogJanitor(this); getChoreService().scheduleChore(catalogJanitorChore); - // Do Metrics periodically - periodicDoMetricsChore = new PeriodicDoMetrics(msgInterval, this); - getChoreService().scheduleChore(periodicDoMetricsChore); - status.setStatus("Starting cluster schema service"); initClusterSchemaService(); @@ -883,7 +853,8 @@ public class HMaster extends HRegionServer implements MasterServices { } status.markComplete("Initialization successful"); - LOG.info("Master has completed initialization"); + LOG.info(String.format("Master has completed initialization %.3fsec", + (System.currentTimeMillis() - masterActiveTime) / 1000.0f)); configurationManager.registerObserver(this.balancer); // Set master as 'initialized'. @@ -961,8 +932,8 @@ public class HMaster extends HRegionServer implements MasterServices { // Check zk for region servers that are up but didn't register for (ServerName sn: this.regionServerTracker.getOnlineServers()) { // The isServerOnline check is opportunistic, correctness is handled inside - if (!this.serverManager.isServerOnline(sn) - && serverManager.checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) { + if (!this.serverManager.isServerOnline(sn) && + serverManager.checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) { LOG.info("Registered server found up in zk but who has not yet reported in: " + sn); } } @@ -976,14 +947,13 @@ public class HMaster extends HRegionServer implements MasterServices { void initQuotaManager() throws IOException { MasterQuotaManager quotaManager = new MasterQuotaManager(this); - this.assignmentManager.setRegionStateListener((RegionStateListener)quotaManager); + this.assignmentManager.setRegionStateListener(quotaManager); quotaManager.start(); this.quotaManager = quotaManager; } boolean isCatalogJanitorEnabled() { - return catalogJanitorChore != null ? - catalogJanitorChore.getEnabled() : false; + return catalogJanitorChore != null ? catalogJanitorChore.getEnabled() : false; } boolean isCleanerChoreEnabled() { @@ -1091,7 +1061,6 @@ public class HMaster extends HRegionServer implements MasterServices { @Override protected void sendShutdownInterrupt() { super.sendShutdownInterrupt(); - stopProcedureExecutor(); } @Override @@ -1116,15 +1085,20 @@ public class HMaster extends HRegionServer implements MasterServices { if (LOG.isDebugEnabled()) { LOG.debug("Stopping service threads"); } + // Clean up and close up shop if (this.logCleaner != null) this.logCleaner.cancel(true); if (this.hfileCleaner != null) this.hfileCleaner.cancel(true); if (this.replicationZKNodeCleanerChore != null) this.replicationZKNodeCleanerChore.cancel(true); if (this.replicationMetaCleaner != null) this.replicationMetaCleaner.cancel(true); if (this.quotaManager != null) this.quotaManager.stop(); + if (this.activeMasterManager != null) this.activeMasterManager.stop(); if (this.serverManager != null) this.serverManager.stop(); if (this.assignmentManager != null) this.assignmentManager.stop(); + + stopProcedureExecutor(); + if (this.walManager != null) this.walManager.stop(); if (this.fileSystemManager != null) this.fileSystemManager.stop(); if (this.mpmHost != null) this.mpmHost.stop("server shutting down."); @@ -1150,16 +1124,20 @@ public class HMaster extends HRegionServer implements MasterServices { MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION); procedureStore.start(numThreads); procedureExecutor.start(numThreads, abortOnCorruption); + procEnv.getRemoteDispatcher().start(); } private void stopProcedureExecutor() { if (procedureExecutor != null) { configurationManager.deregisterObserver(procedureExecutor.getEnvironment()); + procedureExecutor.getEnvironment().getRemoteDispatcher().stop(); procedureExecutor.stop(); + procedureExecutor = null; } if (procedureStore != null) { procedureStore.stop(isAborted()); + procedureStore = null; } } @@ -1188,10 +1166,6 @@ public class HMaster extends HRegionServer implements MasterServices { if (this.mobCompactThread != null) { this.mobCompactThread.close(); } - - if (this.periodicDoMetricsChore != null) { - periodicDoMetricsChore.cancel(); - } } /** @@ -1249,7 +1223,7 @@ public class HMaster extends HRegionServer implements MasterServices { // Sleep to next balance plan start time // But if there are zero regions in transition, it can skip sleep to speed up. while (!interrupted && System.currentTimeMillis() < nextBalanceStartTime - && this.assignmentManager.getRegionStates().getRegionsInTransitionCount() != 0) { + && this.assignmentManager.getRegionStates().hasRegionsInTransition()) { try { Thread.sleep(100); } catch (InterruptedException ie) { @@ -1260,7 +1234,7 @@ public class HMaster extends HRegionServer implements MasterServices { // Throttling by max number regions in transition while (!interrupted && maxRegionsInTransition > 0 - && this.assignmentManager.getRegionStates().getRegionsInTransitionCount() + && this.assignmentManager.getRegionStates().getRegionsInTransition().size() >= maxRegionsInTransition && System.currentTimeMillis() <= cutoffTime) { try { // sleep if the number of regions in transition exceeds the limit @@ -1293,13 +1267,12 @@ public class HMaster extends HRegionServer implements MasterServices { synchronized (this.balancer) { // If balance not true, don't run balancer. if (!this.loadBalancerTracker.isBalancerOn()) return false; - // Only allow one balance run at at time. - if (this.assignmentManager.getRegionStates().isRegionsInTransition()) { - Set<RegionState> regionsInTransition = - this.assignmentManager.getRegionStates().getRegionsInTransition(); + // Only allow one balance run at at time. + if (this.assignmentManager.hasRegionsInTransition()) { + List<RegionStateNode> regionsInTransition = assignmentManager.getRegionsInTransition(); // if hbase:meta region is in transition, result of assignment cannot be recorded // ignore the force flag in that case - boolean metaInTransition = assignmentManager.getRegionStates().isMetaRegionInTransition(); + boolean metaInTransition = assignmentManager.isMetaRegionInTransition(); String prefix = force && !metaInTransition ? "R" : "Not r"; LOG.debug(prefix + "unning balancer because " + regionsInTransition.size() + " region(s) in transition: " + org.apache.commons.lang.StringUtils. @@ -1332,7 +1305,7 @@ public class HMaster extends HRegionServer implements MasterServices { //Give the balancer the current cluster state. this.balancer.setClusterStatus(getClusterStatus()); this.balancer.setClusterLoad( - this.assignmentManager.getRegionStates().getAssignmentsByTable(true)); + this.assignmentManager.getRegionStates().getAssignmentsByTable()); for (Entry<TableName, Map<ServerName, List<HRegionInfo>>> e : assignmentsByTable.entrySet()) { List<RegionPlan> partialPlans = this.balancer.balanceCluster(e.getKey(), e.getValue()); @@ -1351,7 +1324,7 @@ public class HMaster extends HRegionServer implements MasterServices { for (RegionPlan plan: plans) { LOG.info("balance " + plan); //TODO: bulk assign - this.assignmentManager.balance(plan); + this.assignmentManager.moveAsync(plan); rpCount++; balanceThrottling(balanceStartTime + rpCount * balanceInterval, maxRegionsInTransition, @@ -1467,6 +1440,59 @@ public class HMaster extends HRegionServer implements MasterServices { } @Override + public long dispatchMergingRegions( + final HRegionInfo regionInfoA, + final HRegionInfo regionInfoB, + final boolean forcible, + final long nonceGroup, + final long nonce) throws IOException { + checkInitialized(); + + TableName tableName = regionInfoA.getTable(); + if (tableName == null || regionInfoB.getTable() == null) { + throw new UnknownRegionException ("Can't merge regions without table associated"); + } + + if (!tableName.equals(regionInfoB.getTable())) { + throw new IOException ("Cannot merge regions from two different tables"); + } + + if (regionInfoA.compareTo(regionInfoB) == 0) { + throw new MergeRegionException( + "Cannot merge a region to itself " + regionInfoA + ", " + regionInfoB); + } + + final HRegionInfo [] regionsToMerge = new HRegionInfo[2]; + regionsToMerge [0] = regionInfoA; + regionsToMerge [1] = regionInfoB; + + return MasterProcedureUtil.submitProcedure( + new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { + @Override + protected void run() throws IOException { + MasterCoprocessorHost mcph = getMaster().getMasterCoprocessorHost(); + if (mcph != null) { + mcph.preDispatchMerge(regionInfoA, regionInfoB); + } + + LOG.info(getClientIdAuditPrefix() + " Dispatch merge regions " + + regionsToMerge[0].getEncodedName() + " and " + regionsToMerge[1].getEncodedName()); + + submitProcedure(new DispatchMergingRegionsProcedure( + procedureExecutor.getEnvironment(), tableName, regionsToMerge, forcible)); + if (mcph != null) { + mcph.postDispatchMerge(regionInfoA, regionInfoB); + } + } + + @Override + protected String getDescription() { + return "DispatchMergingRegionsProcedure"; + } + }); + } + + @Override public long mergeRegions( final HRegionInfo[] regionsToMerge, final boolean forcible, @@ -1515,24 +1541,19 @@ public class HMaster extends HRegionServer implements MasterServices { } @Override - public long splitRegion( - final HRegionInfo regionInfo, - final byte[] splitRow, - final long nonceGroup, - final long nonce) throws IOException { + public long splitRegion(final HRegionInfo regionInfo, final byte[] splitRow, + final long nonceGroup, final long nonce) + throws IOException { checkInitialized(); - return MasterProcedureUtil.submitProcedure( new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { @Override protected void run() throws IOException { getMaster().getMasterCoprocessorHost().preSplitRegion(regionInfo.getTable(), splitRow); - - LOG.info(getClientIdAuditPrefix() + " Split region " + regionInfo); + LOG.info(getClientIdAuditPrefix() + " split " + regionInfo.getRegionNameAsString()); // Execute the operation asynchronously - submitProcedure(new SplitTableRegionProcedure(procedureExecutor.getEnvironment(), - regionInfo, splitRow)); + submitProcedure(getAssignmentManager().createSplitProcedure(regionInfo, splitRow)); } @Override @@ -1607,7 +1628,7 @@ public class HMaster extends HRegionServer implements MasterServices { serverManager.sendRegionWarmup(rp.getDestination(), hri); LOG.info(getClientIdAuditPrefix() + " move " + rp + ", running balancer"); - this.assignmentManager.balance(rp); + this.assignmentManager.moveAsync(rp); if (this.cpHost != null) { this.cpHost.postMove(hri, rp.getSource(), rp.getDestination()); } @@ -2378,8 +2399,9 @@ public class HMaster extends HRegionServer implements MasterServices { String clusterId = fileSystemManager != null ? fileSystemManager.getClusterId().toString() : null; - Set<RegionState> regionsInTransition = assignmentManager != null ? - assignmentManager.getRegionStates().getRegionsInTransition() : null; + List<RegionState> regionsInTransition = assignmentManager != null ? + assignmentManager.getRegionStates().getRegionsStateInTransition() : null; + String[] coprocessors = cpHost != null ? getMasterCoprocessors() : null; boolean balancerOn = loadBalancerTracker != null ? loadBalancerTracker.isBalancerOn() : false; @@ -3114,6 +3136,7 @@ public class HMaster extends HRegionServer implements MasterServices { * @param switchType see {@link org.apache.hadoop.hbase.client.MasterSwitchType} * @return The state of the switch */ + @Override public boolean isSplitOrMergeEnabled(MasterSwitchType switchType) { if (null == splitOrMergeTracker || isInMaintenanceMode()) { return false; http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java index 277dcc8..0e86925 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java @@ -44,7 +44,9 @@ import edu.umd.cs.findbugs.annotations.Nullable; * <p>On cluster startup, bulk assignment can be used to determine * locations for all Regions in a cluster. * - * <p>This classes produces plans for the {@link AssignmentManager} to execute. + * <p>This classes produces plans for the + * {@link org.apache.hadoop.hbase.master.assignment.AssignmentManager} + * to execute. */ @InterfaceAudience.Private public interface LoadBalancer extends Configurable, Stoppable, ConfigurationObserver { http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java index 8a7a387..534a507 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java @@ -789,6 +789,28 @@ public class MasterCoprocessorHost }); } + public void preDispatchMerge(final HRegionInfo regionInfoA, final HRegionInfo regionInfoB) + throws IOException { + execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx) + throws IOException { + oserver.preDispatchMerge(ctx, regionInfoA, regionInfoB); + } + }); + } + + public void postDispatchMerge(final HRegionInfo regionInfoA, final HRegionInfo regionInfoB) + throws IOException { + execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() { + @Override + public void call(MasterObserver oserver, ObserverContext<MasterCoprocessorEnvironment> ctx) + throws IOException { + oserver.postDispatchMerge(ctx, regionInfoA, regionInfoB); + } + }); + } + public void preMergeRegions(final HRegionInfo[] regionsToMerge) throws IOException { execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() { http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterDumpServlet.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterDumpServlet.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterDumpServlet.java index a921ab5..a48444c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterDumpServlet.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterDumpServlet.java @@ -24,7 +24,6 @@ import java.io.PrintStream; import java.io.PrintWriter; import java.util.Date; import java.util.Map; -import java.util.Set; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -33,6 +32,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ServerLoad; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.assignment.AssignmentManager; +import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode; import org.apache.hadoop.hbase.monitoring.LogMonitoring; import org.apache.hadoop.hbase.monitoring.StateDumpServlet; import org.apache.hadoop.hbase.monitoring.TaskMonitor; @@ -117,9 +118,8 @@ public class MasterDumpServlet extends StateDumpServlet { return; } - Set<RegionState> regionsInTransition = am.getRegionStates().getRegionsInTransition(); - for (RegionState rs : regionsInTransition) { - String rid = rs.getRegion().getRegionNameAsString(); + for (RegionStateNode rs : am.getRegionsInTransition()) { + String rid = rs.getRegionInfo().getEncodedName(); out.println("Region " + rid + ": " + rs.toDescriptiveString()); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java index 1988e2d..fe93ce0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.TableState; +import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; @@ -151,7 +152,9 @@ public class MasterMetaBootstrap { // Work on meta region int assigned = 0; - long timeout = master.getConfiguration().getLong("hbase.catalog.verification.timeout", 1000); + // TODO: Unimplemented + // long timeout = + // master.getConfiguration().getLong("hbase.catalog.verification.timeout", 1000); if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) { status.setStatus("Assigning hbase:meta region"); } else { @@ -160,37 +163,10 @@ public class MasterMetaBootstrap { // Get current meta state from zk. RegionState metaState = MetaTableLocator.getMetaRegionState(master.getZooKeeper(), replicaId); - HRegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO, - replicaId); - RegionStates regionStates = assignmentManager.getRegionStates(); - regionStates.createRegionState(hri, metaState.getState(), - metaState.getServerName(), null); - - if (!metaState.isOpened() || !master.getMetaTableLocator().verifyMetaRegionLocation( - master.getClusterConnection(), master.getZooKeeper(), timeout, replicaId)) { - ServerName currentMetaServer = metaState.getServerName(); - if (master.getServerManager().isServerOnline(currentMetaServer)) { - if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) { - LOG.info("Meta was in transition on " + currentMetaServer); - } else { - LOG.info("Meta with replicaId " + replicaId + " was in transition on " + - currentMetaServer); - } - assignmentManager.processRegionsInTransition(Collections.singletonList(metaState)); - } else { - if (currentMetaServer != null) { - if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) { - splitMetaLogBeforeAssignment(currentMetaServer); - regionStates.logSplit(HRegionInfo.FIRST_META_REGIONINFO); - previouslyFailedMetaRSs.add(currentMetaServer); - } - } - LOG.info("Re-assigning hbase:meta with replicaId, " + replicaId + - " it was on " + currentMetaServer); - assignmentManager.assignMeta(hri); - } - assigned++; - } + LOG.debug("meta state from zookeeper: " + metaState); + HRegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica( + HRegionInfo.FIRST_META_REGIONINFO, replicaId); + assignmentManager.assignMeta(hri, metaState.getServerName()); if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) { // TODO: should we prevent from using state manager before meta was initialized? @@ -206,7 +182,6 @@ public class MasterMetaBootstrap { master.getMasterWalManager().splitMetaLog(previouslyFailedMetaRSs); } - assignmentManager.setEnabledTable(TableName.META_TABLE_NAME); master.getTableStateManager().start(); // Make sure a hbase:meta location is set. We need to enable SSH here since @@ -214,7 +189,7 @@ public class MasterMetaBootstrap { // by SSH so that system tables can be assigned. // No need to wait for meta is assigned = 0 when meta is just verified. if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) enableCrashedServerProcessing(assigned != 0); - LOG.info("hbase:meta with replicaId " + replicaId + " assigned=" + assigned + ", location=" + LOG.info("hbase:meta with replicaId " + replicaId + ", location=" + master.getMetaTableLocator().getMetaRegionLocation(master.getZooKeeper(), replicaId)); status.setStatus("META assigned."); } http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index f86f800..6b13c12 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -32,9 +32,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.NamespaceDescriptor; -import org.apache.hadoop.hbase.PleaseHoldException; import org.apache.hadoop.hbase.ProcedureInfo; import org.apache.hadoop.hbase.ServerLoad; import org.apache.hadoop.hbase.ServerName; @@ -43,6 +41,7 @@ import org.apache.hadoop.hbase.UnknownRegionException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.MasterSwitchType; import org.apache.hadoop.hbase.client.TableState; +import org.apache.hadoop.hbase.client.VersionInfoUtil; import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper; import org.apache.hadoop.hbase.errorhandling.ForeignException; import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; @@ -51,6 +50,7 @@ import org.apache.hadoop.hbase.ipc.PriorityFunction; import org.apache.hadoop.hbase.ipc.QosPriority; import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.ipc.ServerRpcController; +import org.apache.hadoop.hbase.master.assignment.RegionStates; import org.apache.hadoop.hbase.master.locking.LockProcedure; import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil; import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil.NonceProcedureRunnable; @@ -78,7 +78,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.*; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; -import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.SnapshotDescription; import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatRequest; @@ -86,129 +85,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockH import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteNamespaceResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteSnapshotResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DrainRegionServersRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DrainRegionServersResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableStateRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableStateResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsCleanerChoreEnabledResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsMasterRunningRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsMasterRunningResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDrainingRegionServersRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDrainingRegionServersResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListProceduresRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListProceduresResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RemoveDrainFromRegionServersRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RemoveDrainFromRegionServersResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.*; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse.Capability; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest; @@ -216,13 +94,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.SplitTableRegionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.SplitTableRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest; @@ -389,7 +264,11 @@ public class MasterRpcServices extends RSRpcServices ClusterStatusProtos.ServerLoad sl = request.getLoad(); ServerName serverName = ProtobufUtil.toServerName(request.getServer()); ServerLoad oldLoad = master.getServerManager().getLoad(serverName); - master.getServerManager().regionServerReport(serverName, new ServerLoad(sl)); + ServerLoad newLoad = new ServerLoad(sl); + master.getServerManager().regionServerReport(serverName, newLoad); + int version = VersionInfoUtil.getCurrentClientVersionNumber(); + master.getAssignmentManager().reportOnlineRegions(serverName, + version, newLoad.getRegionsLoad().keySet()); if (sl != null && master.metricsMaster != null) { // Up our metrics. master.metricsMaster.incrementRequests(sl.getTotalNumberOfRequests() @@ -462,25 +341,25 @@ public class MasterRpcServices extends RSRpcServices public AssignRegionResponse assignRegion(RpcController controller, AssignRegionRequest req) throws ServiceException { try { - final byte [] regionName = req.getRegion().getValue().toByteArray(); - RegionSpecifierType type = req.getRegion().getType(); - AssignRegionResponse arr = AssignRegionResponse.newBuilder().build(); - master.checkInitialized(); + + final RegionSpecifierType type = req.getRegion().getType(); if (type != RegionSpecifierType.REGION_NAME) { LOG.warn("assignRegion specifier type: expected: " + RegionSpecifierType.REGION_NAME + " actual: " + type); } - RegionStates regionStates = master.getAssignmentManager().getRegionStates(); - HRegionInfo regionInfo = regionStates.getRegionInfo(regionName); - if (regionInfo == null) throw new UnknownRegionException(Bytes.toString(regionName)); + + final byte[] regionName = req.getRegion().getValue().toByteArray(); + final HRegionInfo regionInfo = master.getAssignmentManager().getRegionInfo(regionName); + if (regionInfo == null) throw new UnknownRegionException(Bytes.toStringBinary(regionName)); + + final AssignRegionResponse arr = AssignRegionResponse.newBuilder().build(); if (master.cpHost != null) { if (master.cpHost.preAssign(regionInfo)) { return arr; } } - LOG.info(master.getClientIdAuditPrefix() - + " assign " + regionInfo.getRegionNameAsString()); + LOG.info(master.getClientIdAuditPrefix() + " assign " + regionInfo.getRegionNameAsString()); master.getAssignmentManager().assign(regionInfo, true); if (master.cpHost != null) { master.cpHost.postAssign(regionInfo); @@ -491,6 +370,7 @@ public class MasterRpcServices extends RSRpcServices } } + @Override public BalanceResponse balance(RpcController controller, BalanceRequest request) throws ServiceException { @@ -710,8 +590,7 @@ public class MasterRpcServices extends RSRpcServices } @Override - public SplitTableRegionResponse splitRegion( - final RpcController controller, + public SplitTableRegionResponse splitRegion(final RpcController controller, final SplitTableRegionRequest request) throws ServiceException { try { long procId = master.splitRegion( @@ -1281,24 +1160,24 @@ public class MasterRpcServices extends RSRpcServices @Override public OfflineRegionResponse offlineRegion(RpcController controller, OfflineRegionRequest request) throws ServiceException { - final byte [] regionName = request.getRegion().getValue().toByteArray(); - RegionSpecifierType type = request.getRegion().getType(); - if (type != RegionSpecifierType.REGION_NAME) { - LOG.warn("moveRegion specifier type: expected: " + RegionSpecifierType.REGION_NAME - + " actual: " + type); - } - try { master.checkInitialized(); - Pair<HRegionInfo, ServerName> pair = - MetaTableAccessor.getRegion(master.getConnection(), regionName); - if (pair == null) throw new UnknownRegionException(Bytes.toStringBinary(regionName)); - HRegionInfo hri = pair.getFirst(); + + final RegionSpecifierType type = request.getRegion().getType(); + if (type != RegionSpecifierType.REGION_NAME) { + LOG.warn("moveRegion specifier type: expected: " + RegionSpecifierType.REGION_NAME + + " actual: " + type); + } + + final byte[] regionName = request.getRegion().getValue().toByteArray(); + final HRegionInfo hri = master.getAssignmentManager().getRegionInfo(regionName); + if (hri == null) throw new UnknownRegionException(Bytes.toStringBinary(regionName)); + if (master.cpHost != null) { master.cpHost.preRegionOffline(hri); } LOG.info(master.getClientIdAuditPrefix() + " offline " + hri.getRegionNameAsString()); - master.getAssignmentManager().regionOffline(hri); + master.getAssignmentManager().offlineRegion(hri); if (master.cpHost != null) { master.cpHost.postRegionOffline(hri); } @@ -1439,28 +1318,18 @@ public class MasterRpcServices extends RSRpcServices public UnassignRegionResponse unassignRegion(RpcController controller, UnassignRegionRequest req) throws ServiceException { try { - final byte [] regionName = req.getRegion().getValue().toByteArray(); - RegionSpecifierType type = req.getRegion().getType(); - final boolean force = req.getForce(); - UnassignRegionResponse urr = UnassignRegionResponse.newBuilder().build(); - master.checkInitialized(); + final RegionSpecifierType type = req.getRegion().getType(); if (type != RegionSpecifierType.REGION_NAME) { LOG.warn("unassignRegion specifier type: expected: " + RegionSpecifierType.REGION_NAME + " actual: " + type); } - Pair<HRegionInfo, ServerName> pair = - MetaTableAccessor.getRegion(master.getConnection(), regionName); - if (Bytes.equals(HRegionInfo.FIRST_META_REGIONINFO.getRegionName(),regionName)) { - pair = new Pair<>(HRegionInfo.FIRST_META_REGIONINFO, - master.getMetaTableLocator().getMetaRegionLocation(master.getZooKeeper())); - } - if (pair == null) { - throw new UnknownRegionException(Bytes.toString(regionName)); - } + final byte[] regionName = req.getRegion().getValue().toByteArray(); + final HRegionInfo hri = master.getAssignmentManager().getRegionInfo(regionName); + if (hri == null) throw new UnknownRegionException(Bytes.toStringBinary(regionName)); - if (pair == null) throw new UnknownRegionException(Bytes.toString(regionName)); - HRegionInfo hri = pair.getFirst(); + final boolean force = req.getForce(); + final UnassignRegionResponse urr = UnassignRegionResponse.newBuilder().build(); if (master.cpHost != null) { if (master.cpHost.preUnassign(hri, force)) { return urr; @@ -1472,7 +1341,6 @@ public class MasterRpcServices extends RSRpcServices if (master.cpHost != null) { master.cpHost.postUnassign(hri, force); } - return urr; } catch (IOException ioe) { throw new ServiceException(ioe); @@ -1484,26 +1352,7 @@ public class MasterRpcServices extends RSRpcServices ReportRegionStateTransitionRequest req) throws ServiceException { try { master.checkServiceStarted(); - RegionStateTransition rt = req.getTransition(0); - RegionStates regionStates = master.getAssignmentManager().getRegionStates(); - for (RegionInfo ri : rt.getRegionInfoList()) { - TableName tableName = ProtobufUtil.toTableName(ri.getTableName()); - if (!(TableName.META_TABLE_NAME.equals(tableName) - && regionStates.getRegionState(HRegionInfo.FIRST_META_REGIONINFO) != null) - && !master.getAssignmentManager().isFailoverCleanupDone()) { - // Meta region is assigned before master finishes the - // failover cleanup. So no need this check for it - throw new PleaseHoldException("Master is rebuilding user regions"); - } - } - ServerName sn = ProtobufUtil.toServerName(req.getServer()); - String error = master.getAssignmentManager().onRegionTransition(sn, rt); - ReportRegionStateTransitionResponse.Builder rrtr = - ReportRegionStateTransitionResponse.newBuilder(); - if (error != null) { - rrtr.setErrorMessage(error); - } - return rrtr.build(); + return master.getAssignmentManager().reportRegionStateTransition(req); } catch (IOException ioe) { throw new ServiceException(ioe); } @@ -2006,4 +1855,34 @@ public class MasterRpcServices extends RSRpcServices throw new ServiceException(e); } } -} + + @Override + public DispatchMergingRegionsResponse dispatchMergingRegions(RpcController controller, + DispatchMergingRegionsRequest request) throws ServiceException { + final byte[] encodedNameOfRegionA = request.getRegionA().getValue().toByteArray(); + final byte[] encodedNameOfRegionB = request.getRegionB().getValue().toByteArray(); + if (request.getRegionA().getType() != RegionSpecifierType.ENCODED_REGION_NAME || + request.getRegionB().getType() != RegionSpecifierType.ENCODED_REGION_NAME) { + LOG.warn("mergeRegions specifier type: expected: " + RegionSpecifierType.ENCODED_REGION_NAME + + " actual: region_a=" + + request.getRegionA().getType() + ", region_b=" + + request.getRegionB().getType()); + } + RegionStates regionStates = master.getAssignmentManager().getRegionStates(); + RegionState regionStateA = regionStates.getRegionState(Bytes.toString(encodedNameOfRegionA)); + RegionState regionStateB = regionStates.getRegionState(Bytes.toString(encodedNameOfRegionB)); + if (regionStateA == null || regionStateB == null) { + throw new ServiceException(new UnknownRegionException( + Bytes.toStringBinary(regionStateA == null? encodedNameOfRegionA: encodedNameOfRegionB))); + } + final HRegionInfo regionInfoA = regionStateA.getRegion(); + final HRegionInfo regionInfoB = regionStateB.getRegion(); + try { + long procId = master.dispatchMergingRegions(regionInfoA, regionInfoB, request.getForcible(), + request.getNonceGroup(), request.getNonce()); + return DispatchMergingRegionsResponse.newBuilder().setProcId(procId).build(); + } catch (IOException ioe) { + throw new ServiceException(ioe); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index 66758f8..5a45fcf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -32,7 +32,9 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotDisabledException; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.MasterSwitchType; import org.apache.hadoop.hbase.executor.ExecutorService; +import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.master.locking.LockManager; import org.apache.hadoop.hbase.favored.FavoredNodesManager; import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer; @@ -266,6 +268,23 @@ public interface MasterServices extends Server { throws IOException; /** + * Merge two regions. The real implementation is on the regionserver, master + * just move the regions together and send MERGE RPC to regionserver + * @param region_a region to merge + * @param region_b region to merge + * @param forcible true if do a compulsory merge, otherwise we will only merge + * two adjacent regions + * @return procedure Id + * @throws IOException + */ + long dispatchMergingRegions( + final HRegionInfo region_a, + final HRegionInfo region_b, + final boolean forcible, + final long nonceGroup, + final long nonce) throws IOException; + + /** * Merge regions in a table. * @param regionsToMerge daughter regions to merge * @param forcible whether to force to merge even two regions are not adjacent @@ -394,6 +413,8 @@ public interface MasterServices extends Server { */ boolean isStopping(); + boolean isSplitOrMergeEnabled(MasterSwitchType switchType); + /** * @return Favored Nodes Manager */ http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java index 27aca94..9ac5ba2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java @@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -218,9 +219,7 @@ public class MasterWalManager { } public void splitLog(final ServerName serverName) throws IOException { - Set<ServerName> serverNames = new HashSet<>(); - serverNames.add(serverName); - splitLog(serverNames); + splitLog(Collections.<ServerName>singleton(serverName)); } /** @@ -228,9 +227,7 @@ public class MasterWalManager { * @param serverName logs belonging to this server will be split */ public void splitMetaLog(final ServerName serverName) throws IOException { - Set<ServerName> serverNames = new HashSet<>(); - serverNames.add(serverName); - splitMetaLog(serverNames); + splitMetaLog(Collections.<ServerName>singleton(serverName)); } /** @@ -342,4 +339,4 @@ public class MasterWalManager { public RecoveryMode getLogRecoveryMode() { return this.splitLogManager.getRecoveryMode(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/8faab93a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManager.java index 40e79ae..c7ce9a9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManager.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.master; import org.apache.hadoop.hbase.CompatibilitySingletonFactory; public class MetricsAssignmentManager { - private final MetricsAssignmentManagerSource assignmentManagerSource; public MetricsAssignmentManager() { @@ -33,19 +32,11 @@ public class MetricsAssignmentManager { return assignmentManagerSource; } - public void updateAssignmentTime(long time) { - assignmentManagerSource.updateAssignmentTime(time); - } - - public void updateBulkAssignTime(long time) { - assignmentManagerSource.updateBulkAssignTime(time); - } - /** * set new value for number of regions in transition. * @param ritCount */ - public void updateRITCount(int ritCount) { + public void updateRITCount(final int ritCount) { assignmentManagerSource.setRIT(ritCount); } @@ -54,14 +45,15 @@ public class MetricsAssignmentManager { * as defined by the property rit.metrics.threshold.time. * @param ritCountOverThreshold */ - public void updateRITCountOverThreshold(int ritCountOverThreshold) { + public void updateRITCountOverThreshold(final int ritCountOverThreshold) { assignmentManagerSource.setRITCountOverThreshold(ritCountOverThreshold); } + /** * update the timestamp for oldest region in transition metrics. * @param timestamp */ - public void updateRITOldestAge(long timestamp) { + public void updateRITOldestAge(final long timestamp) { assignmentManagerSource.setRITOldestAge(timestamp); } @@ -72,4 +64,27 @@ public class MetricsAssignmentManager { public void updateRitDuration(long duration) { assignmentManagerSource.updateRitDuration(duration); } + + /* + * Increment the count of assignment operation (assign/unassign). + */ + public void incrementOperationCounter() { + assignmentManagerSource.incrementOperationCounter(); + } + + /** + * Add the time took to perform the last assign operation + * @param time + */ + public void updateAssignTime(final long time) { + assignmentManagerSource.updateAssignTime(time); + } + + /** + * Add the time took to perform the last unassign operation + * @param time + */ + public void updateUnassignTime(final long time) { + assignmentManagerSource.updateUnassignTime(time); + } }
