http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
index 71c6b89..2703947 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
@@ -19,40 +19,55 @@ package org.apache.hadoop.hbase.master.procedure;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InterruptedIOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Iterator;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.client.TableState;
+import org.apache.hadoop.hbase.master.AssignmentManager;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.MasterWalManager;
-import org.apache.hadoop.hbase.master.assignment.AssignProcedure;
-import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
-import org.apache.hadoop.hbase.master.assignment.RegionTransitionProcedure;
-import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
 import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ServerCrashState;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
+import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * Handle crashed server. This is a port to ProcedureV2 of what used to be 
euphemistically called
  * ServerShutdownHandler.
  *
- * <p>The procedure flow varies dependent on whether meta is assigned and if 
we are to split logs.
+ * <p>The procedure flow varies dependent on whether meta is assigned, if we 
are
+ * doing distributed log replay versus distributed log splitting, and if we 
are to split logs at
+ * all.
+ *
+ * <p>This procedure asks that all crashed servers get processed equally; we 
yield after the
+ * completion of each successful flow step. We do this so that we do not 
'deadlock' waiting on
+ * a region assignment so we can replay edits which could happen if a region 
moved there are edits
+ * on two servers for replay.
  *
- * <p>We come in here after ServerManager has noticed a server has expired. 
Procedures
- * queued on the rpc should have been notified about fail and should be 
concurrently
- * getting themselves ready to assign elsewhere.
+ * <p>TODO: ASSIGN and WAIT_ON_ASSIGN (at least) are not idempotent. Revisit 
when assign is pv2.
+ * TODO: We do not have special handling for system tables.
  */
 public class ServerCrashProcedure
 extends StateMachineProcedure<MasterProcedureEnv, ServerCrashState>
