http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java index 71c6b89..2703947 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java @@ -19,40 +19,55 @@ package org.apache.hadoop.hbase.master.procedure; import java.io.IOException; import java.io.InputStream; +import java.io.InterruptedIOException; import java.io.OutputStream; import java.util.ArrayList; import java.util.Collection; -import java.util.Iterator; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.concurrent.locks.Lock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.RegionReplicaUtil; +import org.apache.hadoop.hbase.client.TableState; +import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.MasterWalManager; -import org.apache.hadoop.hbase.master.assignment.AssignProcedure; -import org.apache.hadoop.hbase.master.assignment.AssignmentManager; -import org.apache.hadoop.hbase.master.assignment.RegionTransitionProcedure; -import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.master.RegionState; +import 
org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ServerCrashState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; +import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.util.StringUtils; +import org.apache.zookeeper.KeeperException; /** * Handle crashed server. This is a port to ProcedureV2 of what used to be euphemistically called * ServerShutdownHandler. * - * <p>The procedure flow varies dependent on whether meta is assigned and if we are to split logs. + * <p>The procedure flow varies dependent on whether meta is assigned, if we are + * doing distributed log replay versus distributed log splitting, and if we are to split logs at + * all. + * + * <p>This procedure asks that all crashed servers get processed equally; we yield after the + * completion of each successful flow step. We do this so that we do not 'deadlock' waiting on + * a region assignment so we can replay edits which could happen if a region moved there are edits + * on two servers for replay. * - * <p>We come in here after ServerManager has noticed a server has expired. Procedures - * queued on the rpc should have been notified about fail and should be concurrently - * getting themselves ready to assign elsewhere. + * <p>TODO: ASSIGN and WAIT_ON_ASSIGN (at least) are not idempotent. Revisit when assign is pv2. + * TODO: We do not have special handling for system tables. 
*/ public class ServerCrashProcedure extends StateMachineProcedure<MasterProcedureEnv, ServerCrashState> @@ -60,6 +75,36 @@ implements ServerProcedureInterface { private static final Log LOG = LogFactory.getLog(ServerCrashProcedure.class); /** + * Configuration key to set how long to wait in ms doing a quick check on meta state. + */ + public static final String KEY_SHORT_WAIT_ON_META = + "hbase.master.servercrash.short.wait.on.meta.ms"; + + public static final int DEFAULT_SHORT_WAIT_ON_META = 1000; + + /** + * Configuration key to set how many retries to cycle before we give up on meta. + * Each attempt will wait at least {@link #KEY_SHORT_WAIT_ON_META} milliseconds. + */ + public static final String KEY_RETRIES_ON_META = + "hbase.master.servercrash.meta.retries"; + + public static final int DEFAULT_RETRIES_ON_META = 10; + + /** + * Configuration key to set how long to wait in ms on regions in transition. + */ + public static final String KEY_WAIT_ON_RIT = + "hbase.master.servercrash.wait.on.rit.ms"; + + public static final int DEFAULT_WAIT_ON_RIT = 30000; + + private static final Set<HRegionInfo> META_REGION_SET = new HashSet<>(); + static { + META_REGION_SET.add(HRegionInfo.FIRST_META_REGIONINFO); + } + + /** * Name of the crashed server to process. */ private ServerName serverName; @@ -72,8 +117,14 @@ implements ServerProcedureInterface { /** * Regions that were on the crashed server. */ - private List<HRegionInfo> regionsOnCrashedServer; + private Set<HRegionInfo> regionsOnCrashedServer; + /** + * Regions assigned. Usually some subset of {@link #regionsOnCrashedServer}. 
+ */ + private List<HRegionInfo> regionsAssigned; + + private boolean distributedLogReplay = false; private boolean carryingMeta = false; private boolean shouldSplitWal; @@ -113,11 +164,20 @@ implements ServerProcedureInterface { super(); } + private void throwProcedureYieldException(final String msg) throws ProcedureYieldException { + String logMsg = msg + "; cycle=" + this.cycles + ", running for " + + StringUtils.formatTimeDiff(System.currentTimeMillis(), getSubmittedTime()); + // The procedure executor logs ProcedureYieldException at trace level. For now, log these + // yields for server crash processing at DEBUG. Revisit when stable. + if (LOG.isDebugEnabled()) LOG.debug(logMsg); + throw new ProcedureYieldException(logMsg); + } + @Override protected Flow executeFromState(MasterProcedureEnv env, ServerCrashState state) - throws ProcedureSuspendedException, ProcedureYieldException { + throws ProcedureYieldException { if (LOG.isTraceEnabled()) { - LOG.trace(state + " " + this + "; cycles=" + this.cycles); + LOG.trace(state); } // Keep running count of cycles if (state.ordinal() != this.previousState) { @@ -126,7 +186,11 @@ implements ServerProcedureInterface { } else { this.cycles++; } - final MasterServices services = env.getMasterServices(); + MasterServices services = env.getMasterServices(); + // Is master fully online? If not, yield. No processing of servers unless master is up + if (!services.getAssignmentManager().isFailoverCleanupDone()) { + throwProcedureYieldException("Waiting on master failover to complete"); + } // HBASE-14802 // If we have not yet notified that we are processing a dead server, we should do now. if (!notifiedDeadServer) { @@ -137,61 +201,102 @@ implements ServerProcedureInterface { try { switch (state) { case SERVER_CRASH_START: - LOG.info("Start " + this); + LOG.info("Start processing crashed " + this.serverName); start(env); // If carrying meta, process it first. Else, get list of regions on crashed server. 
- if (this.carryingMeta) { - setNextState(ServerCrashState.SERVER_CRASH_PROCESS_META); - } else { - setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS); - } + if (this.carryingMeta) setNextState(ServerCrashState.SERVER_CRASH_PROCESS_META); + else setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS); break; case SERVER_CRASH_GET_REGIONS: // If hbase:meta is not assigned, yield. - if (env.getAssignmentManager().waitMetaInitialized(this)) { - throw new ProcedureSuspendedException(); + if (!isMetaAssignedQuickTest(env)) { + // isMetaAssignedQuickTest does not really wait. Let's delay a little before + // another round of execution. + long wait = + env.getMasterConfiguration().getLong(KEY_SHORT_WAIT_ON_META, + DEFAULT_SHORT_WAIT_ON_META); + wait = wait / 10; + Thread.sleep(wait); + throwProcedureYieldException("Waiting on hbase:meta assignment"); } - - this.regionsOnCrashedServer = services.getAssignmentManager().getRegionStates() - .getServerRegionInfoSet(serverName); - // Where to go next? Depends on whether we should split logs at all or - // if we should do distributed log splitting. + this.regionsOnCrashedServer = + services.getAssignmentManager().getRegionStates().getServerRegions(this.serverName); + // Where to go next? Depends on whether we should split logs at all or if we should do + // distributed log splitting (DLS) vs distributed log replay (DLR). if (!this.shouldSplitWal) { setNextState(ServerCrashState.SERVER_CRASH_ASSIGN); + } else if (this.distributedLogReplay) { + setNextState(ServerCrashState.SERVER_CRASH_PREPARE_LOG_REPLAY); } else { setNextState(ServerCrashState.SERVER_CRASH_SPLIT_LOGS); } break; case SERVER_CRASH_PROCESS_META: - processMeta(env); + // If we fail processing hbase:meta, yield. 
+ if (!processMeta(env)) { + throwProcedureYieldException("Waiting on regions-in-transition to clear"); + } setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS); break; + case SERVER_CRASH_PREPARE_LOG_REPLAY: + prepareLogReplay(env, this.regionsOnCrashedServer); + setNextState(ServerCrashState.SERVER_CRASH_ASSIGN); + break; + case SERVER_CRASH_SPLIT_LOGS: splitLogs(env); - setNextState(ServerCrashState.SERVER_CRASH_ASSIGN); + // If DLR, go to FINISH. Otherwise, if DLS, go to SERVER_CRASH_CALC_REGIONS_TO_ASSIGN + if (this.distributedLogReplay) setNextState(ServerCrashState.SERVER_CRASH_FINISH); + else setNextState(ServerCrashState.SERVER_CRASH_ASSIGN); break; case SERVER_CRASH_ASSIGN: + List<HRegionInfo> regionsToAssign = calcRegionsToAssign(env); + + // Assign may not be idempotent. SSH used to requeue the SSH if we got an IOE assigning + // which is what we are mimicing here but it looks prone to double assignment if assign + // fails midway. TODO: Test. + // If no regions to assign, skip assign and skip to the finish. - // Filter out meta regions. Those are handled elsewhere in this procedure. - // Filter changes this.regionsOnCrashedServer. - if (filterDefaultMetaRegions(regionsOnCrashedServer)) { - if (LOG.isTraceEnabled()) { - LOG.trace("Assigning regions " + - HRegionInfo.getShortNameToLog(regionsOnCrashedServer) + ", " + this + - "; cycles=" + this.cycles); + boolean regions = regionsToAssign != null && !regionsToAssign.isEmpty(); + if (regions) { + this.regionsAssigned = regionsToAssign; + if (!assign(env, regionsToAssign)) { + throwProcedureYieldException("Failed assign; will retry"); } - handleRIT(env, regionsOnCrashedServer); - addChildProcedure(env.getAssignmentManager(). - createAssignProcedures(regionsOnCrashedServer, true)); } - setNextState(ServerCrashState.SERVER_CRASH_FINISH); + if (this.shouldSplitWal && distributedLogReplay) { + // Take this route even if there are apparently no regions assigned. 
This may be our + // second time through here; i.e. we assigned and crashed just about here. On second + // time through, there will be no regions because we assigned them in the previous step. + // Even though no regions, we need to go through here to clean up the DLR zk markers. + setNextState(ServerCrashState.SERVER_CRASH_WAIT_ON_ASSIGN); + } else { + setNextState(ServerCrashState.SERVER_CRASH_FINISH); + } + break; + + case SERVER_CRASH_WAIT_ON_ASSIGN: + // TODO: The list of regionsAssigned may be more than we actually assigned. See down in + // AM #1629 around 'if (regionStates.wasRegionOnDeadServer(encodedName)) {' where where we + // will skip assigning a region because it is/was on a dead server. Should never happen! + // It was on this server. Worst comes to worst, we'll still wait here till other server is + // processed. + + // If the wait on assign failed, yield -- if we have regions to assign. + if (this.regionsAssigned != null && !this.regionsAssigned.isEmpty()) { + if (!waitOnAssign(env, this.regionsAssigned)) { + throwProcedureYieldException("Waiting on region assign"); + } + } + setNextState(ServerCrashState.SERVER_CRASH_SPLIT_LOGS); break; case SERVER_CRASH_FINISH: + LOG.info("Finished processing of crashed " + serverName); services.getServerManager().getDeadServers().finish(serverName); return Flow.NO_MORE_STATE; @@ -199,7 +304,11 @@ implements ServerProcedureInterface { throw new UnsupportedOperationException("unhandled state=" + state); } } catch (IOException e) { - LOG.warn("Failed state=" + state + ", retry " + this + "; cycles=" + this.cycles, e); + LOG.warn("Failed serverName=" + this.serverName + ", state=" + state + "; retry", e); + } catch (InterruptedException e) { + // TODO: Make executor allow IEs coming up out of execute. 
+ LOG.warn("Interrupted serverName=" + this.serverName + ", state=" + state + "; retry", e); + Thread.currentThread().interrupt(); } return Flow.HAS_MORE_STATE; } @@ -209,60 +318,96 @@ implements ServerProcedureInterface { * @param env * @throws IOException */ - private void start(final MasterProcedureEnv env) throws IOException {} + private void start(final MasterProcedureEnv env) throws IOException { + MasterWalManager mwm = env.getMasterServices().getMasterWalManager(); + // Set recovery mode late. This is what the old ServerShutdownHandler used do. + mwm.setLogRecoveryMode(); + this.distributedLogReplay = mwm.getLogRecoveryMode() == RecoveryMode.LOG_REPLAY; + } /** * @param env + * @return False if we fail to assign and split logs on meta ('process'). * @throws IOException * @throws InterruptedException */ - private void processMeta(final MasterProcedureEnv env) throws IOException { + private boolean processMeta(final MasterProcedureEnv env) + throws IOException { if (LOG.isDebugEnabled()) LOG.debug("Processing hbase:meta that was on " + this.serverName); - + MasterWalManager mwm = env.getMasterServices().getMasterWalManager(); + AssignmentManager am = env.getMasterServices().getAssignmentManager(); + HRegionInfo metaHRI = HRegionInfo.FIRST_META_REGIONINFO; if (this.shouldSplitWal) { - // TODO: Matteo. We BLOCK here but most important thing to be doing at this moment. - env.getMasterServices().getMasterWalManager().splitMetaLog(serverName); + if (this.distributedLogReplay) { + prepareLogReplay(env, META_REGION_SET); + } else { + // TODO: Matteo. We BLOCK here but most important thing to be doing at this moment. + mwm.splitMetaLog(serverName); + am.getRegionStates().logSplit(metaHRI); + } } // Assign meta if still carrying it. 
Check again: region may be assigned because of RIT timeout - final AssignmentManager am = env.getMasterServices().getAssignmentManager(); - for (HRegionInfo hri: am.getRegionStates().getServerRegionInfoSet(serverName)) { - if (!isDefaultMetaRegion(hri)) continue; - - am.offlineRegion(hri); - addChildProcedure(am.createAssignProcedure(hri, true)); + boolean processed = true; + if (am.isCarryingMeta(serverName)) { + // TODO: May block here if hard time figuring state of meta. + am.regionOffline(HRegionInfo.FIRST_META_REGIONINFO); + verifyAndAssignMetaWithRetries(env); + if (this.shouldSplitWal && distributedLogReplay) { + int timeout = env.getMasterConfiguration().getInt(KEY_WAIT_ON_RIT, DEFAULT_WAIT_ON_RIT); + if (!waitOnRegionToClearRegionsInTransition(am, metaHRI, timeout)) { + processed = false; + } else { + // TODO: Matteo. We BLOCK here but most important thing to be doing at this moment. + mwm.splitMetaLog(serverName); + } + } } + return processed; } - private boolean filterDefaultMetaRegions(final List<HRegionInfo> regions) { - if (regions == null) return false; - final Iterator<HRegionInfo> it = regions.iterator(); - while (it.hasNext()) { - final HRegionInfo hri = it.next(); - if (isDefaultMetaRegion(hri)) { - it.remove(); + /** + * @return True if region cleared RIT, else false if we timed out waiting. + * @throws InterruptedIOException + */ + private boolean waitOnRegionToClearRegionsInTransition(AssignmentManager am, + final HRegionInfo hri, final int timeout) + throws InterruptedIOException { + try { + if (!am.waitOnRegionToClearRegionsInTransition(hri, timeout)) { + // Wait here is to avoid log replay hits current dead server and incur a RPC timeout + // when replay happens before region assignment completes. 
+ LOG.warn("Region " + hri.getEncodedName() + " didn't complete assignment in time"); + return false; } + } catch (InterruptedException ie) { + throw new InterruptedIOException("Caught " + ie + + " during waitOnRegionToClearRegionsInTransition for " + hri); } - return !regions.isEmpty(); + return true; } - private boolean isDefaultMetaRegion(final HRegionInfo hri) { - return hri.getTable().equals(TableName.META_TABLE_NAME) && - RegionReplicaUtil.isDefaultReplica(hri); + private void prepareLogReplay(final MasterProcedureEnv env, final Set<HRegionInfo> regions) + throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Mark " + size(this.regionsOnCrashedServer) + " regions-in-recovery from " + + this.serverName); + } + MasterWalManager mwm = env.getMasterServices().getMasterWalManager(); + AssignmentManager am = env.getMasterServices().getAssignmentManager(); + mwm.prepareLogReplay(this.serverName, regions); + am.getRegionStates().logSplit(this.serverName); } private void splitLogs(final MasterProcedureEnv env) throws IOException { if (LOG.isDebugEnabled()) { - LOG.debug("Splitting WALs " + this); + LOG.debug("Splitting logs from " + serverName + "; region count=" + + size(this.regionsOnCrashedServer)); } MasterWalManager mwm = env.getMasterServices().getMasterWalManager(); AssignmentManager am = env.getMasterServices().getAssignmentManager(); // TODO: For Matteo. Below BLOCKs!!!! Redo so can relinquish executor while it is running. - // PROBLEM!!! WE BLOCK HERE. mwm.splitLog(this.serverName); - if (LOG.isDebugEnabled()) { - LOG.debug("Done splitting WALs " + this); - } am.getRegionStates().logSplit(this.serverName); } @@ -270,6 +415,124 @@ implements ServerProcedureInterface { return hris == null? 0: hris.size(); } + /** + * Figure out what we need to assign. Should be idempotent. + * @param env + * @return List of calculated regions to assign; may be empty or null. 
+ * @throws IOException + */ + private List<HRegionInfo> calcRegionsToAssign(final MasterProcedureEnv env) + throws IOException { + AssignmentManager am = env.getMasterServices().getAssignmentManager(); + List<HRegionInfo> regionsToAssignAggregator = new ArrayList<>(); + int replicaCount = env.getMasterConfiguration().getInt(HConstants.META_REPLICAS_NUM, + HConstants.DEFAULT_META_REPLICA_NUM); + for (int i = 1; i < replicaCount; i++) { + HRegionInfo metaHri = + RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO, i); + if (am.isCarryingMetaReplica(this.serverName, metaHri)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Reassigning meta replica" + metaHri + " that was on " + this.serverName); + } + regionsToAssignAggregator.add(metaHri); + } + } + // Clean out anything in regions in transition. + List<HRegionInfo> regionsInTransition = am.cleanOutCrashedServerReferences(serverName); + if (LOG.isDebugEnabled()) { + LOG.debug("Reassigning " + size(this.regionsOnCrashedServer) + + " region(s) that " + (serverName == null? 
"null": serverName) + + " was carrying (and " + regionsInTransition.size() + + " regions(s) that were opening on this server)"); + } + regionsToAssignAggregator.addAll(regionsInTransition); + + // Iterate regions that were on this server and figure which of these we need to reassign + if (this.regionsOnCrashedServer != null && !this.regionsOnCrashedServer.isEmpty()) { + RegionStates regionStates = am.getRegionStates(); + for (HRegionInfo hri: this.regionsOnCrashedServer) { + if (regionsInTransition.contains(hri)) continue; + String encodedName = hri.getEncodedName(); + Lock lock = am.acquireRegionLock(encodedName); + try { + RegionState rit = regionStates.getRegionTransitionState(hri); + if (processDeadRegion(hri, am)) { + ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri); + if (addressFromAM != null && !addressFromAM.equals(this.serverName)) { + // If this region is in transition on the dead server, it must be + // opening or pending_open, which should have been covered by + // AM#cleanOutCrashedServerReferences + LOG.info("Skip assigning " + hri.getRegionNameAsString() + + " because opened on " + addressFromAM.getServerName()); + continue; + } + if (rit != null) { + if (rit.getServerName() != null && !rit.isOnServer(this.serverName)) { + // Skip regions that are in transition on other server + LOG.info("Skip assigning region in transition on other server" + rit); + continue; + } + LOG.info("Reassigning region " + rit + " and clearing zknode if exists"); + regionStates.updateRegionState(hri, RegionState.State.OFFLINE); + } else if (regionStates.isRegionInState( + hri, RegionState.State.SPLITTING_NEW, RegionState.State.MERGING_NEW)) { + regionStates.updateRegionState(hri, RegionState.State.OFFLINE); + } + regionsToAssignAggregator.add(hri); + // TODO: The below else if is different in branch-1 from master branch. 
+ } else if (rit != null) { + if ((rit.isClosing() || rit.isFailedClose() || rit.isOffline()) + && am.getTableStateManager().isTableState(hri.getTable(), + TableState.State.DISABLED, TableState.State.DISABLING) || + am.getReplicasToClose().contains(hri)) { + // If the table was partially disabled and the RS went down, we should clear the + // RIT and remove the node for the region. + // The rit that we use may be stale in case the table was in DISABLING state + // but though we did assign we will not be clearing the znode in CLOSING state. + // Doing this will have no harm. See HBASE-5927 + regionStates.updateRegionState(hri, RegionState.State.OFFLINE); + am.offlineDisabledRegion(hri); + } else { + LOG.warn("THIS SHOULD NOT HAPPEN: unexpected region in transition " + + rit + " not to be assigned by SSH of server " + serverName); + } + } + } finally { + lock.unlock(); + } + } + } + return regionsToAssignAggregator; + } + + private boolean assign(final MasterProcedureEnv env, final List<HRegionInfo> hris) + throws InterruptedIOException { + AssignmentManager am = env.getMasterServices().getAssignmentManager(); + try { + am.assign(hris); + } catch (InterruptedException ie) { + LOG.error("Caught " + ie + " during round-robin assignment"); + throw (InterruptedIOException)new InterruptedIOException().initCause(ie); + } catch (IOException ioe) { + LOG.info("Caught " + ioe + " during region assignment, will retry"); + return false; + } + return true; + } + + private boolean waitOnAssign(final MasterProcedureEnv env, final List<HRegionInfo> hris) + throws InterruptedIOException { + int timeout = env.getMasterConfiguration().getInt(KEY_WAIT_ON_RIT, DEFAULT_WAIT_ON_RIT); + for (HRegionInfo hri: hris) { + // TODO: Blocks here. 
+ if (!waitOnRegionToClearRegionsInTransition(env.getMasterServices().getAssignmentManager(), + hri, timeout)) { + return false; + } + } + return true; + } + @Override protected void rollbackState(MasterProcedureEnv env, ServerCrashState state) throws IOException { @@ -317,11 +580,11 @@ implements ServerProcedureInterface { @Override public void toStringClassDetails(StringBuilder sb) { sb.append(getClass().getSimpleName()); - sb.append(" server="); - sb.append(serverName); - sb.append(", splitWal="); + sb.append(" serverName="); + sb.append(this.serverName); + sb.append(", shouldSplitWal="); sb.append(shouldSplitWal); - sb.append(", meta="); + sb.append(", carryingMeta="); sb.append(carryingMeta); } @@ -332,6 +595,7 @@ implements ServerProcedureInterface { MasterProcedureProtos.ServerCrashStateData.Builder state = MasterProcedureProtos.ServerCrashStateData.newBuilder(). setServerName(ProtobufUtil.toServerName(this.serverName)). + setDistributedLogReplay(this.distributedLogReplay). setCarryingMeta(this.carryingMeta). setShouldSplitWal(this.shouldSplitWal); if (this.regionsOnCrashedServer != null && !this.regionsOnCrashedServer.isEmpty()) { @@ -339,6 +603,11 @@ implements ServerProcedureInterface { state.addRegionsOnCrashedServer(HRegionInfo.convert(hri)); } } + if (this.regionsAssigned != null && !this.regionsAssigned.isEmpty()) { + for (HRegionInfo hri: this.regionsAssigned) { + state.addRegionsAssigned(HRegionInfo.convert(hri)); + } + } state.build().writeDelimitedTo(stream); } @@ -349,16 +618,142 @@ implements ServerProcedureInterface { MasterProcedureProtos.ServerCrashStateData state = MasterProcedureProtos.ServerCrashStateData.parseDelimitedFrom(stream); this.serverName = ProtobufUtil.toServerName(state.getServerName()); + this.distributedLogReplay = state.hasDistributedLogReplay()? + state.getDistributedLogReplay(): false; this.carryingMeta = state.hasCarryingMeta()? 
state.getCarryingMeta(): false; // shouldSplitWAL has a default over in pb so this invocation will always work. this.shouldSplitWal = state.getShouldSplitWal(); int size = state.getRegionsOnCrashedServerCount(); if (size > 0) { - this.regionsOnCrashedServer = new ArrayList<HRegionInfo>(size); + this.regionsOnCrashedServer = new HashSet<>(size); for (RegionInfo ri: state.getRegionsOnCrashedServerList()) { this.regionsOnCrashedServer.add(HRegionInfo.convert(ri)); } } + size = state.getRegionsAssignedCount(); + if (size > 0) { + this.regionsAssigned = new ArrayList<>(size); + for (RegionInfo ri: state.getRegionsOnCrashedServerList()) { + this.regionsAssigned.add(HRegionInfo.convert(ri)); + } + } + } + + /** + * Process a dead region from a dead RS. Checks if the region is disabled or + * disabling or if the region has a partially completed split. + * @param hri + * @param assignmentManager + * @return Returns true if specified region should be assigned, false if not. + * @throws IOException + */ + private static boolean processDeadRegion(HRegionInfo hri, AssignmentManager assignmentManager) + throws IOException { + boolean tablePresent = assignmentManager.getTableStateManager().isTablePresent(hri.getTable()); + if (!tablePresent) { + LOG.info("The table " + hri.getTable() + " was deleted. Hence not proceeding."); + return false; + } + // If table is not disabled but the region is offlined, + boolean disabled = assignmentManager.getTableStateManager().isTableState(hri.getTable(), + TableState.State.DISABLED); + if (disabled){ + LOG.info("The table " + hri.getTable() + " was disabled. Hence not proceeding."); + return false; + } + if (hri.isOffline() && hri.isSplit()) { + // HBASE-7721: Split parent and daughters are inserted into hbase:meta as an atomic operation. + // If the meta scanner saw the parent split, then it should see the daughters as assigned + // to the dead server. We don't have to do anything. 
+ return false; + } + boolean disabling = assignmentManager.getTableStateManager().isTableState(hri.getTable(), + TableState.State.DISABLING); + if (disabling) { + LOG.info("The table " + hri.getTable() + " is disabled. Hence not assigning region" + + hri.getEncodedName()); + return false; + } + return true; + } + + /** + * If hbase:meta is not assigned already, assign. + * @throws IOException + */ + private void verifyAndAssignMetaWithRetries(final MasterProcedureEnv env) throws IOException { + MasterServices services = env.getMasterServices(); + int iTimes = services.getConfiguration().getInt(KEY_RETRIES_ON_META, DEFAULT_RETRIES_ON_META); + // Just reuse same time as we have for short wait on meta. Adding another config is overkill. + long waitTime = + services.getConfiguration().getLong(KEY_SHORT_WAIT_ON_META, DEFAULT_SHORT_WAIT_ON_META); + int iFlag = 0; + while (true) { + try { + verifyAndAssignMeta(env); + break; + } catch (KeeperException e) { + services.abort("In server shutdown processing, assigning meta", e); + throw new IOException("Aborting", e); + } catch (Exception e) { + if (iFlag >= iTimes) { + services.abort("verifyAndAssignMeta failed after" + iTimes + " retries, aborting", e); + throw new IOException("Aborting", e); + } + try { + Thread.sleep(waitTime); + } catch (InterruptedException e1) { + LOG.warn("Interrupted when is the thread sleep", e1); + Thread.currentThread().interrupt(); + throw (InterruptedIOException)new InterruptedIOException().initCause(e1); + } + iFlag++; + } + } + } + + /** + * If hbase:meta is not assigned already, assign. 
+ * @throws InterruptedException + * @throws IOException + * @throws KeeperException + */ + private void verifyAndAssignMeta(final MasterProcedureEnv env) + throws InterruptedException, IOException, KeeperException { + MasterServices services = env.getMasterServices(); + if (!isMetaAssignedQuickTest(env)) { + services.getAssignmentManager().assignMeta(HRegionInfo.FIRST_META_REGIONINFO); + } else if (serverName.equals(services.getMetaTableLocator(). + getMetaRegionLocation(services.getZooKeeper()))) { + throw new IOException("hbase:meta is onlined on the dead server " + this.serverName); + } else { + LOG.info("Skip assigning hbase:meta because it is online at " + + services.getMetaTableLocator().getMetaRegionLocation(services.getZooKeeper())); + } + } + + /** + * A quick test that hbase:meta is assigned; blocks for short time only. + * @return True if hbase:meta location is available and verified as good. + * @throws InterruptedException + * @throws IOException + */ + private boolean isMetaAssignedQuickTest(final MasterProcedureEnv env) + throws InterruptedException, IOException { + ZooKeeperWatcher zkw = env.getMasterServices().getZooKeeper(); + MetaTableLocator mtl = env.getMasterServices().getMetaTableLocator(); + boolean metaAssigned = false; + // Is hbase:meta location available yet? + if (mtl.isLocationAvailable(zkw)) { + ClusterConnection connection = env.getMasterServices().getClusterConnection(); + // Is hbase:meta location good yet? + long timeout = + env.getMasterConfiguration().getLong(KEY_SHORT_WAIT_ON_META, DEFAULT_SHORT_WAIT_ON_META); + if (mtl.verifyMetaRegionLocation(connection, zkw, timeout)) { + metaAssigned = true; + } + } + return metaAssigned; } @Override @@ -394,46 +789,4 @@ implements ServerProcedureInterface { // the client does not know about this procedure. return false; } - - /** - * Handle any outstanding RIT that are up against this.serverName, the crashed server. - * Notify them of crash. 
Remove assign entries from the passed in <code>regions</code> - * otherwise we have two assigns going on and they will fight over who has lock. - * Notify Unassigns also. - * @param crashedServer Server that crashed. - * @param regions Regions that were on crashed server - * @return Subset of <code>regions</code> that were RIT against <code>crashedServer</code> - */ - private void handleRIT(final MasterProcedureEnv env, final List<HRegionInfo> regions) { - if (regions == null) return; - AssignmentManager am = env.getMasterServices().getAssignmentManager(); - final Iterator<HRegionInfo> it = regions.iterator(); - ServerCrashException sce = null; - while (it.hasNext()) { - final HRegionInfo hri = it.next(); - RegionTransitionProcedure rtp = am.getRegionStates().getRegionTransitionProcedure(hri); - if (rtp == null) continue; - // Make sure the RIT is against this crashed server. In the case where there are many - // processings of a crashed server -- backed up for whatever reason (slow WAL split) -- - // then a previous SCP may have already failed an assign, etc., and it may have a new - // location target; DO NOT fail these else we make for assign flux. - ServerName rtpServerName = rtp.getServer(env); - if (rtpServerName == null) { - LOG.warn("RIT with ServerName null! " + rtp); - continue; - } - if (!rtpServerName.equals(this.serverName)) continue; - LOG.info("pid=" + getProcId() + " found RIT " + rtp + "; " + - rtp.getRegionState(env).toShortString()); - // Notify RIT on server crash. - if (sce == null) { - sce = new ServerCrashException(getProcId(), getServerName()); - } - rtp.remoteCallFailed(env, this.serverName, sce); - if (rtp instanceof AssignProcedure) { - // If an assign, include it in our return and remove from passed-in list of regions. - it.remove(); - } - } - } -} \ No newline at end of file +}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java new file mode 100644 index 0000000..bf9afd7 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java @@ -0,0 +1,785 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.master.MasterCoprocessorHost; +import org.apache.hadoop.hbase.master.MasterFileSystem; +import org.apache.hadoop.hbase.master.RegionState; +import org.apache.hadoop.hbase.master.RegionStates; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SplitTableRegionState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; +import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; +import org.apache.hadoop.hbase.regionserver.HStore; +import 
org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFileInfo; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Threads; + +import com.google.common.annotations.VisibleForTesting; + +/** + * The procedure to split a region in a table. + */ [email protected] +public class SplitTableRegionProcedure + extends AbstractStateMachineTableProcedure<SplitTableRegionState> { + private static final Log LOG = LogFactory.getLog(SplitTableRegionProcedure.class); + + private Boolean traceEnabled; + + /* + * Region to split + */ + private HRegionInfo parentHRI; + private HRegionInfo daughter_1_HRI; + private HRegionInfo daughter_2_HRI; + + public SplitTableRegionProcedure() { + this.traceEnabled = null; + } + + public SplitTableRegionProcedure(final MasterProcedureEnv env, + final HRegionInfo regionToSplit, final byte[] splitRow) throws IOException { + super(env); + + checkSplitRow(regionToSplit, splitRow); + + this.traceEnabled = null; + this.parentHRI = regionToSplit; + + final TableName table = regionToSplit.getTable(); + final long rid = getDaughterRegionIdTimestamp(regionToSplit); + this.daughter_1_HRI = new HRegionInfo(table, regionToSplit.getStartKey(), splitRow, false, rid); + this.daughter_2_HRI = new HRegionInfo(table, splitRow, regionToSplit.getEndKey(), false, rid); + } + + private static void checkSplitRow(final HRegionInfo regionToSplit, final byte[] splitRow) + throws IOException { + if (splitRow == null || splitRow.length == 0) { + throw new DoNotRetryIOException("Split row cannot be null"); + } + + if (Bytes.equals(regionToSplit.getStartKey(), splitRow)) { + throw new DoNotRetryIOException( + "Split row is equal to startkey: " + Bytes.toStringBinary(splitRow)); + } + + if (!regionToSplit.containsRow(splitRow)) { + throw new 
DoNotRetryIOException( + "Split row is not inside region key range splitKey:" + Bytes.toStringBinary(splitRow) + + " region: " + regionToSplit); + } + } + + /** + * Calculate daughter regionid to use. + * @param hri Parent {@link HRegionInfo} + * @return Daughter region id (timestamp) to use. + */ + private static long getDaughterRegionIdTimestamp(final HRegionInfo hri) { + long rid = EnvironmentEdgeManager.currentTime(); + // Regionid is timestamp. Can't be less than that of parent else will insert + // at wrong location in hbase:meta (See HBASE-710). + if (rid < hri.getRegionId()) { + LOG.warn("Clock skew; parent regions id is " + hri.getRegionId() + + " but current time here is " + rid); + rid = hri.getRegionId() + 1; + } + return rid; + } + + @Override + protected Flow executeFromState(final MasterProcedureEnv env, final SplitTableRegionState state) + throws InterruptedException { + if (isTraceEnabled()) { + LOG.trace(this + " execute state=" + state); + } + + try { + switch (state) { + case SPLIT_TABLE_REGION_PREPARE: + if (prepareSplitRegion(env)) { + setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_PRE_OPERATION); + break; + } else { + assert isFailed() : "split region should have an exception here"; + return Flow.NO_MORE_STATE; + } + case SPLIT_TABLE_REGION_PRE_OPERATION: + preSplitRegion(env); + setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_SET_SPLITTING_TABLE_STATE); + break; + case SPLIT_TABLE_REGION_SET_SPLITTING_TABLE_STATE: + setRegionStateToSplitting(env); + setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_CLOSE_PARENT_REGION); + break; + case SPLIT_TABLE_REGION_CLOSE_PARENT_REGION: + closeParentRegionForSplit(env); + setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_CREATE_DAUGHTER_REGIONS); + break; + case SPLIT_TABLE_REGION_CREATE_DAUGHTER_REGIONS: + createDaughterRegions(env); + setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_PRE_OPERATION_BEFORE_PONR); + break; + case SPLIT_TABLE_REGION_PRE_OPERATION_BEFORE_PONR: + 
preSplitRegionBeforePONR(env); + setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_UPDATE_META); + break; + case SPLIT_TABLE_REGION_UPDATE_META: + // This is the point of no return. Adding subsequent edits to .META. as we + // do below when we do the daughter opens adding each to .META. can fail in + // various interesting ways the most interesting of which is a timeout + // BUT the edits all go through (See HBASE-3872). IF we reach the PONR + // then subsequent failures need to crash out this region server; the + // server shutdown processing should be able to fix-up the incomplete split. + // The offlined parent will have the daughters as extra columns. If + // we leave the daughter regions in place and do not remove them when we + // crash out, then they will have their references to the parent in place + // still and the server shutdown fixup of .META. will point to these + // regions. + // We should add PONR JournalEntry before offlineParentInMeta,so even if + // OfflineParentInMeta timeout,this will cause regionserver exit,and then + // master ServerShutdownHandler will fix daughter & avoid data loss. (See + // HBase-4562). 
+ updateMetaForDaughterRegions(env); + setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_PONR); + break; + case SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_PONR: + preSplitRegionAfterPONR(env); + setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_OPEN_CHILD_REGIONS); + break; + case SPLIT_TABLE_REGION_OPEN_CHILD_REGIONS: + openDaughterRegions(env); + setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_POST_OPERATION); + break; + case SPLIT_TABLE_REGION_POST_OPERATION: + postSplitRegion(env); + return Flow.NO_MORE_STATE; + default: + throw new UnsupportedOperationException(this + " unhandled state=" + state); + } + } catch (IOException e) { + String msg = "Error trying to split region " + parentHRI.getEncodedName() + " in the table " + + getTableName() + " (in state=" + state + ")"; + if (!isRollbackSupported(state)) { + // We reach a state that cannot be rolled back. We just need to keep retry. + LOG.warn(msg, e); + } else { + LOG.error(msg, e); + setFailure("master-split-region", e); + } + } + return Flow.HAS_MORE_STATE; + } + + @Override + protected void rollbackState(final MasterProcedureEnv env, final SplitTableRegionState state) + throws IOException, InterruptedException { + if (isTraceEnabled()) { + LOG.trace(this + " rollback state=" + state); + } + + try { + switch (state) { + case SPLIT_TABLE_REGION_POST_OPERATION: + case SPLIT_TABLE_REGION_OPEN_CHILD_REGIONS: + case SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_PONR: + case SPLIT_TABLE_REGION_UPDATE_META: + // PONR + throw new UnsupportedOperationException(this + " unhandled state=" + state); + case SPLIT_TABLE_REGION_PRE_OPERATION_BEFORE_PONR: + break; + case SPLIT_TABLE_REGION_CREATE_DAUGHTER_REGIONS: + // Doing nothing, as re-open parent region would clean up daughter region directories. 
+ break; + case SPLIT_TABLE_REGION_CLOSE_PARENT_REGION: + openParentRegion(env); + break; + case SPLIT_TABLE_REGION_SET_SPLITTING_TABLE_STATE: + setRegionStateToRevertSplitting(env); + break; + case SPLIT_TABLE_REGION_PRE_OPERATION: + postRollBackSplitRegion(env); + break; + case SPLIT_TABLE_REGION_PREPARE: + break; // nothing to do + default: + throw new UnsupportedOperationException(this + " unhandled state=" + state); + } + } catch (IOException e) { + // This will be retried. Unless there is a bug in the code, + // this should be just a "temporary error" (e.g. network down) + LOG.warn("Failed rollback attempt step " + state + " for splitting the region " + + parentHRI.getEncodedName() + " in table " + getTableName(), e); + throw e; + } + } + + /* + * Check whether we are in the state that can be rollback + */ + @Override + protected boolean isRollbackSupported(final SplitTableRegionState state) { + switch (state) { + case SPLIT_TABLE_REGION_POST_OPERATION: + case SPLIT_TABLE_REGION_OPEN_CHILD_REGIONS: + case SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_PONR: + case SPLIT_TABLE_REGION_UPDATE_META: + // It is not safe to rollback if we reach to these states. 
+ return false; + default: + break; + } + return true; + } + + @Override + protected SplitTableRegionState getState(final int stateId) { + return SplitTableRegionState.forNumber(stateId); + } + + @Override + protected int getStateId(final SplitTableRegionState state) { + return state.getNumber(); + } + + @Override + protected SplitTableRegionState getInitialState() { + return SplitTableRegionState.SPLIT_TABLE_REGION_PREPARE; + } + + @Override + public void serializeStateData(final OutputStream stream) throws IOException { + super.serializeStateData(stream); + + final MasterProcedureProtos.SplitTableRegionStateData.Builder splitTableRegionMsg = + MasterProcedureProtos.SplitTableRegionStateData.newBuilder() + .setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser())) + .setParentRegionInfo(HRegionInfo.convert(parentHRI)) + .addChildRegionInfo(HRegionInfo.convert(daughter_1_HRI)) + .addChildRegionInfo(HRegionInfo.convert(daughter_2_HRI)); + splitTableRegionMsg.build().writeDelimitedTo(stream); + } + + @Override + public void deserializeStateData(final InputStream stream) throws IOException { + super.deserializeStateData(stream); + + final MasterProcedureProtos.SplitTableRegionStateData splitTableRegionsMsg = + MasterProcedureProtos.SplitTableRegionStateData.parseDelimitedFrom(stream); + setUser(MasterProcedureUtil.toUserInfo(splitTableRegionsMsg.getUserInfo())); + parentHRI = HRegionInfo.convert(splitTableRegionsMsg.getParentRegionInfo()); + if (splitTableRegionsMsg.getChildRegionInfoCount() == 0) { + daughter_1_HRI = daughter_2_HRI = null; + } else { + assert(splitTableRegionsMsg.getChildRegionInfoCount() == 2); + daughter_1_HRI = HRegionInfo.convert(splitTableRegionsMsg.getChildRegionInfoList().get(0)); + daughter_2_HRI = HRegionInfo.convert(splitTableRegionsMsg.getChildRegionInfoList().get(1)); + } + } + + @Override + public void toStringClassDetails(StringBuilder sb) { + sb.append(getClass().getSimpleName()); + sb.append(" (table="); + sb.append(getTableName()); 
+ sb.append(" parent region="); + sb.append(parentHRI); + if (daughter_1_HRI != null) { + sb.append(" first daughter region="); + sb.append(daughter_1_HRI); + } + if (daughter_2_HRI != null) { + sb.append(" and second daughter region="); + sb.append(daughter_2_HRI); + } + sb.append(")"); + } + + @Override + protected LockState acquireLock(final MasterProcedureEnv env) { + if (env.waitInitialized(this)) { + return LockState.LOCK_EVENT_WAIT; + } + return env.getProcedureScheduler().waitRegions(this, getTableName(), parentHRI)? + LockState.LOCK_EVENT_WAIT: LockState.LOCK_ACQUIRED; + } + + @Override + protected void releaseLock(final MasterProcedureEnv env) { + env.getProcedureScheduler().wakeRegions(this, getTableName(), parentHRI); + } + + @Override + public TableName getTableName() { + return parentHRI.getTable(); + } + + @Override + public TableOperationType getTableOperationType() { + return TableOperationType.SPLIT; + } + + private byte[] getSplitRow() { + return daughter_2_HRI.getStartKey(); + } + + /** + * Prepare to Split region. + * @param env MasterProcedureEnv + * @throws IOException + */ + @VisibleForTesting + public boolean prepareSplitRegion(final MasterProcedureEnv env) throws IOException { + // Check whether the region is splittable + final RegionState state = getParentRegionState(env); + if (state.isClosing() || state.isClosed() || + state.isSplittingOrSplitOnServer(state.getServerName())) { + setFailure( + "master-split-region", + new IOException("Split region " + parentHRI + " failed due to region is not splittable")); + return false; + } + return true; + } + + /** + * Action before splitting region in a table. 
+ * @param env MasterProcedureEnv + * @param state the procedure state + * @throws IOException + * @throws InterruptedException + */ + private void preSplitRegion(final MasterProcedureEnv env) + throws IOException, InterruptedException { + final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.preSplitRegionAction(getTableName(), getSplitRow(), getUser()); + } + } + + /** + * Action after rollback a split table region action. + * @param env MasterProcedureEnv + * @throws IOException + */ + private void postRollBackSplitRegion(final MasterProcedureEnv env) throws IOException { + final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.postRollBackSplitRegionAction(getUser()); + } + } + + /** + * Set the parent region state to SPLITTING state + * @param env MasterProcedureEnv + * @throws IOException + */ + @VisibleForTesting + public void setRegionStateToSplitting(final MasterProcedureEnv env) throws IOException { + RegionStateTransition.Builder transition = RegionStateTransition.newBuilder(); + transition.setTransitionCode(TransitionCode.READY_TO_SPLIT); + transition.addRegionInfo(HRegionInfo.convert(parentHRI)); + transition.addRegionInfo(HRegionInfo.convert(daughter_1_HRI)); + transition.addRegionInfo(HRegionInfo.convert(daughter_2_HRI)); + if (env.getMasterServices().getAssignmentManager().onRegionTransition( + getParentRegionState(env).getServerName(), transition.build()) != null) { + throw new IOException("Failed to update region state to SPLITTING for " + + parentHRI.getRegionNameAsString()); + } + } + + /** + * Rollback the region state change + * @param env MasterProcedureEnv + * @throws IOException + */ + private void setRegionStateToRevertSplitting(final MasterProcedureEnv env) throws IOException { + RegionStateTransition.Builder transition = RegionStateTransition.newBuilder(); + transition.setTransitionCode(TransitionCode.SPLIT_REVERTED); + 
transition.addRegionInfo(HRegionInfo.convert(parentHRI)); + transition.addRegionInfo(HRegionInfo.convert(daughter_1_HRI)); + transition.addRegionInfo(HRegionInfo.convert(daughter_2_HRI)); + if (env.getMasterServices().getAssignmentManager().onRegionTransition( + getParentRegionState(env).getServerName(), transition.build()) != null) { + throw new IOException("Failed to update region state for " + + parentHRI.getRegionNameAsString() + " as part of operation for reverting split"); + } + } + + /** + * RPC to region server that host the parent region, ask for close the parent regions + * @param env MasterProcedureEnv + * @throws IOException + */ + @VisibleForTesting + public void closeParentRegionForSplit(final MasterProcedureEnv env) throws IOException { + boolean success = env.getMasterServices().getServerManager().sendRegionCloseForSplitOrMerge( + getParentRegionState(env).getServerName(), parentHRI); + if (!success) { + throw new IOException("Close parent region " + parentHRI + " for splitting failed." 
+ + " Check region server log for more details"); + } + } + + /** + * Rollback close parent region + * @param env MasterProcedureEnv + **/ + private void openParentRegion(final MasterProcedureEnv env) throws IOException { + // Check whether the region is closed; if so, open it in the same server + RegionState state = getParentRegionState(env); + if (state.isClosing() || state.isClosed()) { + env.getMasterServices().getServerManager().sendRegionOpen( + getParentRegionState(env).getServerName(), + parentHRI, + ServerName.EMPTY_SERVER_LIST); + } + } + + /** + * Create daughter regions + * @param env MasterProcedureEnv + * @throws IOException + */ + @VisibleForTesting + public void createDaughterRegions(final MasterProcedureEnv env) throws IOException { + final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem(); + final Path tabledir = FSUtils.getTableDir(mfs.getRootDir(), parentHRI.getTable()); + final FileSystem fs = mfs.getFileSystem(); + HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem( + env.getMasterConfiguration(), fs, tabledir, parentHRI, false); + regionFs.createSplitsDir(); + + Pair<Integer, Integer> expectedReferences = splitStoreFiles(env, regionFs); + + assertReferenceFileCount( + fs, expectedReferences.getFirst(), regionFs.getSplitsDir(daughter_1_HRI)); + //Move the files from the temporary .splits to the final /table/region directory + regionFs.commitDaughterRegion(daughter_1_HRI); + assertReferenceFileCount( + fs, + expectedReferences.getFirst(), + new Path(tabledir, daughter_1_HRI.getEncodedName())); + + assertReferenceFileCount( + fs, expectedReferences.getSecond(), regionFs.getSplitsDir(daughter_2_HRI)); + regionFs.commitDaughterRegion(daughter_2_HRI); + assertReferenceFileCount( + fs, + expectedReferences.getSecond(), + new Path(tabledir, daughter_2_HRI.getEncodedName())); + } + + /** + * Create Split directory + * @param env MasterProcedureEnv + * @throws IOException + */ + private Pair<Integer, Integer> 
splitStoreFiles(final MasterProcedureEnv env, + final HRegionFileSystem regionFs) throws IOException { + final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem(); + final Configuration conf = env.getMasterConfiguration(); + + // The following code sets up a thread pool executor with as many slots as + // there's files to split. It then fires up everything, waits for + // completion and finally checks for any exception + // + // Note: splitStoreFiles creates daughter region dirs under the parent splits dir + // Nothing to unroll here if failure -- re-run createSplitsDir will + // clean this up. + int nbFiles = 0; + for (String family: regionFs.getFamilies()) { + Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family); + if (storeFiles != null) { + nbFiles += storeFiles.size(); + } + } + if (nbFiles == 0) { + // no file needs to be splitted. + return new Pair<>(0,0); + } + // Default max #threads to use is the smaller of table's configured number of blocking store + // files or the available number of logical cores. + int defMaxThreads = Math.min( + conf.getInt(HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT), + Runtime.getRuntime().availableProcessors()); + // Max #threads is the smaller of the number of storefiles or the default max determined above. + int maxThreads = Math.min( + conf.getInt(HConstants.REGION_SPLIT_THREADS_MAX, defMaxThreads), nbFiles); + LOG.info("Preparing to split " + nbFiles + " storefiles for region " + parentHRI + + " using " + maxThreads + " threads"); + ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool( + maxThreads, Threads.getNamedThreadFactory("StoreFileSplitter-%1$d")); + List<Future<Pair<Path,Path>>> futures = new ArrayList<>(nbFiles); + + // Split each store file. 
+ final HTableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName()); + for (String family: regionFs.getFamilies()) { + final HColumnDescriptor hcd = htd.getFamily(family.getBytes()); + final Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family); + if (storeFiles != null && storeFiles.size() > 0) { + final CacheConfig cacheConf = new CacheConfig(conf, hcd); + for (StoreFileInfo storeFileInfo: storeFiles) { + StoreFileSplitter sfs = + new StoreFileSplitter(regionFs, family.getBytes(), new StoreFile(mfs.getFileSystem(), + storeFileInfo, conf, cacheConf, hcd.getBloomFilterType(), true)); + futures.add(threadPool.submit(sfs)); + } + } + } + // Shutdown the pool + threadPool.shutdown(); + + // Wait for all the tasks to finish + long fileSplitTimeout = conf.getLong("hbase.master.fileSplitTimeout", 30000); + try { + boolean stillRunning = !threadPool.awaitTermination(fileSplitTimeout, TimeUnit.MILLISECONDS); + if (stillRunning) { + threadPool.shutdownNow(); + // wait for the thread to shutdown completely. + while (!threadPool.isTerminated()) { + Thread.sleep(50); + } + throw new IOException("Took too long to split the" + + " files and create the references, aborting split"); + } + } catch (InterruptedException e) { + throw (InterruptedIOException)new InterruptedIOException().initCause(e); + } + + int daughterA = 0; + int daughterB = 0; + // Look for any exception + for (Future<Pair<Path, Path>> future : futures) { + try { + Pair<Path, Path> p = future.get(); + daughterA += p.getFirst() != null ? 1 : 0; + daughterB += p.getSecond() != null ? 
1 : 0; + } catch (InterruptedException e) { + throw (InterruptedIOException) new InterruptedIOException().initCause(e); + } catch (ExecutionException e) { + throw new IOException(e); + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Split storefiles for region " + parentHRI + " Daughter A: " + daughterA + + " storefiles, Daughter B: " + daughterB + " storefiles."); + } + return new Pair<>(daughterA, daughterB); + } + + private void assertReferenceFileCount( + final FileSystem fs, + final int expectedReferenceFileCount, + final Path dir) + throws IOException { + if (expectedReferenceFileCount != 0 && + expectedReferenceFileCount != FSUtils.getRegionReferenceFileCount(fs, dir)) { + throw new IOException("Failing split. Expected reference file count isn't equal."); + } + } + + private Pair<Path, Path> splitStoreFile(final HRegionFileSystem regionFs, + final byte[] family, final StoreFile sf) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Splitting started for store file: " + sf.getPath() + " for region: " + parentHRI); + } + + final byte[] splitRow = getSplitRow(); + final String familyName = Bytes.toString(family); + final Path path_first = + regionFs.splitStoreFile(this.daughter_1_HRI, familyName, sf, splitRow, false, null); + final Path path_second = + regionFs.splitStoreFile(this.daughter_2_HRI, familyName, sf, splitRow, true, null); + if (LOG.isDebugEnabled()) { + LOG.debug("Splitting complete for store file: " + sf.getPath() + " for region: " + parentHRI); + } + return new Pair<>(path_first, path_second); + } + + /** + * Utility class used to do the file splitting / reference writing + * in parallel instead of sequentially. 
+ */ + private class StoreFileSplitter implements Callable<Pair<Path,Path>> { + private final HRegionFileSystem regionFs; + private final byte[] family; + private final StoreFile sf; + + /** + * Constructor that takes what it needs to split + * @param regionFs the file system + * @param family Family that contains the store file + * @param sf which file + */ + public StoreFileSplitter( + final HRegionFileSystem regionFs, + final byte[] family, + final StoreFile sf) { + this.regionFs = regionFs; + this.sf = sf; + this.family = family; + } + + public Pair<Path,Path> call() throws IOException { + return splitStoreFile(regionFs, family, sf); + } + } + + /** + * Post split region actions before the Point-of-No-Return step + * @param env MasterProcedureEnv + **/ + private void preSplitRegionBeforePONR(final MasterProcedureEnv env) + throws IOException, InterruptedException { + final List<Mutation> metaEntries = new ArrayList<>(); + final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + if (cpHost.preSplitBeforePONRAction(getSplitRow(), metaEntries, getUser())) { + throw new IOException("Coprocessor bypassing region " + + parentHRI.getRegionNameAsString() + " split."); + } + try { + for (Mutation p : metaEntries) { + HRegionInfo.parseRegionName(p.getRow()); + } + } catch (IOException e) { + LOG.error("Row key of mutation from coprocessor is not parsable as region name." 
+ + "Mutations from coprocessor should only for hbase:meta table."); + throw e; + } + } + } + + /** + * Add daughter regions to META + * @param env MasterProcedureEnv + * @throws IOException + */ + private void updateMetaForDaughterRegions(final MasterProcedureEnv env) throws IOException { + RegionStateTransition.Builder transition = RegionStateTransition.newBuilder(); + transition.setTransitionCode(TransitionCode.SPLIT_PONR); + transition.addRegionInfo(HRegionInfo.convert(parentHRI)); + transition.addRegionInfo(HRegionInfo.convert(daughter_1_HRI)); + transition.addRegionInfo(HRegionInfo.convert(daughter_2_HRI)); + if (env.getMasterServices().getAssignmentManager().onRegionTransition( + getParentRegionState(env).getServerName(), transition.build()) != null) { + throw new IOException("Failed to update meta to add daughter regions in split region " + + parentHRI.getRegionNameAsString()); + } + } + + /** + * Pre split region actions after the Point-of-No-Return step + * @param env MasterProcedureEnv + **/ + private void preSplitRegionAfterPONR(final MasterProcedureEnv env) + throws IOException, InterruptedException { + final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.preSplitAfterPONRAction(getUser()); + } + } + + /** + * Assign daughter regions + * @param env MasterProcedureEnv + * @throws IOException + * @throws InterruptedException + **/ + private void openDaughterRegions(final MasterProcedureEnv env) + throws IOException, InterruptedException { + env.getMasterServices().getAssignmentManager().assignDaughterRegions( + parentHRI, daughter_1_HRI, daughter_2_HRI); + } + + /** + * Post split region actions + * @param env MasterProcedureEnv + **/ + private void postSplitRegion(final MasterProcedureEnv env) throws IOException { + final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.postCompletedSplitRegionAction(daughter_1_HRI, daughter_2_HRI, getUser()); + } + } + + /** 
+ * Get parent region state + * @param env MasterProcedureEnv + * @return parent region state + */ + private RegionState getParentRegionState(final MasterProcedureEnv env) { + RegionStates regionStates = env.getMasterServices().getAssignmentManager().getRegionStates(); + RegionState state = regionStates.getRegionState(parentHRI); + if (state == null) { + LOG.warn("Split but not in region states: " + parentHRI); + state = regionStates.createRegionState(parentHRI); + } + return state; + } + + /** + * The procedure could be restarted from a different machine. If the variable is null, we need to + * retrieve it. + * @return traceEnabled + */ + private boolean isTraceEnabled() { + if (traceEnabled == null) { + traceEnabled = LOG.isTraceEnabled(); + } + return traceEnabled; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java index 86143ac..f74df79 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java @@ -31,8 +31,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; public interface TableProcedureInterface { public enum TableOperationType { CREATE, DELETE, DISABLE, EDIT, ENABLE, READ, - REGION_EDIT, REGION_SPLIT, REGION_MERGE, REGION_ASSIGN, REGION_UNASSIGN, - REGION_GC, MERGED_REGIONS_GC/* region operations */ + REGION_EDIT, SPLIT, MERGE, ASSIGN, UNASSIGN, /* region operations */ }; /** 
http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java index e7f5ead..e41b2cd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java @@ -85,7 +85,7 @@ public class TruncateTableProcedure // TODO: Move out... in the acquireLock() LOG.debug("waiting for '" + getTableName() + "' regions in transition"); - regions = env.getAssignmentManager().getRegionStates().getRegionsOfTable(getTableName()); + regions = ProcedureSyncWait.getRegionsFromMeta(env, getTableName()); assert regions != null && !regions.isEmpty() : "unexpected 0 regions"; ProcedureSyncWait.waitRegionInTransition(env, regions); @@ -121,14 +121,12 @@ public class TruncateTableProcedure setNextState(TruncateTableState.TRUNCATE_TABLE_ASSIGN_REGIONS); break; case TRUNCATE_TABLE_ASSIGN_REGIONS: - CreateTableProcedure.setEnablingState(env, getTableName()); - addChildProcedure(env.getAssignmentManager().createAssignProcedures(regions)); + CreateTableProcedure.assignRegions(env, getTableName(), regions); setNextState(TruncateTableState.TRUNCATE_TABLE_POST_OPERATION); hTableDescriptor = null; regions = null; break; case TRUNCATE_TABLE_POST_OPERATION: - CreateTableProcedure.setEnabledState(env, getTableName()); postTruncate(env); LOG.debug("truncate '" + getTableName() + "' completed"); return Flow.NO_MORE_STATE; http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java 
---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java index 1ff05eb..25328b1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java @@ -118,10 +118,8 @@ public class MobFileCache { this.scheduleThreadPool.scheduleAtFixedRate(new EvictionThread(this), period, period, TimeUnit.SECONDS); - if (LOG.isDebugEnabled()) { - LOG.debug("MobFileCache enabled with cacheSize=" + mobFileMaxCacheSize + + LOG.info("MobFileCache enabled with cacheSize=" + mobFileMaxCacheSize + ", evictPeriods=" + period + "sec, evictRemainRatio=" + evictRemainRatio); - } } else { LOG.info("MobFileCache disabled"); } http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/namespace/NamespaceAuditor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namespace/NamespaceAuditor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namespace/NamespaceAuditor.java index 232309b..a30bfef 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namespace/NamespaceAuditor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namespace/NamespaceAuditor.java @@ -126,14 +126,14 @@ public class NamespaceAuditor { } } - public void updateQuotaForRegionMerge(HRegionInfo mergedRegion) throws IOException { + public void updateQuotaForRegionMerge(HRegionInfo hri) throws IOException { if (!stateManager.isInitialized()) { throw new IOException( "Merge operation is being performed even before namespace auditor is initialized."); - } else if (!stateManager.checkAndUpdateNamespaceRegionCount(mergedRegion.getTable(), - mergedRegion.getRegionName(), -1)) { - throw new 
QuotaExceededException("Region merge not possible for :" + - mergedRegion.getEncodedName() + " as quota limits are exceeded "); + } else if (!stateManager + .checkAndUpdateNamespaceRegionCount(hri.getTable(), hri.getRegionName(), -1)) { + throw new QuotaExceededException("Region merge not possible for :" + hri.getEncodedName() + + " as quota limits are exceeded "); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/namespace/NamespaceStateManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namespace/NamespaceStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namespace/NamespaceStateManager.java index 8f6a21d..604f211 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namespace/NamespaceStateManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namespace/NamespaceStateManager.java @@ -88,9 +88,8 @@ class NamespaceStateManager { if (nspdesc != null) { NamespaceTableAndRegionInfo currentStatus; currentStatus = getState(namespace); - int regionCount = currentStatus.getRegionCount(); - long maxRegionCount = TableNamespaceManager.getMaxRegions(nspdesc); - if (incr > 0 && regionCount >= maxRegionCount) { + if (incr > 0 && + currentStatus.getRegionCount() >= TableNamespaceManager.getMaxRegions(nspdesc)) { LOG.warn("The region " + Bytes.toStringBinary(regionName) + " cannot be created. The region count will exceed quota on the namespace. 
" + "This may be transient, please retry later if there are any ongoing split" http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java index 8f6a33a..9d24e6c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java @@ -330,7 +330,7 @@ public class MasterQuotaManager implements RegionStateListener { namespaceQuotaManager.checkQuotaToCreateTable(tName, regions); } } - + public void checkAndUpdateNamespaceRegionQuota(TableName tName, int regions) throws IOException { if (initialized) { namespaceQuotaManager.checkQuotaToUpdateRegion(tName, regions); @@ -347,14 +347,12 @@ public class MasterQuotaManager implements RegionStateListener { return -1; } - @Override - public void onRegionMerged(HRegionInfo mergedRegion) throws IOException { + public void onRegionMerged(HRegionInfo hri) throws IOException { if (initialized) { - namespaceQuotaManager.updateQuotaForRegionMerge(mergedRegion); + namespaceQuotaManager.updateQuotaForRegionMerge(hri); } } - @Override public void onRegionSplit(HRegionInfo hri) throws IOException { if (initialized) { namespaceQuotaManager.checkQuotaToSplitRegion(hri);
