http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/UnassignProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/UnassignProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/UnassignProcedure.java new file mode 100644 index 0000000..126718a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/UnassignProcedure.java @@ -0,0 +1,247 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.master.assignment; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.exceptions.UnexpectedStateException; +import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; +import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.procedure.ServerCrashException; +import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.RegionCloseOperation; +import org.apache.hadoop.hbase.master.RegionState.State; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionTransitionState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.UnassignRegionStateData; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; +import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException; +import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; + + +/** + * Procedure that describe the unassignment of a single region. + * There can only be one RegionTransitionProcedure per region running at the time, + * since each procedure takes a lock on the region. 
+ * + * <p>The Unassign starts by placing a "close region" request in the Remote Dispatcher + * queue, and the procedure will then go into a "waiting state". + * The Remote Dispatcher will batch the various requests for that server and + * they will be sent to the RS for execution. + * The RS will complete the close operation by calling master.reportRegionStateTransition(). + * The AM will intercept the transition report, and notify the procedure. + * The procedure will finish the unassign by publishing its new state on meta + * or it will retry the unassign. + */ +@InterfaceAudience.Private +public class UnassignProcedure extends RegionTransitionProcedure { + private static final Log LOG = LogFactory.getLog(UnassignProcedure.class); + + /** + * Where to send the unassign RPC. + */ + protected volatile ServerName destinationServer; + + private final AtomicBoolean serverCrashed = new AtomicBoolean(false); + + // TODO: should this be in a reassign procedure? + // ...and keep unassign for 'disable' case? 
+ private boolean force; + + public UnassignProcedure() { + // Required by the Procedure framework to create the procedure on replay + super(); + } + + public UnassignProcedure(final HRegionInfo regionInfo, + final ServerName destinationServer, final boolean force) { + super(regionInfo); + this.destinationServer = destinationServer; + this.force = force; + + // we don't need REGION_TRANSITION_QUEUE, we jump directly to sending the request + setTransitionState(RegionTransitionState.REGION_TRANSITION_DISPATCH); + } + + @Override + public TableOperationType getTableOperationType() { + return TableOperationType.REGION_UNASSIGN; + } + + @Override + protected boolean isRollbackSupported(final RegionTransitionState state) { + switch (state) { + case REGION_TRANSITION_QUEUE: + case REGION_TRANSITION_DISPATCH: + return true; + default: + return false; + } + } + + @Override + public void serializeStateData(final OutputStream stream) throws IOException { + UnassignRegionStateData.Builder state = UnassignRegionStateData.newBuilder() + .setTransitionState(getTransitionState()) + .setDestinationServer(ProtobufUtil.toServerName(destinationServer)) + .setRegionInfo(HRegionInfo.convert(getRegionInfo())); + if (force) { + state.setForce(true); + } + state.build().writeDelimitedTo(stream); + } + + @Override + public void deserializeStateData(final InputStream stream) throws IOException { + final UnassignRegionStateData state = UnassignRegionStateData.parseDelimitedFrom(stream); + setTransitionState(state.getTransitionState()); + setRegionInfo(HRegionInfo.convert(state.getRegionInfo())); + force = state.getForce(); + if (state.hasDestinationServer()) { + this.destinationServer = ProtobufUtil.toServerName(state.getDestinationServer()); + } + } + + @Override + protected boolean startTransition(final MasterProcedureEnv env, final RegionStateNode regionNode) { + // nothing to do here. 
we skip the step in the constructor + // by jumping to REGION_TRANSITION_DISPATCH + throw new UnsupportedOperationException(); + } + + @Override + protected boolean updateTransition(final MasterProcedureEnv env, final RegionStateNode regionNode) + throws IOException { + // if the region is already closed or offline we can't do much... + if (regionNode.isInState(State.CLOSED, State.OFFLINE)) { + LOG.info("Not unassigned " + this + "; " + regionNode.toShortString()); + return false; + } + + // if the server is down, mark the operation as complete + if (serverCrashed.get() || !isServerOnline(env, regionNode)) { + LOG.info("Server already down: " + this + "; " + regionNode.toShortString()); + return false; + } + + // if we haven't started the operation yet, we can abort + if (aborted.get() && regionNode.isInState(State.OPEN)) { + setAbortFailure(getClass().getSimpleName(), "abort requested"); + return false; + } + + // Mark the region as CLOSING. + env.getAssignmentManager().markRegionAsClosing(regionNode); + + // Add the close region operation to the server dispatch queue. + if (!addToRemoteDispatcher(env, regionNode.getRegionLocation())) { + // If addToRemoteDispatcher fails, it calls #remoteCallFailed which + // does all cleanup. + } + + // We always return true, even if we fail dispatch because addToRemoteDispatcher + // failure processing sets state back to REGION_TRANSITION_QUEUE so we try again; + // i.e. return true to keep the Procedure running; it has been reset to start over. 
+ return true; + } + + @Override + protected void finishTransition(final MasterProcedureEnv env, final RegionStateNode regionNode) + throws IOException { + env.getAssignmentManager().markRegionAsClosed(regionNode); + } + + @Override + public RemoteOperation remoteCallBuild(final MasterProcedureEnv env, final ServerName serverName) { + assert serverName.equals(getRegionState(env).getRegionLocation()); + return new RegionCloseOperation(this, getRegionInfo(), destinationServer); + } + + @Override + protected void reportTransition(final MasterProcedureEnv env, final RegionStateNode regionNode, + final TransitionCode code, final long seqId) throws UnexpectedStateException { + switch (code) { + case CLOSED: + setTransitionState(RegionTransitionState.REGION_TRANSITION_FINISH); + break; + default: + throw new UnexpectedStateException(String.format( + "Received report unexpected transition state=%s for region=%s server=%s, expected CLOSED.", + code, regionNode.getRegionInfo(), regionNode.getRegionLocation())); + } + } + + @Override + protected void remoteCallFailed(final MasterProcedureEnv env, final RegionStateNode regionNode, + final IOException exception) { + // TODO: Is there on-going rpc to cleanup? + if (exception instanceof ServerCrashException) { + // This exception comes from ServerCrashProcedure after log splitting. + // It is ok to let this procedure go on to complete close now. + // This will release lock on this region so the subsequent assign can succeed. + try { + reportTransition(env, regionNode, TransitionCode.CLOSED, HConstants.NO_SEQNUM); + } catch (UnexpectedStateException e) { + // Should never happen. + throw new RuntimeException(e); + } + } else if (exception instanceof RegionServerAbortedException || + exception instanceof RegionServerStoppedException || + exception instanceof ServerNotRunningYetException) { + // TODO + // RS is aborting, we cannot offline the region since the region may need to do WAL + // recovery. 
Until we see the RS expiration, we should retry. + LOG.info("Ignoring; waiting on ServerCrashProcedure", exception); + // serverCrashed.set(true); + } else if (exception instanceof NotServingRegionException) { + LOG.info("IS THIS OK? ANY LOGS TO REPLAY; ACTING AS THOUGH ALL GOOD " + regionNode, exception); + setTransitionState(RegionTransitionState.REGION_TRANSITION_FINISH); + } else { + // TODO: kill the server in case we get an exception we are not able to handle + LOG.warn("Killing server; unexpected exception; " + + this + "; " + regionNode.toShortString() + + " exception=" + exception); + env.getMasterServices().getServerManager().expireServer(regionNode.getRegionLocation()); + serverCrashed.set(true); + } + } + + @Override + public void toStringClassDetails(StringBuilder sb) { + super.toStringClassDetails(sb); + sb.append(", server=").append(this.destinationServer); + } + + @Override + public ServerName getServer(final MasterProcedureEnv env) { + return this.destinationServer; + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/Util.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/Util.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/Util.java new file mode 100644 index 0000000..cb3861a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/Util.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.assignment; + +import java.io.IOException; + +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; + +/** + * Utility for this assignment package only. + */ +@InterfaceAudience.Private +class Util { + private Util() {} + + /** + * Raw call to remote regionserver to get info on a particular region. + * @throws IOException Let it out so can report this IOE as reason for failure + */ + static GetRegionInfoResponse getRegionInfoResponse(final MasterProcedureEnv env, + final ServerName regionLocation, final HRegionInfo hri) + throws IOException { + // TODO: There is no timeout on this controller. Set one! + HBaseRpcController controller = env.getMasterServices().getClusterConnection(). 
+ getRpcControllerFactory().newController(); + final AdminService.BlockingInterface admin = + env.getMasterServices().getClusterConnection().getAdmin(regionLocation); + GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest(hri.getRegionName()); + try { + return admin.getRegionInfo(controller, request); + } catch (ServiceException e) { + throw ProtobufUtil.handleRemoteException(e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java index 6410375..a494ecc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java @@ -1,4 +1,4 @@ -/** + /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -62,9 +62,11 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; /** - * The base class for load balancers. It provides functions used by - * {@link org.apache.hadoop.hbase.master.AssignmentManager} to assign regions in the edge cases. - * It doesn't provide an implementation of the actual balancing algorithm. + * The base class for load balancers. It provides the functions used by + * {@link org.apache.hadoop.hbase.master.assignment.AssignmentManager} to assign regions + * in the edge cases. It doesn't provide an implementation of the + * actual balancing algorithm. 
+ * */ public abstract class BaseLoadBalancer implements LoadBalancer { protected static final int MIN_SERVER_BALANCE = 2; @@ -202,7 +204,15 @@ public abstract class BaseLoadBalancer implements LoadBalancer { // Use servername and port as there can be dead servers in this list. We want everything with // a matching hostname and port to have the same index. for (ServerName sn : clusterState.keySet()) { - if (serversToIndex.get(sn.getHostAndPort()) == null) { + if (sn == null) { + LOG.warn("TODO: Enable TRACE on BaseLoadBalancer. Empty servername); " + + "skipping; unassigned regions?"); + if (LOG.isTraceEnabled()) { + LOG.trace("EMPTY SERVERNAME " + clusterState.toString()); + } + continue; + } + if (serversToIndex.get(sn.getAddress().toString()) == null) { serversToIndex.put(sn.getHostAndPort(), numServers++); } if (!hostsToIndex.containsKey(sn.getHostname())) { @@ -257,6 +267,10 @@ public abstract class BaseLoadBalancer implements LoadBalancer { int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0; for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) { + if (entry.getKey() == null) { + LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue()); + continue; + } int serverIndex = serversToIndex.get(entry.getKey().getHostAndPort()); // keep the servername if this is the first server name for this hostname @@ -585,8 +599,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer { /** * Return true if the placement of region on server would lower the availability * of the region in question - * @param server - * @param region * @return true or false */ boolean wouldLowerAvailability(HRegionInfo regionInfo, ServerName serverName) { @@ -899,8 +911,11 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } } if (leastLoadedServerIndex != -1) { - LOG.debug("Pick the least loaded server " + servers[leastLoadedServerIndex].getHostname() - + " with better locality for region " + regions[region]); + if 
(LOG.isTraceEnabled()) { + LOG.trace("Pick the least loaded server " + + servers[leastLoadedServerIndex].getHostname() + + " with better locality for region " + regions[region].getShortNameToLog()); + } } return leastLoadedServerIndex; } else { http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java index fd98c9c..a8e22ce 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java @@ -469,6 +469,10 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements } } + public synchronized List<ServerName> getFavoredNodes(HRegionInfo regionInfo) { + return this.fnm.getFavoredNodes(regionInfo); + } + /* * Generate Favored Nodes for daughters during region split. * @@ -709,7 +713,12 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements // No favored nodes, lets unassign. LOG.warn("Region not on favored nodes, unassign. 
Region: " + hri + " current: " + current + " favored nodes: " + favoredNodes); - this.services.getAssignmentManager().unassign(hri); + try { + this.services.getAssignmentManager().unassign(hri); + } catch (IOException e) { + LOG.warn("Failed unassign", e); + continue; + } RegionPlan rp = new RegionPlan(hri, null, null); regionPlans.add(rp); misplacedRegions++; http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java index f7e166d..907e745 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionLocationFinder.java @@ -23,7 +23,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; -import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; @@ -39,9 +38,8 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.master.AssignmentManager; +import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.master.MasterServices; -import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -149,19 +147,15 @@ class RegionLocationFinder { if (services == null) { return false; } - AssignmentManager am = services.getAssignmentManager(); 
+ final AssignmentManager am = services.getAssignmentManager(); if (am == null) { return false; } - RegionStates regionStates = am.getRegionStates(); - if (regionStates == null) { - return false; - } - Set<HRegionInfo> regions = regionStates.getRegionAssignments().keySet(); + // TODO: Should this refresh all the regions or only the ones assigned? boolean includesUserTables = false; - for (final HRegionInfo hri : regions) { + for (final HRegionInfo hri : am.getAssignedRegions()) { cache.refresh(hri); includesUserTables = includesUserTables || !hri.isSystemTable(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java index 7e8d696..818156d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java @@ -20,28 +20,27 @@ package org.apache.hadoop.hbase.master.balancer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.Random; import java.util.TreeMap; -import java.util.Comparator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HRegionInfo; import 
org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.hadoop.hbase.util.Pair; import com.google.common.collect.MinMaxPriorityQueue; -import org.apache.hadoop.hbase.util.Pair; /** * Makes decisions about the placement and movement of Regions across @@ -54,7 +53,7 @@ import org.apache.hadoop.hbase.util.Pair; * locations for all Regions in a cluster. * * <p>This classes produces plans for the - * {@link org.apache.hadoop.hbase.master.AssignmentManager} to execute. + * {@link org.apache.hadoop.hbase.master.assignment.AssignmentManager} to execute. */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) public class SimpleLoadBalancer extends BaseLoadBalancer { http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java index 53db1f2..4b96bc6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java @@ -293,9 +293,11 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { if (total <= 0 || sumMultiplier <= 0 || (sumMultiplier > 0 && (total / sumMultiplier) < minCostNeedBalance)) { - LOG.info("Skipping load balancing because balanced cluster; " + "total cost is " + total + if (LOG.isTraceEnabled()) { + LOG.trace("Skipping load balancing because balanced cluster; " + "total cost is " + total + ", sum multiplier is " + sumMultiplier + " min cost which need balance is " + 
minCostNeedBalance); + } return false; } return true; @@ -1153,11 +1155,11 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { stats = new double[cluster.numServers]; } - for (int i =0; i < cluster.numServers; i++) { + for (int i = 0; i < cluster.numServers; i++) { stats[i] = 0; for (int regionIdx : cluster.regionsPerServer[i]) { if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) { - stats[i] ++; + stats[i]++; } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java index 512f7e2..edbba83 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/locking/LockProcedure.java @@ -232,7 +232,8 @@ public final class LockProcedure extends Procedure<MasterProcedureEnv> } @Override - protected Procedure<?>[] execute(final MasterProcedureEnv env) throws ProcedureSuspendedException { + protected Procedure<MasterProcedureEnv>[] execute(final MasterProcedureEnv env) + throws ProcedureSuspendedException { // Local master locks don't store any state, so on recovery, simply finish this procedure // immediately. 
if (recoveredMasterLock) return null; http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineNamespaceProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineNamespaceProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineNamespaceProcedure.java index 03fdaef..6ebadb4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineNamespaceProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineNamespaceProcedure.java @@ -52,9 +52,8 @@ public abstract class AbstractStateMachineNamespaceProcedure<TState> @Override public void toStringClassDetails(final StringBuilder sb) { sb.append(getClass().getSimpleName()); - sb.append(" (namespace="); + sb.append(", namespace="); sb.append(getNamespaceName()); - sb.append(")"); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineRegionProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineRegionProcedure.java new file mode 100644 index 0000000..41502d4 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineRegionProcedure.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; + +/** + * Base class for all the Region procedures that want to use a StateMachine. + * It provides some basic helpers like basic locking, sync latch, and toStringClassDetails(). + * Defaults to holding the lock for the life of the procedure. + */ +@InterfaceAudience.Private +public abstract class AbstractStateMachineRegionProcedure<TState> + extends AbstractStateMachineTableProcedure<TState> { + private HRegionInfo hri; + private volatile boolean lock = false; + + public AbstractStateMachineRegionProcedure(final MasterProcedureEnv env, + final HRegionInfo hri) { + super(env); + this.hri = hri; + } + + public AbstractStateMachineRegionProcedure() { + // Required by the Procedure framework to create the procedure on replay + super(); + } + + /** + * @return The HRegionInfo of the region we are operating on. + */ + protected HRegionInfo getRegion() { + return this.hri; + } + + /** + * Used when deserializing. 
Otherwise, DON'T TOUCH IT! + */ + protected void setRegion(final HRegionInfo hri) { + this.hri = hri; + } + + @Override + public TableName getTableName() { + return getRegion().getTable(); + } + + @Override + public abstract TableOperationType getTableOperationType(); + + @Override + public void toStringClassDetails(final StringBuilder sb) { + super.toStringClassDetails(sb); + sb.append(", region=").append(getRegion().getShortNameToLog()); + } + + /** + * Check whether a table is modifiable - exists and either offline or online with config set + * @param env MasterProcedureEnv + * @throws IOException + */ + protected void checkTableModifiable(final MasterProcedureEnv env) throws IOException { + // Checks whether the table exists + if (!MetaTableAccessor.tableExists(env.getMasterServices().getConnection(), getTableName())) { + throw new TableNotFoundException(getTableName()); + } + } + + @Override + protected boolean holdLock(MasterProcedureEnv env) { + return true; + } + + protected LockState acquireLock(final MasterProcedureEnv env) { + if (env.waitInitialized(this)) return LockState.LOCK_EVENT_WAIT; + if (env.getProcedureScheduler().waitRegions(this, getTableName(), getRegion())) { + return LockState.LOCK_EVENT_WAIT; + } + this.lock = true; + return LockState.LOCK_ACQUIRED; + } + + protected void releaseLock(final MasterProcedureEnv env) { + this.lock = false; + env.getProcedureScheduler().wakeRegions(this, getTableName(), getRegion()); + } + + @Override + protected boolean hasLock(final MasterProcedureEnv env) { + return this.lock; + } + + protected void setFailure(Throwable cause) { + super.setFailure(getClass().getSimpleName(), cause); + } + + @Override + protected void serializeStateData(final OutputStream stream) throws IOException { + super.serializeStateData(stream); + HRegionInfo.convert(getRegion()).writeDelimitedTo(stream); + } + + @Override + protected void deserializeStateData(final InputStream stream) throws IOException { + 
super.deserializeStateData(stream); + this.hri = HRegionInfo.convert(HBaseProtos.RegionInfo.parseDelimitedFrom(stream)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java index 9f23848..1417159 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java @@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.security.User; /** * Base class for all the Table procedures that want to use a StateMachineProcedure. - * It provide some basic helpers like basic locking, sync latch, and basic toStringClassDetails(). + * It provides helpers like basic locking, sync latch, and toStringClassDetails(). */ @InterfaceAudience.Private public abstract class AbstractStateMachineTableProcedure<TState> @@ -50,11 +50,15 @@ public abstract class AbstractStateMachineTableProcedure<TState> this(env, null); } + /** + * @param env Uses this to set Procedure Owner at least. + */ protected AbstractStateMachineTableProcedure(final MasterProcedureEnv env, final ProcedurePrepareLatch latch) { - this.user = env.getRequestUser(); - this.setOwner(user); - + if (env != null) { + this.user = env.getRequestUser(); + this.setOwner(user); + } // used for compatibility with clients without procedures // they need a sync TableExistsException, TableNotFoundException, TableNotDisabledException, ... 
this.syncLatch = latch; @@ -110,4 +114,4 @@ public abstract class AbstractStateMachineTableProcedure<TState> throw new TableNotFoundException(getTableName()); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java index 7bb2887..34c1853 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java @@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.InvalidFamilyOperationException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; @@ -100,7 +99,10 @@ public class AddColumnFamilyProcedure setNextState(AddColumnFamilyState.ADD_COLUMN_FAMILY_REOPEN_ALL_REGIONS); break; case ADD_COLUMN_FAMILY_REOPEN_ALL_REGIONS: - reOpenAllRegionsIfTableIsOnline(env); + if (env.getAssignmentManager().isTableEnabled(getTableName())) { + addChildProcedure(env.getAssignmentManager() + .createReopenProcedures(getRegionInfoList(env))); + } return Flow.NO_MORE_STATE; default: throw new UnsupportedOperationException(this + " unhandled state=" + state); @@ -285,7 +287,8 @@ public class AddColumnFamilyProcedure 
env.getMasterServices().getTableDescriptors().add(unmodifiedHTableDescriptor); // Make sure regions are opened after table descriptor is updated. - reOpenAllRegionsIfTableIsOnline(env); + //reOpenAllRegionsIfTableIsOnline(env); + // TODO: NUKE ROLLBACK!!!! } } @@ -302,25 +305,6 @@ public class AddColumnFamilyProcedure } /** - * Last action from the procedure - executed when online schema change is supported. - * @param env MasterProcedureEnv - * @throws IOException - */ - private void reOpenAllRegionsIfTableIsOnline(final MasterProcedureEnv env) throws IOException { - // This operation only run when the table is enabled. - if (!env.getMasterServices().getTableStateManager() - .isTableState(getTableName(), TableState.State.ENABLED)) { - return; - } - - if (MasterDDLOperationHelper.reOpenAllRegions(env, getTableName(), getRegionInfoList(env))) { - LOG.info("Completed add column family operation on table " + getTableName()); - } else { - LOG.warn("Error on reopening the regions on table " + getTableName()); - } - } - - /** * The procedure could be restarted from a different machine. If the variable is null, we need to * retrieve it. 
* @return traceEnabled @@ -362,7 +346,8 @@ public class AddColumnFamilyProcedure private List<HRegionInfo> getRegionInfoList(final MasterProcedureEnv env) throws IOException { if (regionInfoList == null) { - regionInfoList = ProcedureSyncWait.getRegionsFromMeta(env, getTableName()); + regionInfoList = env.getAssignmentManager().getRegionStates() + .getRegionsOfTable(getTableName()); } return regionInfoList; } http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CloneSnapshotProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CloneSnapshotProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CloneSnapshotProcedure.java index 683d840..c1d0326 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CloneSnapshotProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CloneSnapshotProcedure.java @@ -149,10 +149,12 @@ public class CloneSnapshotProcedure setNextState(CloneSnapshotState.CLONE_SNAPSHOT_ASSIGN_REGIONS); break; case CLONE_SNAPSHOT_ASSIGN_REGIONS: - CreateTableProcedure.assignRegions(env, getTableName(), newRegions); + CreateTableProcedure.setEnablingState(env, getTableName()); + addChildProcedure(env.getAssignmentManager().createAssignProcedures(newRegions)); setNextState(CloneSnapshotState.CLONE_SNAPSHOT_UPDATE_DESC_CACHE); break; case CLONE_SNAPSHOT_UPDATE_DESC_CACHE: + CreateTableProcedure.setEnabledState(env, getTableName()); CreateTableProcedure.updateTableDescCache(env, getTableName()); setNextState(CloneSnapshotState.CLONE_SNAPHOST_RESTORE_ACL); break; http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java ---------------------------------------------------------------------- diff 
--git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java index ced7abc..c3900dd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java @@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.TableState; -import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterFileSystem; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; @@ -107,10 +106,12 @@ public class CreateTableProcedure setNextState(CreateTableState.CREATE_TABLE_ASSIGN_REGIONS); break; case CREATE_TABLE_ASSIGN_REGIONS: - assignRegions(env, getTableName(), newRegions); + setEnablingState(env, getTableName()); + addChildProcedure(env.getAssignmentManager().createAssignProcedures(newRegions)); setNextState(CreateTableState.CREATE_TABLE_UPDATE_DESC_CACHE); break; case CREATE_TABLE_UPDATE_DESC_CACHE: + setEnabledState(env, getTableName()); updateTableDescCache(env, getTableName()); setNextState(CreateTableState.CREATE_TABLE_POST_OPERATION); break; @@ -333,21 +334,21 @@ public class CreateTableProcedure protected static List<HRegionInfo> addTableToMeta(final MasterProcedureEnv env, final HTableDescriptor hTableDescriptor, final List<HRegionInfo> regions) throws IOException { - if (regions != null && regions.size() > 0) { - ProcedureSyncWait.waitMetaRegions(env); + assert (regions != null && regions.size() > 0) : "expected at least 1 region, got " + regions; - // Add regions to META - addRegionsToMeta(env, hTableDescriptor, regions); - // Add replicas if 
needed - List<HRegionInfo> newRegions = addReplicas(env, hTableDescriptor, regions); + ProcedureSyncWait.waitMetaRegions(env); - // Setup replication for region replicas if needed - if (hTableDescriptor.getRegionReplication() > 1) { - ServerRegionReplicaUtil.setupRegionReplicaReplication(env.getMasterConfiguration()); - } - return newRegions; + // Add replicas if needed + List<HRegionInfo> newRegions = addReplicas(env, hTableDescriptor, regions); + + // Add regions to META + addRegionsToMeta(env, hTableDescriptor, newRegions); + + // Setup replication for region replicas if needed + if (hTableDescriptor.getRegionReplication() > 1) { + ServerRegionReplicaUtil.setupRegionReplicaReplication(env.getMasterConfiguration()); } - return regions; + return newRegions; } /** @@ -374,18 +375,16 @@ public class CreateTableProcedure return hRegionInfos; } - protected static void assignRegions(final MasterProcedureEnv env, - final TableName tableName, final List<HRegionInfo> regions) throws IOException { - ProcedureSyncWait.waitRegionServers(env); + protected static void setEnablingState(final MasterProcedureEnv env, final TableName tableName) + throws IOException { // Mark the table as Enabling env.getMasterServices().getTableStateManager() .setTableState(tableName, TableState.State.ENABLING); + } - // Trigger immediate assignment of the regions in round-robin fashion - final AssignmentManager assignmentManager = env.getMasterServices().getAssignmentManager(); - ModifyRegionUtils.assignRegions(assignmentManager, regions); - + protected static void setEnabledState(final MasterProcedureEnv env, final TableName tableName) + throws IOException { // Enable table env.getMasterServices().getTableStateManager() .setTableState(tableName, TableState.State.ENABLED); http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java 
---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java index 096172a..78bd715 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.InvalidFamilyOperationException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; @@ -106,7 +105,10 @@ public class DeleteColumnFamilyProcedure setNextState(DeleteColumnFamilyState.DELETE_COLUMN_FAMILY_REOPEN_ALL_REGIONS); break; case DELETE_COLUMN_FAMILY_REOPEN_ALL_REGIONS: - reOpenAllRegionsIfTableIsOnline(env); + if (env.getAssignmentManager().isTableEnabled(getTableName())) { + addChildProcedure(env.getAssignmentManager() + .createReopenProcedures(getRegionInfoList(env))); + } return Flow.NO_MORE_STATE; default: throw new UnsupportedOperationException(this + " unhandled state=" + state); @@ -292,7 +294,8 @@ public class DeleteColumnFamilyProcedure env.getMasterServices().getTableDescriptors().add(unmodifiedHTableDescriptor); // Make sure regions are opened after table descriptor is updated. - reOpenAllRegionsIfTableIsOnline(env); + //reOpenAllRegionsIfTableIsOnline(env); + // TODO: NUKE ROLLBACK!!!! 
} /** @@ -316,25 +319,6 @@ public class DeleteColumnFamilyProcedure } /** - * Last action from the procedure - executed when online schema change is supported. - * @param env MasterProcedureEnv - * @throws IOException - */ - private void reOpenAllRegionsIfTableIsOnline(final MasterProcedureEnv env) throws IOException { - // This operation only run when the table is enabled. - if (!env.getMasterServices().getTableStateManager() - .isTableState(getTableName(), TableState.State.ENABLED)) { - return; - } - - if (MasterDDLOperationHelper.reOpenAllRegions(env, getTableName(), getRegionInfoList(env))) { - LOG.info("Completed delete column family operation on table " + getTableName()); - } else { - LOG.warn("Error on reopening the regions on table " + getTableName()); - } - } - - /** * The procedure could be restarted from a different machine. If the variable is null, we need to * retrieve it. * @return traceEnabled @@ -376,7 +360,8 @@ public class DeleteColumnFamilyProcedure private List<HRegionInfo> getRegionInfoList(final MasterProcedureEnv env) throws IOException { if (regionInfoList == null) { - regionInfoList = ProcedureSyncWait.getRegionsFromMeta(env, getTableName()); + regionInfoList = env.getAssignmentManager().getRegionStates() + .getRegionsOfTable(getTableName()); } return regionInfoList; } http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java index bda68eb..04dfc60 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java @@ -44,7 +44,6 @@ import 
org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.exceptions.HBaseException; import org.apache.hadoop.hbase.favored.FavoredNodesManager; -import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterFileSystem; import org.apache.hadoop.hbase.mob.MobConstants; @@ -97,8 +96,8 @@ public class DeleteTableProcedure } // TODO: Move out... in the acquireLock() - LOG.debug("waiting for '" + getTableName() + "' regions in transition"); - regions = ProcedureSyncWait.getRegionsFromMeta(env, getTableName()); + LOG.debug("Waiting for '" + getTableName() + "' regions in transition"); + regions = env.getAssignmentManager().getRegionStates().getRegionsOfTable(getTableName()); assert regions != null && !regions.isEmpty() : "unexpected 0 regions"; ProcedureSyncWait.waitRegionInTransition(env, regions); @@ -350,8 +349,7 @@ public class DeleteTableProcedure final TableName tableName) throws IOException { Connection connection = env.getMasterServices().getConnection(); Scan tableScan = MetaTableAccessor.getScanForTableName(connection, tableName); - try (Table metaTable = - connection.getTable(TableName.META_TABLE_NAME)) { + try (Table metaTable = connection.getTable(TableName.META_TABLE_NAME)) { List<Delete> deletes = new ArrayList<>(); try (ResultScanner resScanner = metaTable.getScanner(tableScan)) { for (Result result : resScanner) { @@ -385,11 +383,9 @@ public class DeleteTableProcedure protected static void deleteAssignmentState(final MasterProcedureEnv env, final TableName tableName) throws IOException { - final AssignmentManager am = env.getMasterServices().getAssignmentManager(); - // Clean up regions of the table in RegionStates. 
LOG.debug("Removing '" + tableName + "' from region states."); - am.getRegionStates().tableDeleted(tableName); + env.getMasterServices().getAssignmentManager().deleteTable(tableName); // If entry for this table states, remove it. LOG.debug("Marking '" + tableName + "' as deleted."); http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java index b53ce45..409ca26 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java @@ -21,12 +21,9 @@ package org.apache.hadoop.hbase.master.procedure; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.List; -import java.util.concurrent.ExecutorService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotEnabledException; @@ -34,17 +31,11 @@ import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.constraint.ConstraintException; -import org.apache.hadoop.hbase.master.AssignmentManager; -import org.apache.hadoop.hbase.master.BulkAssigner; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; -import org.apache.hadoop.hbase.master.RegionState; -import org.apache.hadoop.hbase.master.RegionStates; import 
org.apache.hadoop.hbase.master.TableStateManager; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.DisableTableState; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.htrace.Trace; @InterfaceAudience.Private public class DisableTableProcedure @@ -116,12 +107,8 @@ public class DisableTableProcedure setNextState(DisableTableState.DISABLE_TABLE_MARK_REGIONS_OFFLINE); break; case DISABLE_TABLE_MARK_REGIONS_OFFLINE: - if (markRegionsOffline(env, tableName, true) == - MarkRegionOfflineOpResult.MARK_ALL_REGIONS_OFFLINE_SUCCESSFUL) { - setNextState(DisableTableState.DISABLE_TABLE_SET_DISABLED_TABLE_STATE); - } else { - LOG.trace("Retrying later to disable the missing regions"); - } + addChildProcedure(env.getAssignmentManager().createUnassignProcedures(tableName)); + setNextState(DisableTableState.DISABLE_TABLE_SET_DISABLED_TABLE_STATE); break; case DISABLE_TABLE_SET_DISABLED_TABLE_STATE: setTableStateToDisabled(env, tableName); @@ -249,7 +236,7 @@ public class DisableTableProcedure // set the state later on). A quick state check should be enough for us to move forward. 
TableStateManager tsm = env.getMasterServices().getTableStateManager(); TableState.State state = tsm.getTableState(tableName); - if(!state.equals(TableState.State.ENABLED)){ + if (!state.equals(TableState.State.ENABLED)){ LOG.info("Table " + tableName + " isn't enabled;is "+state.name()+"; skipping disable"); setFailure("master-disable-table", new TableNotEnabledException( tableName+" state is "+state.name())); @@ -290,83 +277,6 @@ public class DisableTableProcedure } /** - * Mark regions of the table offline with retries - * @param env MasterProcedureEnv - * @param tableName the target table - * @param retryRequired whether to retry if the first run failed - * @return whether the operation is fully completed or being interrupted. - * @throws IOException - */ - protected static MarkRegionOfflineOpResult markRegionsOffline( - final MasterProcedureEnv env, - final TableName tableName, - final Boolean retryRequired) throws IOException { - // Dev consideration: add a config to control max number of retry. For now, it is hard coded. - int maxTry = (retryRequired ? 10 : 1); - MarkRegionOfflineOpResult operationResult = - MarkRegionOfflineOpResult.BULK_ASSIGN_REGIONS_FAILED; - do { - try { - operationResult = markRegionsOffline(env, tableName); - if (operationResult == MarkRegionOfflineOpResult.MARK_ALL_REGIONS_OFFLINE_SUCCESSFUL) { - break; - } - maxTry--; - } catch (Exception e) { - LOG.warn("Received exception while marking regions online. tries left: " + maxTry, e); - maxTry--; - if (maxTry > 0) { - continue; // we still have some retry left, try again. 
- } - throw e; - } - } while (maxTry > 0); - - if (operationResult != MarkRegionOfflineOpResult.MARK_ALL_REGIONS_OFFLINE_SUCCESSFUL) { - LOG.warn("Some or all regions of the Table '" + tableName + "' were still online"); - } - - return operationResult; - } - - /** - * Mark regions of the table offline - * @param env MasterProcedureEnv - * @param tableName the target table - * @return whether the operation is fully completed or being interrupted. - * @throws IOException - */ - private static MarkRegionOfflineOpResult markRegionsOffline( - final MasterProcedureEnv env, - final TableName tableName) throws IOException { - // Get list of online regions that are of this table. Regions that are - // already closed will not be included in this list; i.e. the returned - // list is not ALL regions in a table, its all online regions according - // to the in-memory state on this master. - MarkRegionOfflineOpResult operationResult = - MarkRegionOfflineOpResult.MARK_ALL_REGIONS_OFFLINE_SUCCESSFUL; - final List<HRegionInfo> regions = - env.getMasterServices().getAssignmentManager().getRegionStates() - .getRegionsOfTable(tableName); - if (regions.size() > 0) { - LOG.info("Offlining " + regions.size() + " regions."); - - BulkDisabler bd = new BulkDisabler(env, tableName, regions); - try { - if (!bd.bulkAssign()) { - operationResult = MarkRegionOfflineOpResult.BULK_ASSIGN_REGIONS_FAILED; - } - } catch (InterruptedException e) { - LOG.warn("Disable was interrupted"); - // Preserve the interrupt. - Thread.currentThread().interrupt(); - operationResult = MarkRegionOfflineOpResult.MARK_ALL_REGIONS_OFFLINE_INTERRUPTED; - } - } - return operationResult; - } - - /** * Mark table state to Disabled * @param env MasterProcedureEnv * @throws IOException @@ -428,64 +338,4 @@ public class DisableTableProcedure } } } - - /** - * Run bulk disable. 
- */ - private static class BulkDisabler extends BulkAssigner { - private final AssignmentManager assignmentManager; - private final List<HRegionInfo> regions; - private final TableName tableName; - private final int waitingTimeForEvents; - - public BulkDisabler(final MasterProcedureEnv env, final TableName tableName, - final List<HRegionInfo> regions) { - super(env.getMasterServices()); - this.assignmentManager = env.getMasterServices().getAssignmentManager(); - this.tableName = tableName; - this.regions = regions; - this.waitingTimeForEvents = - env.getMasterServices().getConfiguration() - .getInt("hbase.master.event.waiting.time", 1000); - } - - @Override - protected void populatePool(ExecutorService pool) { - RegionStates regionStates = assignmentManager.getRegionStates(); - for (final HRegionInfo region : regions) { - if (regionStates.isRegionInTransition(region) - && !regionStates.isRegionInState(region, RegionState.State.FAILED_CLOSE)) { - continue; - } - pool.execute(Trace.wrap("DisableTableHandler.BulkDisabler", new Runnable() { - @Override - public void run() { - assignmentManager.unassign(region); - } - })); - } - } - - @Override - protected boolean waitUntilDone(long timeout) throws InterruptedException { - long startTime = EnvironmentEdgeManager.currentTime(); - long remaining = timeout; - List<HRegionInfo> regions = null; - long lastLogTime = startTime; - while (!server.isStopped() && remaining > 0) { - Thread.sleep(waitingTimeForEvents); - regions = assignmentManager.getRegionStates().getRegionsOfTable(tableName); - long now = EnvironmentEdgeManager.currentTime(); - // Don't log more than once every ten seconds. Its obnoxious. And only log table regions - // if we are waiting a while for them to go down... 
- if (LOG.isDebugEnabled() && ((now - lastLogTime) > 10000)) { - lastLogTime = now; - LOG.debug("Disable waiting until done; " + remaining + " ms remaining; " + regions); - } - if (regions.isEmpty()) break; - remaining = timeout - (now - startTime); - } - return regions != null && regions.isEmpty(); - } - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DispatchMergingRegionsProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DispatchMergingRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DispatchMergingRegionsProcedure.java new file mode 100644 index 0000000..15ed429 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DispatchMergingRegionsProcedure.java @@ -0,0 +1,584 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.io.OutputStream; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.RegionLoad; +import org.apache.hadoop.hbase.ServerLoad; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.UnknownRegionException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.exceptions.MergeRegionException; +import org.apache.hadoop.hbase.exceptions.RegionOpeningException; +import org.apache.hadoop.hbase.master.assignment.AssignmentManager; +import org.apache.hadoop.hbase.master.assignment.RegionStates; +import org.apache.hadoop.hbase.master.CatalogJanitor; +import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.hadoop.hbase.master.RegionState; +import org.apache.hadoop.hbase.master.ServerManager; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.DispatchMergingRegionsState; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +/** + * The procedure to Merge a region in a table. 
+ */ +@InterfaceAudience.Private +public class DispatchMergingRegionsProcedure + extends AbstractStateMachineTableProcedure<DispatchMergingRegionsState> { + private static final Log LOG = LogFactory.getLog(DispatchMergingRegionsProcedure.class); + + private final AtomicBoolean aborted = new AtomicBoolean(false); + private Boolean traceEnabled; + private AssignmentManager assignmentManager; + private int timeout; + private ServerName regionLocation; + private String regionsToMergeListFullName; + private String regionsToMergeListEncodedName; + + private TableName tableName; + private HRegionInfo [] regionsToMerge; + private boolean forcible; + + public DispatchMergingRegionsProcedure() { + this.traceEnabled = isTraceEnabled(); + this.assignmentManager = null; + this.timeout = -1; + this.regionLocation = null; + this.regionsToMergeListFullName = null; + this.regionsToMergeListEncodedName = null; + } + + public DispatchMergingRegionsProcedure( + final MasterProcedureEnv env, + final TableName tableName, + final HRegionInfo [] regionsToMerge, + final boolean forcible) { + super(env); + this.traceEnabled = isTraceEnabled(); + this.assignmentManager = getAssignmentManager(env); + this.tableName = tableName; + // For now, we only merge 2 regions. It could be extended to more than 2 regions in + // the future.
+ assert(regionsToMerge.length == 2); + this.regionsToMerge = regionsToMerge; + this.forcible = forcible; + + this.timeout = -1; + this.regionsToMergeListFullName = getRegionsToMergeListFullNameString(); + this.regionsToMergeListEncodedName = getRegionsToMergeListEncodedNameString(); + } + + @Override + protected Flow executeFromState( + final MasterProcedureEnv env, + final DispatchMergingRegionsState state) throws InterruptedException { + if (isTraceEnabled()) { + LOG.trace(this + " execute state=" + state); + } + + try { + switch (state) { + case DISPATCH_MERGING_REGIONS_PREPARE: + prepareMergeRegion(env); + setNextState(DispatchMergingRegionsState.DISPATCH_MERGING_REGIONS_PRE_OPERATION); + break; + case DISPATCH_MERGING_REGIONS_PRE_OPERATION: + //Unused for now - reserve to add preMerge coprocessor in the future + setNextState(DispatchMergingRegionsState.DISPATCH_MERGING_REGIONS_MOVE_REGION_TO_SAME_RS); + break; + case DISPATCH_MERGING_REGIONS_MOVE_REGION_TO_SAME_RS: + if (MoveRegionsToSameRS(env)) { + setNextState(DispatchMergingRegionsState.DISPATCH_MERGING_REGIONS_DO_MERGE_IN_RS); + } else { + LOG.info("Cancel merging regions " + getRegionsToMergeListFullNameString() + + ", because can't move them to the same RS"); + setNextState(DispatchMergingRegionsState.DISPATCH_MERGING_REGIONS_POST_OPERATION); + } + break; + case DISPATCH_MERGING_REGIONS_DO_MERGE_IN_RS: + doMergeInRS(env); + setNextState(DispatchMergingRegionsState.DISPATCH_MERGING_REGIONS_POST_OPERATION); + break; + case DISPATCH_MERGING_REGIONS_POST_OPERATION: + //Unused for now - reserve to add postCompletedMerge coprocessor in the future + return Flow.NO_MORE_STATE; + default: + throw new UnsupportedOperationException(this + " unhandled state=" + state); + } + } catch (IOException e) { + LOG.warn("Error trying to merge regions " + getRegionsToMergeListFullNameString() + + " in the table " + tableName + " (in state=" + state + ")", e); + + setFailure("master-merge-regions", e); + } + return 
Flow.HAS_MORE_STATE; + } + + @Override + protected void rollbackState( + final MasterProcedureEnv env, + final DispatchMergingRegionsState state) throws IOException, InterruptedException { + if (isTraceEnabled()) { + LOG.trace(this + " rollback state=" + state); + } + + try { + switch (state) { + case DISPATCH_MERGING_REGIONS_POST_OPERATION: + case DISPATCH_MERGING_REGIONS_DO_MERGE_IN_RS: + String msg = this + " We are in the " + state + " state." + + " It is complicated to rollback the merge operation that region server is working on." + + " Rollback is not supported and we should let the merge operation to complete"; + LOG.warn(msg); + // PONR + throw new UnsupportedOperationException(this + " unhandled state=" + state); + case DISPATCH_MERGING_REGIONS_MOVE_REGION_TO_SAME_RS: + break; // nothing to rollback + case DISPATCH_MERGING_REGIONS_PRE_OPERATION: + break; // nothing to rollback + case DISPATCH_MERGING_REGIONS_PREPARE: + break; // nothing to rollback + default: + throw new UnsupportedOperationException(this + " unhandled state=" + state); + } + } catch (Exception e) { + // This will be retried. Unless there is a bug in the code, + // this should be just a "temporary error" (e.g. 
network down) + LOG.warn("Failed rollback attempt step " + state + " for merging the regions " + + getRegionsToMergeListFullNameString() + " in table " + tableName, e); + throw e; + } + } + + @Override + protected DispatchMergingRegionsState getState(final int stateId) { + return DispatchMergingRegionsState.valueOf(stateId); + } + + @Override + protected int getStateId(final DispatchMergingRegionsState state) { + return state.getNumber(); + } + + @Override + protected DispatchMergingRegionsState getInitialState() { + return DispatchMergingRegionsState.DISPATCH_MERGING_REGIONS_PREPARE; + } + + /* + * Check whether we are in the state that can be rollback + */ + @Override + protected boolean isRollbackSupported(final DispatchMergingRegionsState state) { + switch (state) { + case DISPATCH_MERGING_REGIONS_POST_OPERATION: + case DISPATCH_MERGING_REGIONS_DO_MERGE_IN_RS: + // It is not safe to rollback if we reach to these states. + return false; + default: + break; + } + return true; + } + + @Override + public void serializeStateData(final OutputStream stream) throws IOException { + super.serializeStateData(stream); + + MasterProcedureProtos.DispatchMergingRegionsStateData.Builder dispatchMergingRegionsMsg = + MasterProcedureProtos.DispatchMergingRegionsStateData.newBuilder() + .setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser())) + .setTableName(ProtobufUtil.toProtoTableName(tableName)) + .setForcible(forcible); + for (HRegionInfo hri: regionsToMerge) { + dispatchMergingRegionsMsg.addRegionInfo(HRegionInfo.convert(hri)); + } + dispatchMergingRegionsMsg.build().writeDelimitedTo(stream); + } + + @Override + public void deserializeStateData(final InputStream stream) throws IOException { + super.deserializeStateData(stream); + + MasterProcedureProtos.DispatchMergingRegionsStateData dispatchMergingRegionsMsg = + MasterProcedureProtos.DispatchMergingRegionsStateData.parseDelimitedFrom(stream); + 
setUser(MasterProcedureUtil.toUserInfo(dispatchMergingRegionsMsg.getUserInfo())); + tableName = ProtobufUtil.toTableName(dispatchMergingRegionsMsg.getTableName()); + + assert(dispatchMergingRegionsMsg.getRegionInfoCount() == 2); + regionsToMerge = new HRegionInfo[dispatchMergingRegionsMsg.getRegionInfoCount()]; + for (int i = 0; i < regionsToMerge.length; i++) { + regionsToMerge[i] = HRegionInfo.convert(dispatchMergingRegionsMsg.getRegionInfo(i)); + } + } + + @Override + public void toStringClassDetails(StringBuilder sb) { + sb.append(getClass().getSimpleName()); + sb.append(" (table="); + sb.append(tableName); + sb.append(" regions="); + sb.append(getRegionsToMergeListFullNameString()); + sb.append(" forcible="); + sb.append(forcible); + sb.append(")"); + } + + @Override + protected LockState acquireLock(final MasterProcedureEnv env) { + if (!getTableName().isSystemTable() && env.waitInitialized(this)) { + return LockState.LOCK_EVENT_WAIT; + } + if (env.getProcedureScheduler().waitRegions(this, getTableName(), regionsToMerge)) { + return LockState.LOCK_EVENT_WAIT; + } + return LockState.LOCK_ACQUIRED; + } + + @Override + protected void releaseLock(final MasterProcedureEnv env) { + env.getProcedureScheduler().wakeRegions(this, getTableName(), regionsToMerge[0], regionsToMerge[1]); + } + + @Override + public TableName getTableName() { + return tableName; + } + + @Override + public TableOperationType getTableOperationType() { + return TableOperationType.REGION_MERGE; + } + + /** + * Prepare merge and do some check + * @param env MasterProcedureEnv + * @throws IOException + */ + private void prepareMergeRegion(final MasterProcedureEnv env) throws IOException { + // Note: the following logic assumes that we only have 2 regions to merge. In the future, + // if we want to extend to more than 2 regions, the code needs to modify a little bit. 
+ // + CatalogJanitor catalogJanitor = env.getMasterServices().getCatalogJanitor(); + boolean regionAHasMergeQualifier = !catalogJanitor.cleanMergeQualifier(regionsToMerge[0]); + if (regionAHasMergeQualifier + || !catalogJanitor.cleanMergeQualifier(regionsToMerge[1])) { + String msg = "Skip merging regions " + regionsToMerge[0].getRegionNameAsString() + + ", " + regionsToMerge[1].getRegionNameAsString() + ", because region " + + (regionAHasMergeQualifier ? regionsToMerge[0].getEncodedName() : regionsToMerge[1] + .getEncodedName()) + " has merge qualifier"; + LOG.info(msg); + throw new MergeRegionException(msg); + } + + RegionStates regionStates = getAssignmentManager(env).getRegionStates(); + RegionState regionStateA = regionStates.getRegionState(regionsToMerge[0].getEncodedName()); + RegionState regionStateB = regionStates.getRegionState(regionsToMerge[1].getEncodedName()); + if (regionStateA == null || regionStateB == null) { + throw new UnknownRegionException( + regionStateA == null ? 
+ regionsToMerge[0].getEncodedName() : regionsToMerge[1].getEncodedName()); + } + + if (!regionStateA.isOpened() || !regionStateB.isOpened()) { + throw new MergeRegionException( + "Unable to merge regions not online " + regionStateA + ", " + regionStateB); + } + + if (regionsToMerge[0].getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID || + regionsToMerge[1].getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) { + throw new MergeRegionException("Can't merge non-default replicas"); + } + + if (!forcible && !HRegionInfo.areAdjacent(regionsToMerge[0], regionsToMerge[1])) { + throw new MergeRegionException( + "Unable to merge not adjacent regions " + + regionsToMerge[0].getRegionNameAsString() + ", " + + regionsToMerge[1].getRegionNameAsString() + + " where forcible = " + forcible); + } + } + + /** + * Move all regions to the same region server + * @param env MasterProcedureEnv + * @return whether target regions hosted by the same RS + * @throws IOException + */ + private boolean MoveRegionsToSameRS(final MasterProcedureEnv env) throws IOException { + // Make sure regions are on the same regionserver before send merge + // regions request to region server. + // + boolean onSameRS = isRegionsOnTheSameServer(env); + if (!onSameRS) { + // Note: the following logic assumes that we only have 2 regions to merge. In the future, + // if we want to extend to more than 2 regions, the code needs to modify a little bit. 
+ // + RegionStates regionStates = getAssignmentManager(env).getRegionStates(); + ServerName regionLocation2 = regionStates.getRegionServerOfRegion(regionsToMerge[1]); + + RegionLoad loadOfRegionA = getRegionLoad(env, regionLocation, regionsToMerge[0]); + RegionLoad loadOfRegionB = getRegionLoad(env, regionLocation2, regionsToMerge[1]); + if (loadOfRegionA != null && loadOfRegionB != null + && loadOfRegionA.getRequestsCount() < loadOfRegionB.getRequestsCount()) { + // switch regionsToMerge[0] and regionsToMerge[1] + HRegionInfo tmpRegion = this.regionsToMerge[0]; + this.regionsToMerge[0] = this.regionsToMerge[1]; + this.regionsToMerge[1] = tmpRegion; + ServerName tmpLocation = regionLocation; + regionLocation = regionLocation2; + regionLocation2 = tmpLocation; + } + + long startTime = EnvironmentEdgeManager.currentTime(); + + RegionPlan regionPlan = new RegionPlan(regionsToMerge[1], regionLocation2, regionLocation); + LOG.info("Moving regions to same server for merge: " + regionPlan.toString()); + getAssignmentManager(env).moveAsync(regionPlan); + do { + try { + Thread.sleep(20); + // Make sure check RIT first, then get region location, otherwise + // we would make a wrong result if region is online between getting + // region location and checking RIT + boolean isRIT = regionStates.isRegionInTransition(regionsToMerge[1]); + regionLocation2 = regionStates.getRegionServerOfRegion(regionsToMerge[1]); + onSameRS = regionLocation.equals(regionLocation2); + if (onSameRS || !isRIT) { + // Regions are on the same RS, or regionsToMerge[1] is not in + // RegionInTransition any more + break; + } + } catch (InterruptedException e) { + InterruptedIOException iioe = new InterruptedIOException(); + iioe.initCause(e); + throw iioe; + } + } while ((EnvironmentEdgeManager.currentTime() - startTime) <= getTimeout(env)); + } + return onSameRS; + } + + /** + * Do the real merge operation in the region server that hosts regions + * @param env MasterProcedureEnv + * @throws IOException 
+ */ + private void doMergeInRS(final MasterProcedureEnv env) throws IOException { + long duration = 0; + long startTime = EnvironmentEdgeManager.currentTime(); + do { + try { + if (getServerName(env) == null) { + // The merge probably already happen. Check + RegionState regionState = getAssignmentManager(env).getRegionStates().getRegionState( + regionsToMerge[0].getEncodedName()); + if (regionState.isMerging() || regionState.isMerged()) { + LOG.info("Merge regions " + getRegionsToMergeListEncodedNameString() + + " is in progress or completed. No need to send a new request."); + } else { + LOG.warn("Cannot sending merge to hosting server of the regions " + + getRegionsToMergeListEncodedNameString() + " as the server is unknown"); + } + return; + } + // TODO: the following RPC call is not idempotent. Multiple calls (eg. after master + // failover, re-execute this step) could result in some exception thrown that does not + // paint the correct picture. This behavior is on-par with old releases. Improvement + // could happen in the future. + env.getMasterServices().getServerManager().sendRegionsMerge( + getServerName(env), + regionsToMerge[0], + regionsToMerge[1], + forcible, + getUser()); + LOG.info("Sent merge to server " + getServerName(env) + " for region " + + getRegionsToMergeListEncodedNameString() + ", forcible=" + forcible); + return; + } catch (RegionOpeningException roe) { + // Do a retry since region should be online on RS immediately + LOG.warn("Failed mergering regions in " + getServerName(env) + ", retrying...", roe); + } catch (Exception ie) { + LOG.warn("Failed sending merge to " + getServerName(env) + " for regions " + + getRegionsToMergeListEncodedNameString() + ", forcible=" + forcible, ie); + return; + } + } while ((duration = EnvironmentEdgeManager.currentTime() - startTime) <= getTimeout(env)); + + // If we reaches here, it means that we get timed out. 
+ String msg = "Failed sending merge to " + getServerName(env) + " after " + duration + "ms"; + LOG.warn(msg); + throw new IOException(msg); + } + + private RegionLoad getRegionLoad( + final MasterProcedureEnv env, + final ServerName sn, + final HRegionInfo hri) { + ServerManager serverManager = env.getMasterServices().getServerManager(); + ServerLoad load = serverManager.getLoad(sn); + if (load != null) { + Map<byte[], RegionLoad> regionsLoad = load.getRegionsLoad(); + if (regionsLoad != null) { + return regionsLoad.get(hri.getRegionName()); + } + } + return null; + } + + /** + * The procedure could be restarted from a different machine. If the variable is null, we need to + * retrieve it. + * @param env MasterProcedureEnv + * @return whether target regions hosted by the same RS + */ + private boolean isRegionsOnTheSameServer(final MasterProcedureEnv env) throws IOException{ + Boolean onSameRS = true; + int i = 0; + RegionStates regionStates = getAssignmentManager(env).getRegionStates(); + regionLocation = regionStates.getRegionServerOfRegion(regionsToMerge[i]); + if (regionLocation != null) { + for(i = 1; i < regionsToMerge.length; i++) { + ServerName regionLocation2 = regionStates.getRegionServerOfRegion(regionsToMerge[i]); + if (regionLocation2 != null) { + if (onSameRS) { + onSameRS = regionLocation.equals(regionLocation2); + } + } else { + // At least one region is not online, merge will fail, no need to continue. + break; + } + } + if (i == regionsToMerge.length) { + // Finish checking all regions, return the result; + return onSameRS; + } + } + + // If reaching here, at least one region is not online. + String msg = "Skip merging regions " + getRegionsToMergeListFullNameString() + + ", because region " + regionsToMerge[i].getEncodedName() + " is not online now."; + LOG.warn(msg); + throw new IOException(msg); + } + + /** + * The procedure could be restarted from a different machine. If the variable is null, we need to + * retrieve it. 
+ * @param env MasterProcedureEnv + * @return assignmentManager + */ + private AssignmentManager getAssignmentManager(final MasterProcedureEnv env) { + if (assignmentManager == null) { + assignmentManager = env.getMasterServices().getAssignmentManager(); + } + return assignmentManager; + } + + /** + * The procedure could be restarted from a different machine. If the variable is null, we need to + * retrieve it. + * @param env MasterProcedureEnv + * @return timeout value + */ + private int getTimeout(final MasterProcedureEnv env) { + if (timeout == -1) { + timeout = env.getMasterConfiguration().getInt( + "hbase.master.regionmerge.timeout", regionsToMerge.length * 60 * 1000); + } + return timeout; + } + + /** + * The procedure could be restarted from a different machine. If the variable is null, we need to + * retrieve it. + * @param env MasterProcedureEnv + * @return serverName + */ + private ServerName getServerName(final MasterProcedureEnv env) { + if (regionLocation == null) { + regionLocation = + getAssignmentManager(env).getRegionStates().getRegionServerOfRegion(regionsToMerge[0]); + } + return regionLocation; + } + + /** + * The procedure could be restarted from a different machine. If the variable is null, we need to + * retrieve it. + * @param fullName whether return only encoded name + * @return region names in a list + */ + private String getRegionsToMergeListFullNameString() { + if (regionsToMergeListFullName == null) { + StringBuilder sb = new StringBuilder("["); + int i = 0; + while(i < regionsToMerge.length - 1) { + sb.append(regionsToMerge[i].getRegionNameAsString() + ", "); + i++; + } + sb.append(regionsToMerge[i].getRegionNameAsString() + " ]"); + regionsToMergeListFullName = sb.toString(); + } + return regionsToMergeListFullName; + } + + /** + * The procedure could be restarted from a different machine. If the variable is null, we need to + * retrieve it. 
+ * @return encoded region names + */ + private String getRegionsToMergeListEncodedNameString() { + if (regionsToMergeListEncodedName == null) { + StringBuilder sb = new StringBuilder("["); + int i = 0; + while(i < regionsToMerge.length - 1) { + sb.append(regionsToMerge[i].getEncodedName() + ", "); + i++; + } + sb.append(regionsToMerge[i].getEncodedName() + " ]"); + regionsToMergeListEncodedName = sb.toString(); + } + return regionsToMergeListEncodedName; + } + + /** + * The procedure could be restarted from a different machine. If the variable is null, we need to + * retrieve it. + * @return traceEnabled + */ + private Boolean isTraceEnabled() { + if (traceEnabled == null) { + traceEnabled = LOG.isTraceEnabled(); + } + return traceEnabled; + } +}