@@ -60,6 +75,36 @@ implements ServerProcedureInterface {
   private static final Log LOG = LogFactory.getLog(ServerCrashProcedure.class);
 
   /**
+   * Configuration key to set how long to wait in ms doing a quick check on 
meta state.
+   */
+  public static final String KEY_SHORT_WAIT_ON_META =
+      "hbase.master.servercrash.short.wait.on.meta.ms";
+
+  public static final int DEFAULT_SHORT_WAIT_ON_META = 1000;
+
+  /**
+   * Configuration key to set how many retries to cycle before we give up on 
meta.
+   * Each attempt will wait at least {@link #KEY_SHORT_WAIT_ON_META} 
milliseconds.
+   */
+  public static final String KEY_RETRIES_ON_META =
+      "hbase.master.servercrash.meta.retries";
+
+  public static final int DEFAULT_RETRIES_ON_META = 10;
+
+  /**
+   * Configuration key to set how long to wait in ms on regions in transition.
+   */
+  public static final String KEY_WAIT_ON_RIT =
+      "hbase.master.servercrash.wait.on.rit.ms";
+
+  public static final int DEFAULT_WAIT_ON_RIT = 30000;
+
+  private static final Set<HRegionInfo> META_REGION_SET = new HashSet<>();
+  static {
+    META_REGION_SET.add(HRegionInfo.FIRST_META_REGIONINFO);
+  }
+
+  /**
    * Name of the crashed server to process.
    */
   private ServerName serverName;
@@ -72,8 +117,14 @@ implements ServerProcedureInterface {
   /**
    * Regions that were on the crashed server.
    */
-  private List<HRegionInfo> regionsOnCrashedServer;
+  private Set<HRegionInfo> regionsOnCrashedServer;
 
+  /**
+   * Regions assigned. Usually some subset of {@link #regionsOnCrashedServer}.
+   */
+  private List<HRegionInfo> regionsAssigned;
+
+  private boolean distributedLogReplay = false;
   private boolean carryingMeta = false;
   private boolean shouldSplitWal;
 
@@ -113,11 +164,20 @@ implements ServerProcedureInterface {
     super();
   }
 
+  private void throwProcedureYieldException(final String msg) throws 
ProcedureYieldException {
+    String logMsg = msg + "; cycle=" + this.cycles + ", running for " +
+        StringUtils.formatTimeDiff(System.currentTimeMillis(), 
getSubmittedTime());
+    // The procedure executor logs ProcedureYieldException at trace level. For 
now, log these
+    // yields for server crash processing at DEBUG. Revisit when stable.
+    if (LOG.isDebugEnabled()) LOG.debug(logMsg);
+    throw new ProcedureYieldException(logMsg);
+  }
+
   @Override
   protected Flow executeFromState(MasterProcedureEnv env, ServerCrashState 
state)
-      throws ProcedureSuspendedException, ProcedureYieldException {
+      throws ProcedureYieldException {
     if (LOG.isTraceEnabled()) {
-      LOG.trace(state  + " " + this + "; cycles=" + this.cycles);
+      LOG.trace(state);
     }
     // Keep running count of cycles
     if (state.ordinal() != this.previousState) {
@@ -126,7 +186,11 @@ implements ServerProcedureInterface {
     } else {
       this.cycles++;
     }
-    final MasterServices services = env.getMasterServices();
+    MasterServices services = env.getMasterServices();
+    // Is master fully online? If not, yield. No processing of servers unless 
master is up
+    if (!services.getAssignmentManager().isFailoverCleanupDone()) {
+      throwProcedureYieldException("Waiting on master failover to complete");
+    }
     // HBASE-14802
     // If we have not yet notified that we are processing a dead server, we 
should do now.
     if (!notifiedDeadServer) {
@@ -137,61 +201,102 @@ implements ServerProcedureInterface {
     try {
       switch (state) {
       case SERVER_CRASH_START:
-        LOG.info("Start " + this);
+        LOG.info("Start processing crashed " + this.serverName);
         start(env);
         // If carrying meta, process it first. Else, get list of regions on 
crashed server.
-        if (this.carryingMeta) {
-          setNextState(ServerCrashState.SERVER_CRASH_PROCESS_META);
-        } else {
-          setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS);
-        }
+        if (this.carryingMeta) 
setNextState(ServerCrashState.SERVER_CRASH_PROCESS_META);
+        else setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS);
         break;
 
       case SERVER_CRASH_GET_REGIONS:
         // If hbase:meta is not assigned, yield.
-        if (env.getAssignmentManager().waitMetaInitialized(this)) {
-          throw new ProcedureSuspendedException();
+        if (!isMetaAssignedQuickTest(env)) {
+          // isMetaAssignedQuickTest does not really wait. Let's delay a 
little before
+          // another round of execution.
+          long wait =
+              env.getMasterConfiguration().getLong(KEY_SHORT_WAIT_ON_META,
+                DEFAULT_SHORT_WAIT_ON_META);
+          wait = wait / 10;
+          Thread.sleep(wait);
+          throwProcedureYieldException("Waiting on hbase:meta assignment");
         }
-
-        this.regionsOnCrashedServer = 
services.getAssignmentManager().getRegionStates()
-          .getServerRegionInfoSet(serverName);
-        // Where to go next? Depends on whether we should split logs at all or
-        // if we should do distributed log splitting.
+        this.regionsOnCrashedServer =
+            
services.getAssignmentManager().getRegionStates().getServerRegions(this.serverName);
+        // Where to go next? Depends on whether we should split logs at all or 
if we should do
+        // distributed log splitting (DLS) vs distributed log replay (DLR).
         if (!this.shouldSplitWal) {
           setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
+        } else if (this.distributedLogReplay) {
+          setNextState(ServerCrashState.SERVER_CRASH_PREPARE_LOG_REPLAY);
         } else {
           setNextState(ServerCrashState.SERVER_CRASH_SPLIT_LOGS);
         }
         break;
 
       case SERVER_CRASH_PROCESS_META:
-        processMeta(env);
+        // If we fail processing hbase:meta, yield.
+        if (!processMeta(env)) {
+          throwProcedureYieldException("Waiting on regions-in-transition to 
clear");
+        }
         setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS);
         break;
 
+      case SERVER_CRASH_PREPARE_LOG_REPLAY:
+        prepareLogReplay(env, this.regionsOnCrashedServer);
+        setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
+        break;
+
       case SERVER_CRASH_SPLIT_LOGS:
         splitLogs(env);
-        setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
+        // If DLR, go to FINISH. Otherwise, if DLS, go to 
SERVER_CRASH_CALC_REGIONS_TO_ASSIGN
+        if (this.distributedLogReplay) 
setNextState(ServerCrashState.SERVER_CRASH_FINISH);
+        else setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
         break;
 
       case SERVER_CRASH_ASSIGN:
+        List<HRegionInfo> regionsToAssign = calcRegionsToAssign(env);
+
+        // Assign may not be idempotent. SSH used to requeue the SSH if we got 
an IOE assigning
+        // which is what we are mimicing here but it looks prone to double 
assignment if assign
+        // fails midway. TODO: Test.
+
         // If no regions to assign, skip assign and skip to the finish.
-        // Filter out meta regions. Those are handled elsewhere in this 
procedure.
-        // Filter changes this.regionsOnCrashedServer.
-        if (filterDefaultMetaRegions(regionsOnCrashedServer)) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Assigning regions " +
-              HRegionInfo.getShortNameToLog(regionsOnCrashedServer) + ", " + 
this +
-              "; cycles=" + this.cycles);
+        boolean regions = regionsToAssign != null && 
!regionsToAssign.isEmpty();
+        if (regions) {
+          this.regionsAssigned = regionsToAssign;
+          if (!assign(env, regionsToAssign)) {
+            throwProcedureYieldException("Failed assign; will retry");
           }
-          handleRIT(env, regionsOnCrashedServer);
-          addChildProcedure(env.getAssignmentManager().
-              createAssignProcedures(regionsOnCrashedServer, true));
         }
-        setNextState(ServerCrashState.SERVER_CRASH_FINISH);
+        if (this.shouldSplitWal && distributedLogReplay) {
+          // Take this route even if there are apparently no regions assigned. 
This may be our
+          // second time through here; i.e. we assigned and crashed just about 
here. On second
+          // time through, there will be no regions because we assigned them 
in the previous step.
+          // Even though no regions, we need to go through here to clean up 
the DLR zk markers.
+          setNextState(ServerCrashState.SERVER_CRASH_WAIT_ON_ASSIGN);
+        } else {
+          setNextState(ServerCrashState.SERVER_CRASH_FINISH);
+        }
+        break;
+
+      case SERVER_CRASH_WAIT_ON_ASSIGN:
+        // TODO: The list of regionsAssigned may be more than we actually 
assigned. See down in
+        // AM #1629 around 'if 
(regionStates.wasRegionOnDeadServer(encodedName)) {' where where we
+        // will skip assigning a region because it is/was on a dead server. 
Should never happen!
+        // It was on this server. Worst comes to worst, we'll still wait here 
till other server is
+        // processed.
+
+        // If the wait on assign failed, yield -- if we have regions to assign.
+        if (this.regionsAssigned != null && !this.regionsAssigned.isEmpty()) {
+          if (!waitOnAssign(env, this.regionsAssigned)) {
+            throwProcedureYieldException("Waiting on region assign");
+          }
+        }
+        setNextState(ServerCrashState.SERVER_CRASH_SPLIT_LOGS);
         break;
 
       case SERVER_CRASH_FINISH:
+        LOG.info("Finished processing of crashed " + serverName);
         services.getServerManager().getDeadServers().finish(serverName);
         return Flow.NO_MORE_STATE;
 
@@ -199,7 +304,11 @@ implements ServerProcedureInterface {
         throw new UnsupportedOperationException("unhandled state=" + state);
       }
     } catch (IOException e) {
-      LOG.warn("Failed state=" + state + ", retry " + this + "; cycles=" + 
this.cycles, e);
+      LOG.warn("Failed serverName=" + this.serverName + ", state=" + state + 
"; retry", e);
+    } catch (InterruptedException e) {
+      // TODO: Make executor allow IEs coming up out of execute.
+      LOG.warn("Interrupted serverName=" + this.serverName + ", state=" + 
state + "; retry", e);
+      Thread.currentThread().interrupt();
     }
     return Flow.HAS_MORE_STATE;
   }
@@ -209,60 +318,96 @@ implements ServerProcedureInterface {
    * @param env
    * @throws IOException
    */
-  private void start(final MasterProcedureEnv env) throws IOException {}
+  private void start(final MasterProcedureEnv env) throws IOException {
+    MasterWalManager mwm = env.getMasterServices().getMasterWalManager();
+    // Set recovery mode late. This is what the old ServerShutdownHandler used 
do.
+    mwm.setLogRecoveryMode();
+    this.distributedLogReplay = mwm.getLogRecoveryMode() == 
RecoveryMode.LOG_REPLAY;
+  }
 
   /**
    * @param env
+   * @return False if we fail to assign and split logs on meta ('process').
    * @throws IOException
    * @throws InterruptedException
    */
-  private void processMeta(final MasterProcedureEnv env) throws IOException {
+  private boolean processMeta(final MasterProcedureEnv env)
+  throws IOException {
     if (LOG.isDebugEnabled()) LOG.debug("Processing hbase:meta that was on " + 
this.serverName);
-
+    MasterWalManager mwm = env.getMasterServices().getMasterWalManager();
+    AssignmentManager am = env.getMasterServices().getAssignmentManager();
+    HRegionInfo metaHRI = HRegionInfo.FIRST_META_REGIONINFO;
     if (this.shouldSplitWal) {
-      // TODO: Matteo. We BLOCK here but most important thing to be doing at 
this moment.
-      env.getMasterServices().getMasterWalManager().splitMetaLog(serverName);
+      if (this.distributedLogReplay) {
+        prepareLogReplay(env, META_REGION_SET);
+      } else {
+        // TODO: Matteo. We BLOCK here but most important thing to be doing at 
this moment.
+        mwm.splitMetaLog(serverName);
+        am.getRegionStates().logSplit(metaHRI);
+      }
     }
 
     // Assign meta if still carrying it. Check again: region may be assigned 
because of RIT timeout
-    final AssignmentManager am = 
env.getMasterServices().getAssignmentManager();
-    for (HRegionInfo hri: 
am.getRegionStates().getServerRegionInfoSet(serverName)) {
-      if (!isDefaultMetaRegion(hri)) continue;
-
-      am.offlineRegion(hri);
-      addChildProcedure(am.createAssignProcedure(hri, true));
+    boolean processed = true;
+    if (am.isCarryingMeta(serverName)) {
+      // TODO: May block here if hard time figuring state of meta.
+      am.regionOffline(HRegionInfo.FIRST_META_REGIONINFO);
+      verifyAndAssignMetaWithRetries(env);
+      if (this.shouldSplitWal && distributedLogReplay) {
+        int timeout = env.getMasterConfiguration().getInt(KEY_WAIT_ON_RIT, 
DEFAULT_WAIT_ON_RIT);
+        if (!waitOnRegionToClearRegionsInTransition(am, metaHRI, timeout)) {
+          processed = false;
+        } else {
+          // TODO: Matteo. We BLOCK here but most important thing to be doing 
at this moment.
+          mwm.splitMetaLog(serverName);
+        }
+      }
     }
+    return processed;
   }
 
-  private boolean filterDefaultMetaRegions(final List<HRegionInfo> regions) {
-    if (regions == null) return false;
-    final Iterator<HRegionInfo> it = regions.iterator();
-    while (it.hasNext()) {
-      final HRegionInfo hri = it.next();
-      if (isDefaultMetaRegion(hri)) {
-        it.remove();
+  /**
+   * @return True if region cleared RIT, else false if we timed out waiting.
+   * @throws InterruptedIOException
+   */
+  private boolean waitOnRegionToClearRegionsInTransition(AssignmentManager am,
+      final HRegionInfo hri, final int timeout)
+  throws InterruptedIOException {
+    try {
+      if (!am.waitOnRegionToClearRegionsInTransition(hri, timeout)) {
+        // Wait here is to avoid log replay hits current dead server and incur 
a RPC timeout
+        // when replay happens before region assignment completes.
+        LOG.warn("Region " + hri.getEncodedName() + " didn't complete 
assignment in time");
+        return false;
       }
+    } catch (InterruptedException ie) {
+      throw new InterruptedIOException("Caught " + ie +
+        " during waitOnRegionToClearRegionsInTransition for " + hri);
     }
-    return !regions.isEmpty();
+    return true;
   }
 
-  private boolean isDefaultMetaRegion(final HRegionInfo hri) {
-    return hri.getTable().equals(TableName.META_TABLE_NAME) &&
-      RegionReplicaUtil.isDefaultReplica(hri);
+  private void prepareLogReplay(final MasterProcedureEnv env, final 
Set<HRegionInfo> regions)
+  throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Mark " + size(this.regionsOnCrashedServer) + " 
regions-in-recovery from " +
+        this.serverName);
+    }
+    MasterWalManager mwm = env.getMasterServices().getMasterWalManager();
+    AssignmentManager am = env.getMasterServices().getAssignmentManager();
+    mwm.prepareLogReplay(this.serverName, regions);
+    am.getRegionStates().logSplit(this.serverName);
   }
 
   private void splitLogs(final MasterProcedureEnv env) throws IOException {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Splitting WALs " + this);
+      LOG.debug("Splitting logs from " + serverName + "; region count=" +
+        size(this.regionsOnCrashedServer));
     }
     MasterWalManager mwm = env.getMasterServices().getMasterWalManager();
     AssignmentManager am = env.getMasterServices().getAssignmentManager();
     // TODO: For Matteo. Below BLOCKs!!!! Redo so can relinquish executor 
while it is running.
-    // PROBLEM!!! WE BLOCK HERE.
     mwm.splitLog(this.serverName);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Done splitting WALs " + this);
-    }
     am.getRegionStates().logSplit(this.serverName);
   }
 
@@ -270,6 +415,124 @@ implements ServerProcedureInterface {
     return hris == null? 0: hris.size();
   }
 
+  /**
+   * Figure out what we need to assign. Should be idempotent.
+   * @param env
+   * @return List of calculated regions to assign; may be empty or null.
+   * @throws IOException
+   */
+  private List<HRegionInfo> calcRegionsToAssign(final MasterProcedureEnv env)
+  throws IOException {
+    AssignmentManager am = env.getMasterServices().getAssignmentManager();
+    List<HRegionInfo> regionsToAssignAggregator = new ArrayList<>();
+    int replicaCount = 
env.getMasterConfiguration().getInt(HConstants.META_REPLICAS_NUM,
+      HConstants.DEFAULT_META_REPLICA_NUM);
+    for (int i = 1; i < replicaCount; i++) {
+      HRegionInfo metaHri =
+          
RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO, i);
+      if (am.isCarryingMetaReplica(this.serverName, metaHri)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Reassigning meta replica" + metaHri + " that was on " + 
this.serverName);
+        }
+        regionsToAssignAggregator.add(metaHri);
+      }
+    }
+    // Clean out anything in regions in transition.
+    List<HRegionInfo> regionsInTransition = 
am.cleanOutCrashedServerReferences(serverName);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Reassigning " + size(this.regionsOnCrashedServer) +
+        " region(s) that " + (serverName == null? "null": serverName)  +
+        " was carrying (and " + regionsInTransition.size() +
+        " regions(s) that were opening on this server)");
+    }
+    regionsToAssignAggregator.addAll(regionsInTransition);
+
+    // Iterate regions that were on this server and figure which of these we 
need to reassign
+    if (this.regionsOnCrashedServer != null && 
!this.regionsOnCrashedServer.isEmpty()) {
+      RegionStates regionStates = am.getRegionStates();
+      for (HRegionInfo hri: this.regionsOnCrashedServer) {
+        if (regionsInTransition.contains(hri)) continue;
+        String encodedName = hri.getEncodedName();
+        Lock lock = am.acquireRegionLock(encodedName);
+        try {
+          RegionState rit = regionStates.getRegionTransitionState(hri);
+          if (processDeadRegion(hri, am)) {
+            ServerName addressFromAM = 
regionStates.getRegionServerOfRegion(hri);
+            if (addressFromAM != null && 
!addressFromAM.equals(this.serverName)) {
+              // If this region is in transition on the dead server, it must be
+              // opening or pending_open, which should have been covered by
+              // AM#cleanOutCrashedServerReferences
+              LOG.info("Skip assigning " + hri.getRegionNameAsString()
+                + " because opened on " + addressFromAM.getServerName());
+              continue;
+            }
+            if (rit != null) {
+              if (rit.getServerName() != null && 
!rit.isOnServer(this.serverName)) {
+                // Skip regions that are in transition on other server
+                LOG.info("Skip assigning region in transition on other server" 
+ rit);
+                continue;
+              }
+              LOG.info("Reassigning region " + rit + " and clearing zknode if 
exists");
+              regionStates.updateRegionState(hri, RegionState.State.OFFLINE);
+            } else if (regionStates.isRegionInState(
+                hri, RegionState.State.SPLITTING_NEW, 
RegionState.State.MERGING_NEW)) {
+              regionStates.updateRegionState(hri, RegionState.State.OFFLINE);
+            }
+            regionsToAssignAggregator.add(hri);
+          // TODO: The below else if is different in branch-1 from master 
branch.
+          } else if (rit != null) {
+            if ((rit.isClosing() || rit.isFailedClose() || rit.isOffline())
+                && am.getTableStateManager().isTableState(hri.getTable(),
+                TableState.State.DISABLED, TableState.State.DISABLING) ||
+                am.getReplicasToClose().contains(hri)) {
+              // If the table was partially disabled and the RS went down, we 
should clear the
+              // RIT and remove the node for the region.
+              // The rit that we use may be stale in case the table was in 
DISABLING state
+              // but though we did assign we will not be clearing the znode in 
CLOSING state.
+              // Doing this will have no harm. See HBASE-5927
+              regionStates.updateRegionState(hri, RegionState.State.OFFLINE);
+              am.offlineDisabledRegion(hri);
+            } else {
+              LOG.warn("THIS SHOULD NOT HAPPEN: unexpected region in 
transition "
+                + rit + " not to be assigned by SSH of server " + serverName);
+            }
+          }
+        } finally {
+          lock.unlock();
+        }
+      }
+    }
+    return regionsToAssignAggregator;
+  }
+
+  private boolean assign(final MasterProcedureEnv env, final List<HRegionInfo> 
hris)
+  throws InterruptedIOException {
+    AssignmentManager am = env.getMasterServices().getAssignmentManager();
+    try {
+      am.assign(hris);
+    } catch (InterruptedException ie) {
+      LOG.error("Caught " + ie + " during round-robin assignment");
+      throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
+    } catch (IOException ioe) {
+      LOG.info("Caught " + ioe + " during region assignment, will retry");
+      return false;
+    }
+    return true;
+  }
+
+  private boolean waitOnAssign(final MasterProcedureEnv env, final 
List<HRegionInfo> hris)
+  throws InterruptedIOException {
+    int timeout = env.getMasterConfiguration().getInt(KEY_WAIT_ON_RIT, 
DEFAULT_WAIT_ON_RIT);
+    for (HRegionInfo hri: hris) {
+      // TODO: Blocks here.
+      if 
(!waitOnRegionToClearRegionsInTransition(env.getMasterServices().getAssignmentManager(),
+          hri, timeout)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   @Override
   protected void rollbackState(MasterProcedureEnv env, ServerCrashState state)
   throws IOException {
@@ -317,11 +580,11 @@ implements ServerProcedureInterface {
   @Override
   public void toStringClassDetails(StringBuilder sb) {
     sb.append(getClass().getSimpleName());
-    sb.append(" server=");
-    sb.append(serverName);
-    sb.append(", splitWal=");
+    sb.append(" serverName=");
+    sb.append(this.serverName);
+    sb.append(", shouldSplitWal=");
     sb.append(shouldSplitWal);
-    sb.append(", meta=");
+    sb.append(", carryingMeta=");
     sb.append(carryingMeta);
   }
 
@@ -332,6 +595,7 @@ implements ServerProcedureInterface {
     MasterProcedureProtos.ServerCrashStateData.Builder state =
       MasterProcedureProtos.ServerCrashStateData.newBuilder().
       setServerName(ProtobufUtil.toServerName(this.serverName)).
+      setDistributedLogReplay(this.distributedLogReplay).
       setCarryingMeta(this.carryingMeta).
       setShouldSplitWal(this.shouldSplitWal);
     if (this.regionsOnCrashedServer != null && 
!this.regionsOnCrashedServer.isEmpty()) {
@@ -339,6 +603,11 @@ implements ServerProcedureInterface {
         state.addRegionsOnCrashedServer(HRegionInfo.convert(hri));
       }
     }
+    if (this.regionsAssigned != null && !this.regionsAssigned.isEmpty()) {
+      for (HRegionInfo hri: this.regionsAssigned) {
+        state.addRegionsAssigned(HRegionInfo.convert(hri));
+      }
+    }
     state.build().writeDelimitedTo(stream);
   }
 
@@ -349,16 +618,142 @@ implements ServerProcedureInterface {
     MasterProcedureProtos.ServerCrashStateData state =
       MasterProcedureProtos.ServerCrashStateData.parseDelimitedFrom(stream);
     this.serverName = ProtobufUtil.toServerName(state.getServerName());
+    this.distributedLogReplay = state.hasDistributedLogReplay()?
+      state.getDistributedLogReplay(): false;
     this.carryingMeta = state.hasCarryingMeta()? state.getCarryingMeta(): 
false;
     // shouldSplitWAL has a default over in pb so this invocation will always 
work.
     this.shouldSplitWal = state.getShouldSplitWal();
     int size = state.getRegionsOnCrashedServerCount();
     if (size > 0) {
-      this.regionsOnCrashedServer = new ArrayList<HRegionInfo>(size);
+      this.regionsOnCrashedServer = new HashSet<>(size);
       for (RegionInfo ri: state.getRegionsOnCrashedServerList()) {
         this.regionsOnCrashedServer.add(HRegionInfo.convert(ri));
       }
     }
+    size = state.getRegionsAssignedCount();
+    if (size > 0) {
+      this.regionsAssigned = new ArrayList<>(size);
+      for (RegionInfo ri: state.getRegionsOnCrashedServerList()) {
+        this.regionsAssigned.add(HRegionInfo.convert(ri));
+      }
+    }
+  }
+
+  /**
+   * Process a dead region from a dead RS. Checks if the region is disabled or
+   * disabling or if the region has a partially completed split.
+   * @param hri
+   * @param assignmentManager
+   * @return Returns true if specified region should be assigned, false if not.
+   * @throws IOException
+   */
+  private static boolean processDeadRegion(HRegionInfo hri, AssignmentManager 
assignmentManager)
+  throws IOException {
+    boolean tablePresent = 
assignmentManager.getTableStateManager().isTablePresent(hri.getTable());
+    if (!tablePresent) {
+      LOG.info("The table " + hri.getTable() + " was deleted.  Hence not 
proceeding.");
+      return false;
+    }
+    // If table is not disabled but the region is offlined,
+    boolean disabled = 
assignmentManager.getTableStateManager().isTableState(hri.getTable(),
+      TableState.State.DISABLED);
+    if (disabled){
+      LOG.info("The table " + hri.getTable() + " was disabled.  Hence not 
proceeding.");
+      return false;
+    }
+    if (hri.isOffline() && hri.isSplit()) {
+      // HBASE-7721: Split parent and daughters are inserted into hbase:meta 
as an atomic operation.
+      // If the meta scanner saw the parent split, then it should see the 
daughters as assigned
+      // to the dead server. We don't have to do anything.
+      return false;
+    }
+    boolean disabling = 
assignmentManager.getTableStateManager().isTableState(hri.getTable(),
+        TableState.State.DISABLING);
+    if (disabling) {
+      LOG.info("The table " + hri.getTable() + " is disabled.  Hence not 
assigning region" +
+        hri.getEncodedName());
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * If hbase:meta is not assigned already, assign.
+   * @throws IOException
+   */
+  private void verifyAndAssignMetaWithRetries(final MasterProcedureEnv env) 
throws IOException {
+    MasterServices services = env.getMasterServices();
+    int iTimes = services.getConfiguration().getInt(KEY_RETRIES_ON_META, 
DEFAULT_RETRIES_ON_META);
+    // Just reuse same time as we have for short wait on meta. Adding another 
config is overkill.
+    long waitTime =
+      services.getConfiguration().getLong(KEY_SHORT_WAIT_ON_META, 
DEFAULT_SHORT_WAIT_ON_META);
+    int iFlag = 0;
+    while (true) {
+      try {
+        verifyAndAssignMeta(env);
+        break;
+      } catch (KeeperException e) {
+        services.abort("In server shutdown processing, assigning meta", e);
+        throw new IOException("Aborting", e);
+      } catch (Exception e) {
+        if (iFlag >= iTimes) {
+          services.abort("verifyAndAssignMeta failed after" + iTimes + " 
retries, aborting", e);
+          throw new IOException("Aborting", e);
+        }
+        try {
+          Thread.sleep(waitTime);
+        } catch (InterruptedException e1) {
+          LOG.warn("Interrupted when is the thread sleep", e1);
+          Thread.currentThread().interrupt();
+          throw (InterruptedIOException)new 
InterruptedIOException().initCause(e1);
+        }
+        iFlag++;
+      }
+    }
+  }
+
+  /**
+   * If hbase:meta is not assigned already, assign.
+   * @throws InterruptedException
+   * @throws IOException
+   * @throws KeeperException
+   */
+  private void verifyAndAssignMeta(final MasterProcedureEnv env)
+      throws InterruptedException, IOException, KeeperException {
+    MasterServices services = env.getMasterServices();
+    if (!isMetaAssignedQuickTest(env)) {
+      
services.getAssignmentManager().assignMeta(HRegionInfo.FIRST_META_REGIONINFO);
+    } else if (serverName.equals(services.getMetaTableLocator().
+        getMetaRegionLocation(services.getZooKeeper()))) {
+      throw new IOException("hbase:meta is onlined on the dead server " + 
this.serverName);
+    } else {
+      LOG.info("Skip assigning hbase:meta because it is online at "
+          + 
services.getMetaTableLocator().getMetaRegionLocation(services.getZooKeeper()));
+    }
+  }
+
+  /**
+   * A quick test that hbase:meta is assigned; blocks for short time only.
+   * @return True if hbase:meta location is available and verified as good.
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  private boolean isMetaAssignedQuickTest(final MasterProcedureEnv env)
+  throws InterruptedException, IOException {
+    ZooKeeperWatcher zkw = env.getMasterServices().getZooKeeper();
+    MetaTableLocator mtl = env.getMasterServices().getMetaTableLocator();
+    boolean metaAssigned = false;
+    // Is hbase:meta location available yet?
+    if (mtl.isLocationAvailable(zkw)) {
+      ClusterConnection connection = 
env.getMasterServices().getClusterConnection();
+      // Is hbase:meta location good yet?
+      long timeout =
+        env.getMasterConfiguration().getLong(KEY_SHORT_WAIT_ON_META, 
DEFAULT_SHORT_WAIT_ON_META);
+      if (mtl.verifyMetaRegionLocation(connection, zkw, timeout)) {
+        metaAssigned = true;
+      }
+    }
+    return metaAssigned;
   }
 
   @Override
@@ -394,46 +789,4 @@ implements ServerProcedureInterface {
     // the client does not know about this procedure.
     return false;
   }
-
-  /**
-   * Handle any outstanding RIT that are up against this.serverName, the 
crashed server.
-   * Notify them of crash. Remove assign entries from the passed in 
<code>regions</code>
-   * otherwise we have two assigns going on and they will fight over who has 
lock.
-   * Notify Unassigns also.
-   * @param crashedServer Server that crashed.
-   * @param regions Regions that were on crashed server
-   * @return Subset of <code>regions</code> that were RIT against 
<code>crashedServer</code>
-   */
-  private void handleRIT(final MasterProcedureEnv env, final List<HRegionInfo> 
regions) {
-    if (regions == null) return;
-    AssignmentManager am = env.getMasterServices().getAssignmentManager();
-    final Iterator<HRegionInfo> it = regions.iterator();
-    ServerCrashException sce = null;
-    while (it.hasNext()) {
-      final HRegionInfo hri = it.next();
-      RegionTransitionProcedure rtp = 
am.getRegionStates().getRegionTransitionProcedure(hri);
-      if (rtp == null) continue;
-      // Make sure the RIT is against this crashed server. In the case where 
there are many
-      // processings of a crashed server -- backed up for whatever reason 
(slow WAL split) --
-      // then a previous SCP may have already failed an assign, etc., and it 
may have a new
-      // location target; DO NOT fail these else we make for assign flux.
-      ServerName rtpServerName = rtp.getServer(env);
-      if (rtpServerName == null) {
-        LOG.warn("RIT with ServerName null! " + rtp);
-        continue;
-      }
-      if (!rtpServerName.equals(this.serverName)) continue;
-      LOG.info("pid=" + getProcId() + " found RIT " + rtp + "; " +
-        rtp.getRegionState(env).toShortString());
-      // Notify RIT on server crash.
-      if (sce == null) {
-        sce = new ServerCrashException(getProcId(), getServerName());
-      }
-      rtp.remoteCallFailed(env, this.serverName, sce);
-      if (rtp instanceof AssignProcedure) {
-        // If an assign, include it in our return and remove from passed-in 
list of regions.
-        it.remove();
-      }
-    }
-  }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java
new file mode 100644
index 0000000..bf9afd7
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitTableRegionProcedure.java
@@ -0,0 +1,785 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.RegionStates;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SplitTableRegionState;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * The procedure to split a region in a table.
+ */
[email protected]
+public class SplitTableRegionProcedure
+    extends AbstractStateMachineTableProcedure<SplitTableRegionState> {
+  private static final Log LOG = 
LogFactory.getLog(SplitTableRegionProcedure.class);
+
+  private Boolean traceEnabled;
+
+  /*
+   * Region to split
+   */
+  private HRegionInfo parentHRI;
+  private HRegionInfo daughter_1_HRI;
+  private HRegionInfo daughter_2_HRI;
+
+  public SplitTableRegionProcedure() {
+    this.traceEnabled = null;
+  }
+
+  public SplitTableRegionProcedure(final MasterProcedureEnv env,
+      final HRegionInfo regionToSplit, final byte[] splitRow) throws 
IOException {
+    super(env);
+
+    checkSplitRow(regionToSplit, splitRow);
+
+    this.traceEnabled = null;
+    this.parentHRI = regionToSplit;
+
+    final TableName table = regionToSplit.getTable();
+    final long rid = getDaughterRegionIdTimestamp(regionToSplit);
+    this.daughter_1_HRI = new HRegionInfo(table, regionToSplit.getStartKey(), 
splitRow, false, rid);
+    this.daughter_2_HRI = new HRegionInfo(table, splitRow, 
regionToSplit.getEndKey(), false, rid);
+  }
+
+  private static void checkSplitRow(final HRegionInfo regionToSplit, final 
byte[] splitRow)
+      throws IOException {
+    if (splitRow == null || splitRow.length == 0) {
+      throw new DoNotRetryIOException("Split row cannot be null");
+    }
+
+    if (Bytes.equals(regionToSplit.getStartKey(), splitRow)) {
+      throw new DoNotRetryIOException(
+        "Split row is equal to startkey: " + Bytes.toStringBinary(splitRow));
+    }
+
+    if (!regionToSplit.containsRow(splitRow)) {
+      throw new DoNotRetryIOException(
+        "Split row is not inside region key range splitKey:" + 
Bytes.toStringBinary(splitRow) +
+        " region: " + regionToSplit);
+    }
+  }
+
+  /**
+   * Calculate daughter regionid to use.
+   * @param hri Parent {@link HRegionInfo}
+   * @return Daughter region id (timestamp) to use.
+   */
+  private static long getDaughterRegionIdTimestamp(final HRegionInfo hri) {
+    long rid = EnvironmentEdgeManager.currentTime();
+    // Regionid is timestamp.  Can't be less than that of parent else will 
insert
+    // at wrong location in hbase:meta (See HBASE-710).
+    if (rid < hri.getRegionId()) {
+      LOG.warn("Clock skew; parent regions id is " + hri.getRegionId() +
+        " but current time here is " + rid);
+      rid = hri.getRegionId() + 1;
+    }
+    return rid;
+  }
+
+  @Override
+  protected Flow executeFromState(final MasterProcedureEnv env, final 
SplitTableRegionState state)
+      throws InterruptedException {
+    if (isTraceEnabled()) {
+      LOG.trace(this + " execute state=" + state);
+    }
+
+    try {
+      switch (state) {
+      case SPLIT_TABLE_REGION_PREPARE:
+        if (prepareSplitRegion(env)) {
+          setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_PRE_OPERATION);
+          break;
+        } else {
+          assert isFailed() : "split region should have an exception here";
+          return Flow.NO_MORE_STATE;
+        }
+      case SPLIT_TABLE_REGION_PRE_OPERATION:
+        preSplitRegion(env);
+        
setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_SET_SPLITTING_TABLE_STATE);
+        break;
+      case SPLIT_TABLE_REGION_SET_SPLITTING_TABLE_STATE:
+        setRegionStateToSplitting(env);
+        
setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_CLOSE_PARENT_REGION);
+        break;
+      case SPLIT_TABLE_REGION_CLOSE_PARENT_REGION:
+        closeParentRegionForSplit(env);
+        
setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_CREATE_DAUGHTER_REGIONS);
+        break;
+      case SPLIT_TABLE_REGION_CREATE_DAUGHTER_REGIONS:
+        createDaughterRegions(env);
+        
setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_PRE_OPERATION_BEFORE_PONR);
+        break;
+      case SPLIT_TABLE_REGION_PRE_OPERATION_BEFORE_PONR:
+        preSplitRegionBeforePONR(env);
+        setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_UPDATE_META);
+        break;
+      case SPLIT_TABLE_REGION_UPDATE_META:
+        // This is the point of no return.  Adding subsequent edits to .META. 
as we
+        // do below when we do the daughter opens adding each to .META. can 
fail in
+        // various interesting ways the most interesting of which is a timeout
+        // BUT the edits all go through (See HBASE-3872).  IF we reach the PONR
+        // then subsequent failures need to crash out this region server; the
+        // server shutdown processing should be able to fix-up the incomplete 
split.
+        // The offlined parent will have the daughters as extra columns.  If
+        // we leave the daughter regions in place and do not remove them when 
we
+        // crash out, then they will have their references to the parent in 
place
+        // still and the server shutdown fixup of .META. will point to these
+        // regions.
+        // We should add PONR JournalEntry before offlineParentInMeta,so even 
if
+        // OfflineParentInMeta timeout,this will cause regionserver exit,and 
then
+        // master ServerShutdownHandler will fix daughter & avoid data loss. 
(See
+        // HBase-4562).
+        updateMetaForDaughterRegions(env);
+        
setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_PONR);
+        break;
+      case SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_PONR:
+        preSplitRegionAfterPONR(env);
+        
setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_OPEN_CHILD_REGIONS);
+        break;
+      case SPLIT_TABLE_REGION_OPEN_CHILD_REGIONS:
+        openDaughterRegions(env);
+        setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_POST_OPERATION);
+        break;
+      case SPLIT_TABLE_REGION_POST_OPERATION:
+        postSplitRegion(env);
+        return Flow.NO_MORE_STATE;
+      default:
+        throw new UnsupportedOperationException(this + " unhandled state=" + 
state);
+      }
+    } catch (IOException e) {
+      String msg = "Error trying to split region " + 
parentHRI.getEncodedName() + " in the table "
+          + getTableName() + " (in state=" + state + ")";
+      if (!isRollbackSupported(state)) {
+        // We reach a state that cannot be rolled back. We just need to keep 
retry.
+        LOG.warn(msg, e);
+      } else {
+        LOG.error(msg, e);
+        setFailure("master-split-region", e);
+      }
+    }
+    return Flow.HAS_MORE_STATE;
+  }
+
+  @Override
+  protected void rollbackState(final MasterProcedureEnv env, final 
SplitTableRegionState state)
+      throws IOException, InterruptedException {
+    if (isTraceEnabled()) {
+      LOG.trace(this + " rollback state=" + state);
+    }
+
+    try {
+      switch (state) {
+      case SPLIT_TABLE_REGION_POST_OPERATION:
+      case SPLIT_TABLE_REGION_OPEN_CHILD_REGIONS:
+      case SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_PONR:
+      case SPLIT_TABLE_REGION_UPDATE_META:
+        // PONR
+        throw new UnsupportedOperationException(this + " unhandled state=" + 
state);
+      case SPLIT_TABLE_REGION_PRE_OPERATION_BEFORE_PONR:
+        break;
+      case SPLIT_TABLE_REGION_CREATE_DAUGHTER_REGIONS:
+        // Doing nothing, as re-open parent region would clean up daughter 
region directories.
+        break;
+      case SPLIT_TABLE_REGION_CLOSE_PARENT_REGION:
+        openParentRegion(env);
+        break;
+      case SPLIT_TABLE_REGION_SET_SPLITTING_TABLE_STATE:
+        setRegionStateToRevertSplitting(env);
+        break;
+      case SPLIT_TABLE_REGION_PRE_OPERATION:
+        postRollBackSplitRegion(env);
+        break;
+      case SPLIT_TABLE_REGION_PREPARE:
+        break; // nothing to do
+      default:
+        throw new UnsupportedOperationException(this + " unhandled state=" + 
state);
+      }
+    } catch (IOException e) {
+      // This will be retried. Unless there is a bug in the code,
+      // this should be just a "temporary error" (e.g. network down)
+      LOG.warn("Failed rollback attempt step " + state + " for splitting the 
region "
+        + parentHRI.getEncodedName() + " in table " + getTableName(), e);
+      throw e;
+    }
+  }
+
+  /*
+   * Check whether we are in the state that can be rollback
+   */
+  @Override
+  protected boolean isRollbackSupported(final SplitTableRegionState state) {
+    switch (state) {
+      case SPLIT_TABLE_REGION_POST_OPERATION:
+      case SPLIT_TABLE_REGION_OPEN_CHILD_REGIONS:
+      case SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_PONR:
+      case SPLIT_TABLE_REGION_UPDATE_META:
+        // It is not safe to rollback if we reach to these states.
+        return false;
+      default:
+        break;
+    }
+    return true;
+  }
+
+  @Override
+  protected SplitTableRegionState getState(final int stateId) {
+    return SplitTableRegionState.forNumber(stateId);
+  }
+
+  @Override
+  protected int getStateId(final SplitTableRegionState state) {
+    return state.getNumber();
+  }
+
+  @Override
+  protected SplitTableRegionState getInitialState() {
+    return SplitTableRegionState.SPLIT_TABLE_REGION_PREPARE;
+  }
+
+  @Override
+  public void serializeStateData(final OutputStream stream) throws IOException 
{
+    super.serializeStateData(stream);
+
+    final MasterProcedureProtos.SplitTableRegionStateData.Builder 
splitTableRegionMsg =
+        MasterProcedureProtos.SplitTableRegionStateData.newBuilder()
+        .setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser()))
+        .setParentRegionInfo(HRegionInfo.convert(parentHRI))
+        .addChildRegionInfo(HRegionInfo.convert(daughter_1_HRI))
+        .addChildRegionInfo(HRegionInfo.convert(daughter_2_HRI));
+    splitTableRegionMsg.build().writeDelimitedTo(stream);
+  }
+
+  @Override
+  public void deserializeStateData(final InputStream stream) throws 
IOException {
+    super.deserializeStateData(stream);
+
+    final MasterProcedureProtos.SplitTableRegionStateData splitTableRegionsMsg 
=
+        
MasterProcedureProtos.SplitTableRegionStateData.parseDelimitedFrom(stream);
+    
setUser(MasterProcedureUtil.toUserInfo(splitTableRegionsMsg.getUserInfo()));
+    parentHRI = 
HRegionInfo.convert(splitTableRegionsMsg.getParentRegionInfo());
+    if (splitTableRegionsMsg.getChildRegionInfoCount() == 0) {
+      daughter_1_HRI = daughter_2_HRI = null;
+    } else {
+      assert(splitTableRegionsMsg.getChildRegionInfoCount() == 2);
+      daughter_1_HRI = 
HRegionInfo.convert(splitTableRegionsMsg.getChildRegionInfoList().get(0));
+      daughter_2_HRI = 
HRegionInfo.convert(splitTableRegionsMsg.getChildRegionInfoList().get(1));
+    }
+  }
+
+  @Override
+  public void toStringClassDetails(StringBuilder sb) {
+    sb.append(getClass().getSimpleName());
+    sb.append(" (table=");
+    sb.append(getTableName());
+    sb.append(" parent region=");
+    sb.append(parentHRI);
+    if (daughter_1_HRI != null) {
+      sb.append(" first daughter region=");
+      sb.append(daughter_1_HRI);
+    }
+    if (daughter_2_HRI != null) {
+      sb.append(" and second daughter region=");
+      sb.append(daughter_2_HRI);
+    }
+    sb.append(")");
+  }
+
+  @Override
+  protected LockState acquireLock(final MasterProcedureEnv env) {
+    if (env.waitInitialized(this)) {
+      return LockState.LOCK_EVENT_WAIT;
+    }
+    return env.getProcedureScheduler().waitRegions(this, getTableName(), 
parentHRI)?
+        LockState.LOCK_EVENT_WAIT: LockState.LOCK_ACQUIRED;
+  }
+
+  @Override
+  protected void releaseLock(final MasterProcedureEnv env) {
+    env.getProcedureScheduler().wakeRegions(this, getTableName(), parentHRI);
+  }
+
+  @Override
+  public TableName getTableName() {
+    return parentHRI.getTable();
+  }
+
+  @Override
+  public TableOperationType getTableOperationType() {
+    return TableOperationType.SPLIT;
+  }
+
+  private byte[] getSplitRow() {
+    return daughter_2_HRI.getStartKey();
+  }
+
+  /**
+   * Prepare to Split region.
+   * @param env MasterProcedureEnv
+   * @throws IOException
+   */
+  @VisibleForTesting
+  public boolean prepareSplitRegion(final MasterProcedureEnv env) throws 
IOException {
+    // Check whether the region is splittable
+    final RegionState state = getParentRegionState(env);
+    if (state.isClosing() || state.isClosed() ||
+        state.isSplittingOrSplitOnServer(state.getServerName())) {
+      setFailure(
+        "master-split-region",
+        new IOException("Split region " + parentHRI + " failed due to region 
is not splittable"));
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Action before splitting region in a table.
+   * @param env MasterProcedureEnv
+   * @param state the procedure state
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private void preSplitRegion(final MasterProcedureEnv env)
+      throws IOException, InterruptedException {
+    final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
+    if (cpHost != null) {
+      cpHost.preSplitRegionAction(getTableName(), getSplitRow(), getUser());
+    }
+  }
+
+  /**
+   * Action after rollback a split table region action.
+   * @param env MasterProcedureEnv
+   * @throws IOException
+   */
+  private void postRollBackSplitRegion(final MasterProcedureEnv env) throws 
IOException {
+    final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
+    if (cpHost != null) {
+      cpHost.postRollBackSplitRegionAction(getUser());
+    }
+  }
+
+  /**
+   * Set the parent region state to SPLITTING state
+   * @param env MasterProcedureEnv
+   * @throws IOException
+   */
+  @VisibleForTesting
+  public void setRegionStateToSplitting(final MasterProcedureEnv env) throws 
IOException {
+    RegionStateTransition.Builder transition = 
RegionStateTransition.newBuilder();
+    transition.setTransitionCode(TransitionCode.READY_TO_SPLIT);
+    transition.addRegionInfo(HRegionInfo.convert(parentHRI));
+    transition.addRegionInfo(HRegionInfo.convert(daughter_1_HRI));
+    transition.addRegionInfo(HRegionInfo.convert(daughter_2_HRI));
+    if (env.getMasterServices().getAssignmentManager().onRegionTransition(
+      getParentRegionState(env).getServerName(), transition.build()) != null) {
+      throw new IOException("Failed to update region state to SPLITTING for "
+          + parentHRI.getRegionNameAsString());
+    }
+  }
+
+  /**
+   * Rollback the region state change
+   * @param env MasterProcedureEnv
+   * @throws IOException
+   */
+  private void setRegionStateToRevertSplitting(final MasterProcedureEnv env) 
throws IOException {
+    RegionStateTransition.Builder transition = 
RegionStateTransition.newBuilder();
+    transition.setTransitionCode(TransitionCode.SPLIT_REVERTED);
+    transition.addRegionInfo(HRegionInfo.convert(parentHRI));
+    transition.addRegionInfo(HRegionInfo.convert(daughter_1_HRI));
+    transition.addRegionInfo(HRegionInfo.convert(daughter_2_HRI));
+    if (env.getMasterServices().getAssignmentManager().onRegionTransition(
+      getParentRegionState(env).getServerName(), transition.build()) != null) {
+      throw new IOException("Failed to update region state for "
+          + parentHRI.getRegionNameAsString() + " as part of operation for 
reverting split");
+    }
+  }
+
+  /**
+   * RPC to region server that host the parent region, ask for close the 
parent regions
+   * @param env MasterProcedureEnv
+   * @throws IOException
+   */
+  @VisibleForTesting
+  public void closeParentRegionForSplit(final MasterProcedureEnv env) throws 
IOException {
+    boolean success = 
env.getMasterServices().getServerManager().sendRegionCloseForSplitOrMerge(
+      getParentRegionState(env).getServerName(), parentHRI);
+    if (!success) {
+      throw new IOException("Close parent region " + parentHRI + " for 
splitting failed."
+        + "  Check region server log for more details");
+    }
+  }
+
+  /**
+   * Rollback close parent region
+   * @param env MasterProcedureEnv
+   **/
+  private void openParentRegion(final MasterProcedureEnv env) throws 
IOException {
+    // Check whether the region is closed; if so, open it in the same server
+    RegionState state = getParentRegionState(env);
+    if (state.isClosing() || state.isClosed()) {
+      env.getMasterServices().getServerManager().sendRegionOpen(
+        getParentRegionState(env).getServerName(),
+        parentHRI,
+        ServerName.EMPTY_SERVER_LIST);
+    }
+  }
+
+  /**
+   * Create daughter regions
+   * @param env MasterProcedureEnv
+   * @throws IOException
+   */
+  @VisibleForTesting
+  public void createDaughterRegions(final MasterProcedureEnv env) throws 
IOException {
+    final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
+    final Path tabledir = FSUtils.getTableDir(mfs.getRootDir(), 
parentHRI.getTable());
+    final FileSystem fs = mfs.getFileSystem();
+    HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
+      env.getMasterConfiguration(), fs, tabledir, parentHRI, false);
+    regionFs.createSplitsDir();
+
+    Pair<Integer, Integer> expectedReferences = splitStoreFiles(env, regionFs);
+
+    assertReferenceFileCount(
+      fs, expectedReferences.getFirst(), 
regionFs.getSplitsDir(daughter_1_HRI));
+    //Move the files from the temporary .splits to the final /table/region 
directory
+    regionFs.commitDaughterRegion(daughter_1_HRI);
+    assertReferenceFileCount(
+      fs,
+      expectedReferences.getFirst(),
+      new Path(tabledir, daughter_1_HRI.getEncodedName()));
+
+    assertReferenceFileCount(
+      fs, expectedReferences.getSecond(), 
regionFs.getSplitsDir(daughter_2_HRI));
+    regionFs.commitDaughterRegion(daughter_2_HRI);
+    assertReferenceFileCount(
+      fs,
+      expectedReferences.getSecond(),
+      new Path(tabledir, daughter_2_HRI.getEncodedName()));
+  }
+
+  /**
+   * Create Split directory
+   * @param env MasterProcedureEnv
+   * @throws IOException
+   */
+  private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
+      final HRegionFileSystem regionFs) throws IOException {
+    final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
+    final Configuration conf = env.getMasterConfiguration();
+
+    // The following code sets up a thread pool executor with as many slots as
+    // there's files to split. It then fires up everything, waits for
+    // completion and finally checks for any exception
+    //
+    // Note: splitStoreFiles creates daughter region dirs under the parent 
splits dir
+    // Nothing to unroll here if failure -- re-run createSplitsDir will
+    // clean this up.
+    int nbFiles = 0;
+    for (String family: regionFs.getFamilies()) {
+      Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family);
+      if (storeFiles != null) {
+        nbFiles += storeFiles.size();
+      }
+    }
+    if (nbFiles == 0) {
+      // no file needs to be splitted.
+      return new Pair<>(0,0);
+    }
+    // Default max #threads to use is the smaller of table's configured number 
of blocking store
+    // files or the available number of logical cores.
+    int defMaxThreads = Math.min(
+      conf.getInt(HStore.BLOCKING_STOREFILES_KEY, 
HStore.DEFAULT_BLOCKING_STOREFILE_COUNT),
+      Runtime.getRuntime().availableProcessors());
+    // Max #threads is the smaller of the number of storefiles or the default 
max determined above.
+    int maxThreads = Math.min(
+      conf.getInt(HConstants.REGION_SPLIT_THREADS_MAX, defMaxThreads), 
nbFiles);
+    LOG.info("Preparing to split " + nbFiles + " storefiles for region " + 
parentHRI +
+            " using " + maxThreads + " threads");
+    ThreadPoolExecutor threadPool = (ThreadPoolExecutor) 
Executors.newFixedThreadPool(
+      maxThreads, Threads.getNamedThreadFactory("StoreFileSplitter-%1$d"));
+    List<Future<Pair<Path,Path>>> futures = new ArrayList<>(nbFiles);
+
+    // Split each store file.
+    final HTableDescriptor htd = 
env.getMasterServices().getTableDescriptors().get(getTableName());
+    for (String family: regionFs.getFamilies()) {
+      final HColumnDescriptor hcd = htd.getFamily(family.getBytes());
+      final Collection<StoreFileInfo> storeFiles = 
regionFs.getStoreFiles(family);
+      if (storeFiles != null && storeFiles.size() > 0) {
+        final CacheConfig cacheConf = new CacheConfig(conf, hcd);
+        for (StoreFileInfo storeFileInfo: storeFiles) {
+          StoreFileSplitter sfs =
+              new StoreFileSplitter(regionFs, family.getBytes(), new 
StoreFile(mfs.getFileSystem(),
+                  storeFileInfo, conf, cacheConf, hcd.getBloomFilterType(), 
true));
+          futures.add(threadPool.submit(sfs));
+        }
+      }
+    }
+    // Shutdown the pool
+    threadPool.shutdown();
+
+    // Wait for all the tasks to finish
+    long fileSplitTimeout = conf.getLong("hbase.master.fileSplitTimeout", 
30000);
+    try {
+      boolean stillRunning = !threadPool.awaitTermination(fileSplitTimeout, 
TimeUnit.MILLISECONDS);
+      if (stillRunning) {
+        threadPool.shutdownNow();
+        // wait for the thread to shutdown completely.
+        while (!threadPool.isTerminated()) {
+          Thread.sleep(50);
+        }
+        throw new IOException("Took too long to split the" +
+            " files and create the references, aborting split");
+      }
+    } catch (InterruptedException e) {
+      throw (InterruptedIOException)new InterruptedIOException().initCause(e);
+    }
+
+    int daughterA = 0;
+    int daughterB = 0;
+    // Look for any exception
+    for (Future<Pair<Path, Path>> future : futures) {
+      try {
+        Pair<Path, Path> p = future.get();
+        daughterA += p.getFirst() != null ? 1 : 0;
+        daughterB += p.getSecond() != null ? 1 : 0;
+      } catch (InterruptedException e) {
+        throw (InterruptedIOException) new 
InterruptedIOException().initCause(e);
+      } catch (ExecutionException e) {
+        throw new IOException(e);
+      }
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Split storefiles for region " + parentHRI + " Daughter A: " + 
daughterA
+          + " storefiles, Daughter B: " + daughterB + " storefiles.");
+    }
+    return new Pair<>(daughterA, daughterB);
+  }
+
+  private void assertReferenceFileCount(
+      final FileSystem fs,
+      final int expectedReferenceFileCount,
+      final Path dir)
+      throws IOException {
+    if (expectedReferenceFileCount != 0 &&
+        expectedReferenceFileCount != FSUtils.getRegionReferenceFileCount(fs, 
dir)) {
+      throw new IOException("Failing split. Expected reference file count 
isn't equal.");
+    }
+  }
+
+  private Pair<Path, Path> splitStoreFile(final HRegionFileSystem regionFs,
+      final byte[] family, final StoreFile sf) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Splitting started for store file: " + sf.getPath() + " for 
region: " + parentHRI);
+    }
+
+    final byte[] splitRow = getSplitRow();
+    final String familyName = Bytes.toString(family);
+    final Path path_first =
+        regionFs.splitStoreFile(this.daughter_1_HRI, familyName, sf, splitRow, 
false, null);
+    final Path path_second =
+        regionFs.splitStoreFile(this.daughter_2_HRI, familyName, sf, splitRow, 
true, null);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Splitting complete for store file: " + sf.getPath() + " for 
region: " + parentHRI);
+    }
+    return new Pair<>(path_first, path_second);
+  }
+
+  /**
+   * Utility class used to do the file splitting / reference writing
+   * in parallel instead of sequentially.
+   */
+  private class StoreFileSplitter implements Callable<Pair<Path,Path>> {
+    private final HRegionFileSystem regionFs;
+    private final byte[] family;
+    private final StoreFile sf;
+
+    /**
+     * Constructor that takes what it needs to split
+     * @param regionFs the file system
+     * @param family Family that contains the store file
+     * @param sf which file
+     */
+    public StoreFileSplitter(
+        final HRegionFileSystem regionFs,
+        final byte[] family,
+        final StoreFile sf) {
+      this.regionFs = regionFs;
+      this.sf = sf;
+      this.family = family;
+    }
+
+    public Pair<Path,Path> call() throws IOException {
+      return splitStoreFile(regionFs, family, sf);
+    }
+  }
+
+  /**
+   * Post split region actions before the Point-of-No-Return step
+   * @param env MasterProcedureEnv
+   **/
+  private void preSplitRegionBeforePONR(final MasterProcedureEnv env)
+    throws IOException, InterruptedException {
+    final List<Mutation> metaEntries = new ArrayList<>();
+    final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
+    if (cpHost != null) {
+      if (cpHost.preSplitBeforePONRAction(getSplitRow(), metaEntries, 
getUser())) {
+        throw new IOException("Coprocessor bypassing region " +
+            parentHRI.getRegionNameAsString() + " split.");
+      }
+      try {
+        for (Mutation p : metaEntries) {
+          HRegionInfo.parseRegionName(p.getRow());
+        }
+      } catch (IOException e) {
+        LOG.error("Row key of mutation from coprocessor is not parsable as 
region name."
+            + "Mutations from coprocessor should only for hbase:meta table.");
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Add daughter regions to META
+   * @param env MasterProcedureEnv
+   * @throws IOException
+   */
+  private void updateMetaForDaughterRegions(final MasterProcedureEnv env) 
throws IOException {
+    RegionStateTransition.Builder transition = 
RegionStateTransition.newBuilder();
+    transition.setTransitionCode(TransitionCode.SPLIT_PONR);
+    transition.addRegionInfo(HRegionInfo.convert(parentHRI));
+    transition.addRegionInfo(HRegionInfo.convert(daughter_1_HRI));
+    transition.addRegionInfo(HRegionInfo.convert(daughter_2_HRI));
+    if (env.getMasterServices().getAssignmentManager().onRegionTransition(
+      getParentRegionState(env).getServerName(), transition.build()) != null) {
+      throw new IOException("Failed to update meta to add daughter regions in 
split region "
+          + parentHRI.getRegionNameAsString());
+    }
+  }
+
+  /**
+   * Pre split region actions after the Point-of-No-Return step
+   * @param env MasterProcedureEnv
+   **/
+  private void preSplitRegionAfterPONR(final MasterProcedureEnv env)
+      throws IOException, InterruptedException {
+    final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
+    if (cpHost != null) {
+      cpHost.preSplitAfterPONRAction(getUser());
+    }
+  }
+
+  /**
+   * Assign daughter regions
+   * @param env MasterProcedureEnv
+   * @throws IOException
+   * @throws InterruptedException
+   **/
+  private void openDaughterRegions(final MasterProcedureEnv env)
+      throws IOException, InterruptedException {
+    env.getMasterServices().getAssignmentManager().assignDaughterRegions(
+      parentHRI, daughter_1_HRI, daughter_2_HRI);
+  }
+
+  /**
+   * Post split region actions
+   * @param env MasterProcedureEnv
+   **/
+  private void postSplitRegion(final MasterProcedureEnv env) throws 
IOException {
+    final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
+    if (cpHost != null) {
+      cpHost.postCompletedSplitRegionAction(daughter_1_HRI, daughter_2_HRI, 
getUser());
+    }
+  }
+
+  /**
+   * Get parent region state
+   * @param env MasterProcedureEnv
+   * @return parent region state
+   */
+  private RegionState getParentRegionState(final MasterProcedureEnv env) {
+    RegionStates regionStates = 
env.getMasterServices().getAssignmentManager().getRegionStates();
+    RegionState state = regionStates.getRegionState(parentHRI);
+    if (state == null) {
+      LOG.warn("Split but not in region states: " + parentHRI);
+      state = regionStates.createRegionState(parentHRI);
+    }
+    return state;
+  }
+
+  /**
+   * The procedure could be restarted from a different machine. If the 
variable is null, we need to
+   * retrieve it.
+   * @return traceEnabled
+   */
+  private boolean isTraceEnabled() {
+    if (traceEnabled == null) {
+      traceEnabled = LOG.isTraceEnabled();
+    }
+    return traceEnabled;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java
index 86143ac..f74df79 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java
@@ -31,8 +31,7 @@ import 
org.apache.hadoop.hbase.classification.InterfaceStability;
 public interface TableProcedureInterface {
   public enum TableOperationType {
     CREATE, DELETE, DISABLE, EDIT, ENABLE, READ,
-    REGION_EDIT, REGION_SPLIT, REGION_MERGE, REGION_ASSIGN, REGION_UNASSIGN,
-      REGION_GC, MERGED_REGIONS_GC/* region operations */
+    REGION_EDIT, SPLIT, MERGE, ASSIGN, UNASSIGN, /* region operations */
   };
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java
index e7f5ead..e41b2cd 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java
@@ -85,7 +85,7 @@ public class TruncateTableProcedure
 
           // TODO: Move out... in the acquireLock()
           LOG.debug("waiting for '" + getTableName() + "' regions in 
transition");
-          regions = 
env.getAssignmentManager().getRegionStates().getRegionsOfTable(getTableName());
+          regions = ProcedureSyncWait.getRegionsFromMeta(env, getTableName());
           assert regions != null && !regions.isEmpty() : "unexpected 0 
regions";
           ProcedureSyncWait.waitRegionInTransition(env, regions);
 
@@ -121,14 +121,12 @@ public class TruncateTableProcedure
           setNextState(TruncateTableState.TRUNCATE_TABLE_ASSIGN_REGIONS);
           break;
         case TRUNCATE_TABLE_ASSIGN_REGIONS:
-          CreateTableProcedure.setEnablingState(env, getTableName());
-          
addChildProcedure(env.getAssignmentManager().createAssignProcedures(regions));
+          CreateTableProcedure.assignRegions(env, getTableName(), regions);
           setNextState(TruncateTableState.TRUNCATE_TABLE_POST_OPERATION);
           hTableDescriptor = null;
           regions = null;
           break;
         case TRUNCATE_TABLE_POST_OPERATION:
-          CreateTableProcedure.setEnabledState(env, getTableName());
           postTruncate(env);
           LOG.debug("truncate '" + getTableName() + "' completed");
           return Flow.NO_MORE_STATE;

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java
index 1ff05eb..25328b1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java
@@ -118,10 +118,8 @@ public class MobFileCache {
       this.scheduleThreadPool.scheduleAtFixedRate(new EvictionThread(this), 
period, period,
           TimeUnit.SECONDS);
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("MobFileCache enabled with cacheSize=" + mobFileMaxCacheSize 
+
+      LOG.info("MobFileCache enabled with cacheSize=" + mobFileMaxCacheSize +
           ", evictPeriods=" +  period + "sec, evictRemainRatio=" + 
evictRemainRatio);
-      }
     } else {
       LOG.info("MobFileCache disabled");
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/namespace/NamespaceAuditor.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/namespace/NamespaceAuditor.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/namespace/NamespaceAuditor.java
index 232309b..a30bfef 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/namespace/NamespaceAuditor.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/namespace/NamespaceAuditor.java
@@ -126,14 +126,14 @@ public class NamespaceAuditor {
     }
   }
 
-  public void updateQuotaForRegionMerge(HRegionInfo mergedRegion) throws 
IOException {
+  public void updateQuotaForRegionMerge(HRegionInfo hri) throws IOException {
     if (!stateManager.isInitialized()) {
       throw new IOException(
           "Merge operation is being performed even before namespace auditor is 
initialized.");
-    } else if 
(!stateManager.checkAndUpdateNamespaceRegionCount(mergedRegion.getTable(),
-        mergedRegion.getRegionName(), -1)) {
-      throw new QuotaExceededException("Region merge not possible for :" +
-        mergedRegion.getEncodedName() + " as quota limits are exceeded ");
+    } else if (!stateManager
+        .checkAndUpdateNamespaceRegionCount(hri.getTable(), 
hri.getRegionName(), -1)) {
+      throw new QuotaExceededException("Region split not possible for :" + 
hri.getEncodedName()
+          + " as quota limits are exceeded ");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/namespace/NamespaceStateManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/namespace/NamespaceStateManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/namespace/NamespaceStateManager.java
index 8f6a21d..604f211 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/namespace/NamespaceStateManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/namespace/NamespaceStateManager.java
@@ -88,9 +88,8 @@ class NamespaceStateManager {
     if (nspdesc != null) {
       NamespaceTableAndRegionInfo currentStatus;
       currentStatus = getState(namespace);
-      int regionCount = currentStatus.getRegionCount();
-      long maxRegionCount = TableNamespaceManager.getMaxRegions(nspdesc);
-      if (incr > 0 && regionCount >= maxRegionCount) {
+      if (incr > 0 &&
+          currentStatus.getRegionCount() >= 
TableNamespaceManager.getMaxRegions(nspdesc)) {
         LOG.warn("The region " + Bytes.toStringBinary(regionName)
             + " cannot be created. The region count  will exceed quota on the 
namespace. "
             + "This may be transient, please retry later if there are any 
ongoing split"

http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java
index 8f6a33a..9d24e6c 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java
@@ -330,7 +330,7 @@ public class MasterQuotaManager implements 
RegionStateListener {
       namespaceQuotaManager.checkQuotaToCreateTable(tName, regions);
     }
   }
-
+  
   public void checkAndUpdateNamespaceRegionQuota(TableName tName, int regions) 
throws IOException {
     if (initialized) {
       namespaceQuotaManager.checkQuotaToUpdateRegion(tName, regions);
@@ -347,14 +347,12 @@ public class MasterQuotaManager implements 
RegionStateListener {
     return -1;
   }
 
-  @Override
-  public void onRegionMerged(HRegionInfo mergedRegion) throws IOException {
+  public void onRegionMerged(HRegionInfo hri) throws IOException {
     if (initialized) {
-      namespaceQuotaManager.updateQuotaForRegionMerge(mergedRegion);
+      namespaceQuotaManager.updateQuotaForRegionMerge(hri);
     }
   }
 
-  @Override
   public void onRegionSplit(HRegionInfo hri) throws IOException {
     if (initialized) {
       namespaceQuotaManager.checkQuotaToSplitRegion(hri);

Reply via email to